标签:other expec rect last next edr ace cst support
Netty 使用 Recycler 实现对象池。
每个线程有一个 ThreadLocalMap 变量。ThreadLocalMap 本质是一个哈希表,发生槽冲突时用 index + 1 向后查找空槽(线性探测)来解决冲突;键是 ThreadLocal 变量,值是 ThreadLocal<T> 尖括号里的类型 T 的对象。Netty 里面大量使用 ThreadLocal,目的是减少多线程之间的锁竞争。
// 每个线程一个 Stack,用作对象池 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, interval, maxDelayedQueuesPerThread); } @Override protected void onRemoval(Stack<T> value) { // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead if (value.threadRef.get() == Thread.currentThread()) { if (DELAYED_RECYCLED.isSet()) { DELAYED_RECYCLED.get().remove(value); } } } };
// 每个线程有一个 Map,存储其他线程的 Stack -> WeakOrderQueue 对 private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() { @Override protected Map<Stack<?>, WeakOrderQueue> initialValue() { return new WeakHashMap<Stack<?>, WeakOrderQueue>(); } };
DefaultHandle 封装了对象,是直接存放在 Stack 中的元素
/**
 * Handle that wraps a pooled object and remembers the {@link Stack} it is bound to.
 * (Abridged excerpt: only the fields and constructor are shown here.)
 */
private static final class DefaultHandle<T> implements Handle<T> {
    // The stack this handle is currently bound to.
    Stack<?> stack;
    // The pooled object wrapped by this handle.
    Object value;

    /**
     * @param stack the stack this handle is initially bound to
     */
    DefaultHandle(Stack<?> stack) {
        this.stack = stack;
    }
}
Stack 是最直接的对象池,用数组实现的栈,存放 DefaultHandle
/**
 * Array-backed stack of {@link DefaultHandle} entries.
 * (Abridged excerpt: only two of its fields are shown here.)
 */
