Tags: atomic help constructor leak link array system rev oid
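This article walks through the source code of three classes: PoolArena, PoolChunk, and PoolSubpage.
PoolArena is Netty's memory pool implementation. It requests a large block of memory up front and then allocates from and reclaims space inside that block, which avoids frequent system calls and improves performance. A PoolArena is made up of multiple chunks, and each chunk is made up of multiple pages.
A chunk organizes and manages the allocation and release of memory across its pages. In Netty, the pages of a chunk are arranged as a binary tree.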
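To make the binary tree layout more concrete, here is a minimal sketch. It is not Netty's PoolChunk: the class name ChunkTreeSketch and its methods are hypothetical, and it assumes nodes are numbered from 1 in level order (root = 1, children of node n are 2n and 2n+1), with pageSize = 8192 and maxOrder = 11 as example values.

```java
// Hypothetical sketch of a chunk whose pages are the leaves of a complete binary tree.
// Not Netty's PoolChunk; names and numbering are assumptions for illustration.
final class ChunkTreeSketch {
    final int pageSize;  // size of one leaf (one page), in bytes
    final int maxOrder;  // depth of the leaf level; the root is at depth 0

    ChunkTreeSketch(int pageSize, int maxOrder) {
        this.pageSize = pageSize;
        this.maxOrder = maxOrder;
    }

    /** Total bytes managed by the chunk: pageSize * 2^maxOrder. */
    int chunkSize() {
        return pageSize << maxOrder;
    }

    /** Depth of node id, i.e. floor(log2(id)). */
    int depth(int id) {
        return 31 - Integer.numberOfLeadingZeros(id);
    }

    /** Bytes covered by node id: each level down halves the size. */
    int nodeSize(int id) {
        return chunkSize() >>> depth(id);
    }

    /** Byte offset of node id inside the chunk. */
    int offset(int id) {
        int positionInLevel = id ^ (1 << depth(id)); // strip the leading 1 bit
        return positionInLevel * nodeSize(id);
    }

    /** Id of the leftmost leaf; leaves occupy ids [2^maxOrder, 2^(maxOrder+1) - 1]. */
    int firstLeafId() {
        return 1 << maxOrder;
    }
}
```

With these example values the root covers the whole 16 MiB chunk, and every leaf covers exactly one 8 KiB page. A request larger than one page can be served by a node higher up in the tree.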
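PoolSubpage keeps a long[] array that records which slots are in use. One long element can track at most 64 slots, one bit per slot. If a region is divided into more than 64 slots, the array needs additional elements. For example, 4 bytes divided into 4 slots fits in the low 4 bits of a single long, whereas 128 bytes divided into 128 slots needs a second element, for 2 longs in total at 64 slots each.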
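The bitmap bookkeeping described above can be sketched as follows. This is a simplified illustration, not Netty's PoolSubpage: the class SubpageBitmapSketch and its methods are hypothetical, and the linear scan for a free slot is chosen for readability rather than speed.

```java
// Hypothetical sketch of slot tracking with a long[] bitmap (one bit per slot).
// Not Netty's PoolSubpage; names are assumptions for illustration.
final class SubpageBitmapSketch {
    private final long[] bitmap;
    private final int numElems;

    SubpageBitmapSketch(int numElems) {
        this.numElems = numElems;
        // One long covers 64 slots, so round up: 4 slots -> 1 long, 128 slots -> 2 longs.
        this.bitmap = new long[(numElems + 63) >>> 6];
    }

    int bitmapLength() {
        return bitmap.length;
    }

    /** Marks the first free slot as used and returns its index, or -1 if every slot is taken. */
    int allocate() {
        for (int slot = 0; slot < numElems; slot++) {
            int word = slot >>> 6; // which long holds this slot
            int bit = slot & 63;   // which bit inside that long
            if ((bitmap[word] & (1L << bit)) == 0) {
                bitmap[word] |= 1L << bit;
                return slot;
            }
        }
        return -1;
    }

    /** Clears the bit for the given slot so it can be handed out again. */
    void free(int slot) {
        bitmap[slot >>> 6] &= ~(1L << (slot & 63));
    }

    boolean isUsed(int slot) {
        return (bitmap[slot >>> 6] & (1L << (slot & 63))) != 0;
    }
}
```

The shift by 6 and the mask with 63 are just division and remainder by 64, which is why slot 64 lands in bit 0 of the second long.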
The structure of the classes just discussed is as follows.
final class PoolSubpage<T> {
    // The PoolChunk this subpage belongs to
    final PoolChunk<T> chunk;
    final int memoryMapIdx;
    final int runOffset;
    final int pageSize;
    final long[] bitmap;
    // Previous and next pointers (doubly linked list)
    PoolSubpage<T> prev;
    PoolSubpage<T> next;
    boolean doNotDestroy;
    int elemSize;
    int maxNumElems;
    int nextAvail;
    int bitmapLength;
    int numAvail;
    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
    /** Special constructor that creates a linked list head */
    PoolSubpage(int pageSize) {
        chunk = null;
        memoryMapIdx = -1;
        runOffset = -1;
        elemSize = -1;
        this.pageSize = pageSize;
        bitmap = null;
    }
    ...
final class PoolChunkList<T> {
    private final PoolArena<T> arena;
    // Previous and next pointers (doubly linked list)
    private final PoolChunkList<T> nextList;
    PoolChunkList<T> prevList;
    ...
}
abstract class PoolArena<T> {
    final PooledByteBufAllocator parent;
    private final int pageSize;
    private final int maxOrder;
    private final int pageShifts;
    private final int chunkSize;
    private final int subpageOverflowMask;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;
    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;
    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
    protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
        this.parent = parent;
        this.pageSize = pageSize;
        this.maxOrder = maxOrder;
        this.pageShifts = pageShifts;
        this.chunkSize = chunkSize;
        subpageOverflowMask = ~(pageSize - 1);
        tinySubpagePools = newSubpagePoolArray(512 >>> 4);
        for (int i = 0; i < tinySubpagePools.length; i ++) {
            tinySubpagePools[i] = newSubpagePoolHead(pageSize);
        }
        smallSubpagePools = newSubpagePoolArray(pageShifts - 9);
        for (int i = 0; i < smallSubpagePools.length; i ++) {
            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
        }
        q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE);
        q075 = new PoolChunkList<T>(this, q100, 75, 100);
        q050 = new PoolChunkList<T>(this, q075, 50, 100);
        q025 = new PoolChunkList<T>(this, q050, 25, 75);
        q000 = new PoolChunkList<T>(this, q025, 1, 50);
        qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25);
        q100.prevList = q075;
        q075.prevList = q050;
        q050.prevList = q025;
        q025.prevList = q000;
        q000.prevList = null;
        qInit.prevList = qInit;
    }
    ....
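The arithmetic in the PoolArena constructor is easier to follow with concrete numbers. The sketch below is only an illustration: ArenaSizingSketch and its methods are hypothetical names, pageSize = 8192 is an assumed example value, and isSubpageSized shows one plausible way a mask like subpageOverflowMask can be used to tell whether a capacity is smaller than one page.

```java
// Hypothetical worked example of the sizing expressions in the PoolArena constructor.
// Not Netty code; names and the example pageSize are assumptions for illustration.
final class ArenaSizingSketch {

    /** log2 of a power-of-two page size, e.g. 8192 -> 13. */
    static int pageShifts(int pageSize) {
        return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
    }

    /** Same expression as in the constructor: 512 >>> 4 = 32 list heads. */
    static int tinyPoolCount() {
        return 512 >>> 4;
    }

    /** Same expression as in the constructor: pageShifts - 9, e.g. 13 - 9 = 4 list heads. */
    static int smallPoolCount(int pageShifts) {
        return pageShifts - 9;
    }

    /**
     * ~(pageSize - 1) keeps only the bits at or above the page size, so a
     * non-negative capacity with none of those bits set is smaller than one page.
     */
    static boolean isSubpageSized(int capacity, int pageSize) {
        int subpageOverflowMask = ~(pageSize - 1);
        return (capacity & subpageOverflowMask) == 0;
    }
}
```

With a page size of 8192 this gives 32 tiny list heads and 4 small list heads. The 32 tiny heads are consistent with element sizes below 512 bytes in steps of 16, and the 4 small heads with the power-of-two sizes 512, 1024, 2048 and 4096.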
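Jumping straight into the methods of PooledDirectByteBuf is confusing, so we start the analysis from the class that calls it. PooledByteBufAllocator is the class that internally uses PooledDirectByteBuf to produce a ByteBuf. It is used as follows: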
public static void main(String[] args){
    /*
     * Pooled buffers are exposed to callers mainly through the PooledByteBufAllocator class
     */
    PooledByteBufAllocator allocator = new PooledByteBufAllocator();
    ByteBuf directBuffer = allocator.directBuffer();
}
Next, let's look at the directBuffer method.
@Override
public ByteBuf directBuffer() {
    return directBuffer(DEFAULT_INITIAL_CAPACITY, Integer.MAX_VALUE);
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    // Validate the arguments
    validate(initialCapacity, maxCapacity);
    // Implemented by the subclass: allocate the space
    return newDirectBuffer(initialCapacity, maxCapacity);
}
// Implementation in the PooledByteBufAllocator class
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // No.1: get the arena from the thread-local cache
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // The cache has no PoolArena: fall back to an unpooled buffer
        if (PlatformDependent.hasUnsafe()) {
            buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }
    return toLeakAwareBuffer(buf);
}
// threadCache in the PooledByteBufAllocator class; it is a ThreadLocal
final ThreadLocal<PoolThreadCache> threadCache = new ThreadLocal<PoolThreadCache>() {
    private final AtomicInteger index = new AtomicInteger();
    @Override
    protected PoolThreadCache initialValue() {
        final int idx = index.getAndIncrement();
        final PoolArena<byte[]> heapArena;
        final PoolArena<ByteBuffer> directArena;
        if (heapArenas != null) {
            heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
        } else {
            heapArena = null;
        }
        if (directArenas != null) {
            directArena = directArenas[Math.abs(idx % directArenas.length)];
        } else {
            directArena = null;
        }
        return new PoolThreadCache(heapArena, directArena);
    }
};
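The initialValue method above hands each new thread an arena in round-robin order and then keeps that choice for the lifetime of the thread. The following sketch isolates that idea. It is a simplified stand-in, not Netty code: ArenaRoundRobinSketch is a hypothetical class, and plain strings take the place of PoolArena instances.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of per-thread round-robin arena assignment.
// Not Netty code; strings stand in for PoolArena instances.
final class ArenaRoundRobinSketch {
    private final String[] arenas;
    private final AtomicInteger index = new AtomicInteger();
    private final ThreadLocal<String> threadArena;

    ArenaRoundRobinSketch(String... arenaNames) {
        this.arenas = arenaNames.clone();
        // The supplier runs once per thread, on that thread's first get().
        this.threadArena = ThreadLocal.withInitial(this::nextArena);
    }

    private String nextArena() {
        int idx = index.getAndIncrement();
        // Math.abs guards against a negative remainder once the counter overflows.
        return arenas[Math.abs(idx % arenas.length)];
    }

    /** Returns the arena bound to the calling thread, binding one on first use. */
    String arenaForCurrentThread() {
        return threadArena.get();
    }
}
```

Because the counter is shared and the result is cached per thread, threads are spread across the arenas in arrival order, and repeated calls from one thread always return the same arena.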
Original article: https://www.cnblogs.com/Benjious/p/11634891.html