Netty's Recycler

Date: 2020-01-22 22:02:32

Netty implements its object pool with Recycler.

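Every thread has its own ThreadLocalMap. A ThreadLocalMap is essentially a hash table that resolves slot collisions by linear probing (on a collision it moves on to index + 1). The key is the ThreadLocal variable, and the value is an object of the type given in the angle brackets. Netty makes heavy use of thread-local storage (through its own FastThreadLocal variant, as in the code below) in order to reduce lock contention between threads.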
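To make the "one value per thread" idea concrete, here is a minimal sketch that uses the JDK's plain `ThreadLocal` rather than Netty's `FastThreadLocal`. The class name `PerThreadPoolDemo` and the helper `poolSeenByNewThread` are made up for illustration; the point is only that each thread gets its own container, so no lock is needed to touch it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicReference;

final class PerThreadPoolDemo {
    // Each thread lazily receives its own deque the first time it calls POOL.get().
    static final ThreadLocal<Deque<StringBuilder>> POOL = ThreadLocal.withInitial(ArrayDeque::new);

    /** Starts a fresh thread and returns the deque that thread sees (hypothetical helper). */
    static Deque<StringBuilder> poolSeenByNewThread() {
        AtomicReference<Deque<StringBuilder>> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(POOL.get()));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Deque<StringBuilder> mine = POOL.get();
        System.out.println(mine == POOL.get());            // true: same thread, same deque
        System.out.println(mine == poolSeenByNewThread()); // false: another thread, another deque
    }
}
```

Because the deque is never shared, the owning thread can push and pop without synchronization, which is the property Recycler relies on for its per-thread Stack.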

// One Stack per thread, used as that thread's object pool
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
    @Override
    protected Stack<T> initialValue() {
        return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                interval, maxDelayedQueuesPerThread);
    }

    @Override
    protected void onRemoval(Stack<T> value) {
        // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
        if (value.threadRef.get() == Thread.currentThread()) {
           if (DELAYED_RECYCLED.isSet()) {
               DELAYED_RECYCLED.get().remove(value);
           }
        }
    }
};
// Each thread has a Map that stores (another thread's Stack -> WeakOrderQueue) pairs
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
    @Override
    protected Map<Stack<?>, WeakOrderQueue> initialValue() {
        return new WeakHashMap<Stack<?>, WeakOrderQueue>();
    }
};

DefaultHandle wraps the pooled object and is the element stored directly in the Stack.

private static final class DefaultHandle<T> implements Handle<T> {
    Stack<?> stack;
    Object value;

    DefaultHandle(Stack<?> stack) {
        this.stack = stack;
    }
}

Stack is the most direct form of the object pool: an array-backed stack that holds DefaultHandle instances.

private static final class Stack<T> {
    // A stack belongs to exactly one thread, which it holds through a weak reference
    final WeakReference<Thread> threadRef;

    DefaultHandle<?>[] elements;

}
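The excerpt above omits the stack's bookkeeping, so here is a small sketch of what an array-backed stack of handles can look like. It is not Netty's code: `HandleStackSketch`, its nested `Handle`, the initial array size of 2 and the doubling growth policy are all assumptions chosen to keep the example short.

```java
import java.util.Arrays;

final class HandleStackSketch {
    /** Hypothetical stand-in for DefaultHandle: just wraps a pooled value. */
    static final class Handle {
        Object value;
    }

    private Handle[] elements = new Handle[2]; // deliberately small so growth is visible
    private int size;
    private final int maxCapacity;

    HandleStackSketch(int maxCapacity) {
        this.maxCapacity = maxCapacity;
    }

    /** Pushes a handle; returns false (the handle is dropped) once maxCapacity is reached. */
    boolean push(Handle handle) {
        if (size >= maxCapacity) {
            return false;
        }
        if (size == elements.length) {
            // Double the backing array, but never beyond maxCapacity.
            elements = Arrays.copyOf(elements, Math.min(size * 2, maxCapacity));
        }
        elements[size++] = handle;
        return true;
    }

    /** Pops the most recently pushed handle, or returns null when the stack is empty. */
    Handle pop() {
        if (size == 0) {
            return null;
        }
        Handle handle = elements[--size];
        elements[size] = null; // clear the slot so the array does not retain the handle
        return handle;
    }

    int size() {
        return size;
    }
}
```

Capping the stack keeps a burst of recycled objects from pinning memory forever; anything beyond the cap is simply left to the garbage collector.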

To obtain an object, call get():

public final T get() {
    // Object pooling is disabled
    if (maxCapacityPerThread == 0) {
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    // Get the current thread's Stack
    Stack<T> stack = threadLocal.get();
    // Pop an element off the top of the stack
    DefaultHandle<T> handle = stack.pop();
    if (handle == null) {
        // The stack is empty, so a new object has to be created
        handle = stack.newHandle();
        handle.value = newObject(handle);
    }
    return (T) handle.value;
}
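The pop-or-create pattern in get() can be tried out with a stripped-down pool. The sketch below is an assumption-laden imitation, not Netty's Recycler: `MiniPool`, its inner `Handle`, and `PooledBuffer` are hypothetical names, the pool uses the JDK's `ThreadLocal`, and a handle may only be recycled on the thread that obtained the object.

```java
import java.util.ArrayDeque;
import java.util.function.Function;

final class MiniPool<T> {
    /** Hypothetical handle: remembers the pooled value and how to hand it back. */
    final class Handle {
        T value;

        void recycle() {
            stack.get().push(this); // same-thread return only in this sketch
        }
    }

    private final ThreadLocal<ArrayDeque<Handle>> stack = ThreadLocal.withInitial(ArrayDeque::new);
    private final Function<Handle, T> factory; // plays the role of newObject(handle)

    MiniPool(Function<Handle, T> factory) {
        this.factory = factory;
    }

    T get() {
        Handle handle = stack.get().poll(); // null when the stack is empty
        if (handle == null) {
            handle = new Handle();
            handle.value = factory.apply(handle);
        }
        return handle.value;
    }
}

/** Hypothetical pooled type: keeps its handle so it can return itself to the pool. */
final class PooledBuffer {
    final StringBuilder text = new StringBuilder();
    private final MiniPool<PooledBuffer>.Handle handle;

    PooledBuffer(MiniPool<PooledBuffer>.Handle handle) {
        this.handle = handle;
    }

    void release() {
        text.setLength(0); // reset state before the object is reused
        handle.recycle();
    }
}
```

With `new MiniPool<PooledBuffer>(PooledBuffer::new)`, calling `get()`, then `release()`, then `get()` again on the same thread hands back the same instance, while a `get()` on an empty stack creates a fresh one.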

Once the object is no longer needed, return it through its handle:

public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }

    // The stack this DefaultHandle is bound to
    Stack<?> stack = this.stack;
    if (lastRecycledId != recycleId || stack == null) {
        throw new IllegalStateException("recycled already");
    }

    stack.push(this);
}
void push(DefaultHandle<?> item) {
    Thread currentThread = Thread.currentThread();
    // The current thread owns this stack
    if (threadRef.get() == currentThread) {
        // Push directly onto the stack
        pushNow(item);
    } else {
        // The object being returned was not obtained by the current thread
        pushLater(item, currentThread);
    }
}
private void pushLater(DefaultHandle<?> item, Thread thread) {
    if (maxDelayedQueues == 0) {
        // We don't support recycling across threads and should just drop the item on the floor.
        return;
    }

    // The current thread's Map
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    WeakOrderQueue queue = delayedRecycled.get(this);
    if (queue == null) {
        if (delayedRecycled.size() >= maxDelayedQueues) {
            // Add a dummy queue so we know we should drop the object
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        // Create a WeakOrderQueue
        if ((queue = newWeakOrderQueue(thread)) == null) {
            // drop object
            return;
        }
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        // drop object
        return;
    }
    
    // Put the DefaultHandle into the queue
    queue.add(item);
}
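The interesting part of pushLater is the bounded map with a sentinel: once a thread already tracks maxDelayedQueues foreign stacks, it records a dummy entry for any further stack and drops everything destined for it. A rough sketch of just that policy follows. `DelayedRecycleSketch` is a hypothetical class, one instance stands in for one thread's map, and a `ConcurrentLinkedQueue` replaces WeakOrderQueue.

```java
import java.util.Map;
import java.util.Queue;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class DelayedRecycleSketch {
    // Sentinel meaning "drop everything for this owner"; compared by identity only.
    static final Queue<Object> DUMMY = new ConcurrentLinkedQueue<>();

    private final int maxDelayedQueues;
    // Weak keys: once an owner stack is unreachable, its entry can disappear.
    private final Map<Object, Queue<Object>> delayed = new WeakHashMap<>();

    DelayedRecycleSketch(int maxDelayedQueues) {
        this.maxDelayedQueues = maxDelayedQueues;
    }

    /** Returns true if the item was queued for ownerStack, false if it was dropped. */
    boolean pushLater(Object ownerStack, Object item) {
        if (maxDelayedQueues == 0) {
            return false; // cross-thread recycling is disabled
        }
        Queue<Object> queue = delayed.get(ownerStack);
        if (queue == null) {
            if (delayed.size() >= maxDelayedQueues) {
                // Remember the decision so later calls for this owner drop quickly.
                delayed.put(ownerStack, DUMMY);
                return false;
            }
            queue = new ConcurrentLinkedQueue<>();
            delayed.put(ownerStack, queue);
        } else if (queue == DUMMY) {
            return false;
        }
        queue.add(item);
        return true;
    }
}
```

The sentinel trades one map entry for a cheap fast path: a thread that returns many objects to a stack it has no room to track does not repeat the size check and allocation attempt every time.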

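So when do the elements that were returned to a WeakOrderQueue get used? When the Stack is empty, pop() calls scavenge() to pull them in: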

DefaultHandle<T> pop() {
    int size = this.size;
    if (size == 0) {
        if (!scavenge()) {
            return null;
        }
        size = this.size;
        if (size <= 0) {
            // double check, avoid races
            return null;
        }
    }
    size --;
    DefaultHandle ret = elements[size];
    elements[size] = null;
    // As we already set the element[size] to null we also need to store the updated size before we do
    // any validation. Otherwise we may see a null value when later try to pop again without a new element
    // added before.
    this.size = size;

    if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
    }
    ret.recycleId = 0;
    ret.lastRecycledId = 0;
    return ret;
}
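The interplay between push and pop can be imitated with a much simpler structure. In the sketch below, which is an assumption and not Netty's design, a single `ConcurrentLinkedQueue` stands in for all WeakOrderQueues; `ScavengingStack` and `pushFromOtherThread` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class ScavengingStack<T> {
    private final Thread owner = Thread.currentThread();
    private final ArrayDeque<T> local = new ArrayDeque<>();                  // owner thread only
    private final Queue<T> returnedByOthers = new ConcurrentLinkedQueue<>(); // any thread

    void push(T item) {
        if (Thread.currentThread() == owner) {
            local.push(item);           // fast path, no synchronization
        } else {
            returnedByOthers.add(item); // deferred until the owner scavenges
        }
    }

    /** Must be called by the owner thread. Returns null when nothing is pooled. */
    T pop() {
        if (local.isEmpty() && !scavenge()) {
            return null;
        }
        return local.poll();
    }

    /** Moves everything other threads returned onto the local stack. */
    private boolean scavenge() {
        boolean moved = false;
        for (T item; (item = returnedByOthers.poll()) != null; ) {
            local.push(item);
            moved = true;
        }
        return moved;
    }

    int localSize() {
        return local.size();
    }

    /** Hypothetical helper: pushes item from a freshly started thread and waits for it. */
    static <T> void pushFromOtherThread(ScavengingStack<T> stack, T item) {
        Thread t = new Thread(() -> stack.push(item));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Scavenging only when the local stack runs dry means the owner pays the cross-thread cost at the moment it would otherwise have to allocate a new object.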

scavenge() in turn ends up calling transfer() on the WeakOrderQueue, which moves its elements into the Stack:

boolean transfer(Stack<?> dst) {
    Link head = this.head.link;
    if (head == null) {
        return false;
    }

    if (head.readIndex == LINK_CAPACITY) {
        if (head.next == null) {
            return false;
        }
        head = head.next;
        this.head.relink(head);
    }

    final int srcStart = head.readIndex;
    int srcEnd = head.get();
    final int srcSize = srcEnd - srcStart;
    if (srcSize == 0) {
        return false;
    }

    final int dstSize = dst.size;
    final int expectedCapacity = dstSize + srcSize;

    if (expectedCapacity > dst.elements.length) {
        final int actualCapacity = dst.increaseCapacity(expectedCapacity);
        srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
    }

    if (srcStart != srcEnd) {
        final DefaultHandle[] srcElems = head.elements;
        final DefaultHandle[] dstElems = dst.elements;
        int newDstSize = dstSize;
        for (int i = srcStart; i < srcEnd; i++) {
            DefaultHandle<?> element = srcElems[i];
            if (element.recycleId == 0) {
                element.recycleId = element.lastRecycledId;
            } else if (element.recycleId != element.lastRecycledId) {
                throw new IllegalStateException("recycled already");
            }
            srcElems[i] = null;

            if (dst.dropHandle(element)) {
                // Drop the object.
                continue;
            }
            element.stack = dst;
            dstElems[newDstSize ++] = element;
        }

        if (srcEnd == LINK_CAPACITY && head.next != null) {
            // Add capacity back as the Link is GCed.
            this.head.relink(head.next);
        }

        head.readIndex = srcEnd;
        if (dst.size == newDstSize) {
            return false;
        }
        dst.size = newDstSize;
        return true;
    } else {
        // The destination stack is full already.
        return false;
    }
}
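Stripped of the recycleId checks and the Link bookkeeping, the core of transfer is a bounded copy: take the unread range of the source and move as much of it as the destination can hold. The following sketch shows only that step. `TransferSketch` is a hypothetical helper, and unlike Netty it treats the destination array as fixed-size instead of growing it.

```java
final class TransferSketch {
    /**
     * Moves elements from src[readIndex, writeIndex) into dst starting at dstSize,
     * limited by the free space left in dst. Returns how many elements were moved;
     * the caller is expected to advance readIndex and dstSize by that amount.
     */
    static int transfer(Object[] src, int readIndex, int writeIndex, Object[] dst, int dstSize) {
        int srcSize = writeIndex - readIndex;
        int free = dst.length - dstSize;
        int count = Math.min(srcSize, free);
        for (int i = 0; i < count; i++) {
            dst[dstSize + i] = src[readIndex + i];
            src[readIndex + i] = null; // the source must not keep the element alive
        }
        return count;
    }

    public static void main(String[] args) {
        Object[] src = {"a", "b", "c", "d"};
        Object[] dst = new Object[3];
        dst[0] = "z";
        // Unread range is [1, 4): three elements, but dst has room for only two.
        System.out.println(transfer(src, 1, 4, dst, 1)); // 2
    }
}
```

A return value of 0 corresponds to the two `return false` cases in the original method: either the source had nothing unread or the destination was already full.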

Original article: https://www.cnblogs.com/allenwas3/p/12229532.html