private static final class Stack<T> {
    // The stack belongs to one thread; that thread is held through a weak reference.
    final WeakReference<Thread> threadRef;
    // Backing array that stores the handles.
    DefaultHandle<?>[] elements;
}
如果要获取一个对象,需要调用 get():
public final T get() { // 没有启用对象池 if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } // 获取当前对象的 Stack Stack<T> stack = threadLocal.get(); // 从栈顶推出一个元素 DefaultHandle<T> handle = stack.pop(); if (handle == null) { // 栈中没有元素,则需要创建对象 handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
用完对象之后,调用 recycle() 归还:
public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } // DefaultHandle 绑定的 stack Stack<?> stack = this.stack; if (lastRecycledId != recycleId || stack == null) { throw new IllegalStateException("recycled already"); } stack.push(this); }
void push(DefaultHandle<?> item) { Thread currentThread = Thread.currentThread(); // 当前线程持有该 stack if (threadRef.get() == currentThread) { // 直接入栈 pushNow(item); } else { // 归还的对象不是由当前线程获取的 pushLater(item, currentThread); } }
private void pushLater(DefaultHandle<?> item, Thread thread) { if (maxDelayedQueues == 0) { // We don‘t support recycling across threads and should just drop the item on the floor. return; } // 当前线程的 Map Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(this); if (queue == null) { if (delayedRecycled.size() >= maxDelayedQueues) { // Add a dummy queue so we know we should drop the object delayedRecycled.put(this, WeakOrderQueue.DUMMY); return; } // 创建 WeakOrderQueue if ((queue = newWeakOrderQueue(thread)) == null) { // drop object return; } delayedRecycled.put(this, queue); } else if (queue == WeakOrderQueue.DUMMY) { // drop object return; } // DefaultHandle 放入 queue queue.add(item); }
那么,归还到 WeakOrderQueue 中的元素,什么时候会被用到呢?当 Stack 为空(size == 0)时,pop() 会先调用 scavenge(),之后再从栈顶取元素:
/**
 * Removes and returns the handle on top of this stack.
 *
 * When the stack is empty, {@code scavenge()} is called first (presumably to pull in handles
 * recycled elsewhere; its body is not shown here, so confirm against its definition).
 *
 * @return the popped handle with both recycle ids reset to 0, or {@code null} when
 *         {@code scavenge()} returns false or the size is still not positive afterwards
 * @throws IllegalStateException if the popped handle's {@code lastRecycledId} and
 *         {@code recycleId} differ
 */
DefaultHandle<T> pop() {
    int size = this.size;
    if (size == 0) {
        // Empty stack: give up unless scavenge() reports success.
        if (!scavenge()) {
            return null;
        }
        // Re-read the size, since scavenge() ran in between.
        size = this.size;
        if (size <= 0) {
            // double check, avoid races
            return null;
        }
    }
    size --;
    DefaultHandle ret = elements[size];
    // Clear the slot that held the popped handle.
    elements[size] = null;
    // As we already set the element[size] to null we also need to store the updated size before we do
    // any validation. Otherwise we may see a null value when later try to pop again without a new element
    // added before.
    this.size = size;

    if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
    }
    // Reset both ids before handing the handle out.
    ret.recycleId = 0;
    ret.lastRecycledId = 0;
    return ret;
}
transfer() 把 WeakOrderQueue 中的元素移到目标 Stack 中:
/**
 * Moves handles from the head link of this queue into {@code dst}.
 *
 * @param dst the stack that receives the handles
 * @return {@code true} if at least one handle was added to {@code dst}; {@code false} when
 *         there is no head link, nothing is left to read, {@code dst} cannot take any more
 *         handles, or every candidate handle was dropped by {@code dst.dropHandle}
 * @throws IllegalStateException if a handle's non-zero {@code recycleId} differs from its
 *         {@code lastRecycledId}
 */
boolean transfer(Stack<?> dst) {
    Link head = this.head.link;
    if (head == null) {
        return false;
    }

    // The head link has been read up to LINK_CAPACITY: move on to the next link, if there is one.
    if (head.readIndex == LINK_CAPACITY) {
        if (head.next == null) {
            return false;
        }
        head = head.next;
        this.head.relink(head);
    }

    // [srcStart, srcEnd) is the range of the link to transfer.
    final int srcStart = head.readIndex;
    // NOTE(review): head.get() is presumably the link's current write position; confirm in Link.
    int srcEnd = head.get();
    final int srcSize = srcEnd - srcStart;
    if (srcSize == 0) {
        return false;
    }

    final int dstSize = dst.size;
    final int expectedCapacity = dstSize + srcSize;

    if (expectedCapacity > dst.elements.length) {
        // Ask dst to grow, then clamp srcEnd to what fits in the capacity it actually reports.
        final int actualCapacity = dst.increaseCapacity(expectedCapacity);
        srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
    }

    if (srcStart != srcEnd) {
        final DefaultHandle[] srcElems = head.elements;
        final DefaultHandle[] dstElems = dst.elements;
        int newDstSize = dstSize;
        for (int i = srcStart; i < srcEnd; i++) {
            DefaultHandle<?> element = srcElems[i];
            // An unset recycleId (0) takes the value of lastRecycledId; a set one must match it.
            if (element.recycleId == 0) {
                element.recycleId = element.lastRecycledId;
            } else if (element.recycleId != element.lastRecycledId) {
                throw new IllegalStateException("recycled already");
            }
            // Clear the source slot whether or not the handle ends up in dst.
            srcElems[i] = null;

            if (dst.dropHandle(element)) {
                // Drop the object.
                continue;
            }
            // Rebind the handle to the destination stack and append it.
            element.stack = dst;
            dstElems[newDstSize ++] = element;
        }

        if (srcEnd == LINK_CAPACITY && head.next != null) {
            // Add capacity back as the Link is GCed.
            this.head.relink(head.next);
        }

        // Record how far this link has been read.
        head.readIndex = srcEnd;
        // Nothing was appended (every handle was dropped): report no progress.
        if (dst.size == newDstSize) {
            return false;
        }
        dst.size = newDstSize;
        return true;
    } else {
        // The destination stack is full already.
        return false;
    }
}
标签:other expec rect last next edr ace cst support
原文地址:https://www.cnblogs.com/allenwas3/p/12229532.html