码迷,mamicode.com
首页 > 其他好文 > 详细

Exchanger 源码分析

时间:2018-12-15 18:44:56      阅读:128      评论:0      收藏:0      [点我收藏+]

标签:current   methods   false   rom   eset   spin   thread   creates   process   

Exchanger

此类提供对外的操作是同步的;
用于成对出现的线程之间交换数据【主场景】;
可以视作双向的同步队列;
可应用于基因算法、流水线设计、数据校对等场景

创建实例

    /**
     * arena 数组中两个已使用的 slot 之间的索引距离,将它们分开以避免错误的共享
     */
    private static final int ASHIFT = 5;

    /**
     * arena 数组的最大索引值,最大 size 值为 0xff+1
     */
    private static final int MMASK = 0xff;

    /**
     * Unit for sequence/version bits of bound field. Each successful
     * change to the bound also adds SEQ.
     */
    private static final int SEQ = MMASK + 1;

    /** JVM 的 CPU 核数,用于自旋和扩容控制 */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     *  arena 的最大索引值:原则上可以让所有线程不发生竞争
     */
    static final int FULL = NCPU >= MMASK << 1 ? MMASK : NCPU >>> 1;

    /**
     * 当前线程阻塞等待匹配节点前的自旋次数,CPU==1 时不进行自旋
     */
    private static final int SPINS = 1 << 10;

    /**
     * Value representing null arguments/returns from public methods.
     * 旧 API 不支持 null 值所以需要适配。
     */
    private static final Object NULL_ITEM = new Object();

    /**
     *  交换超时的返回值对象
     */
    private static final Object TIMED_OUT = new Object();

    @jdk.internal.vm.annotation.Contended static final class Node {
        // Arena index
        int index;          
        // Last recorded value of Exchanger.bound
        int bound;              
        // Number of CAS failures at current bound
        int collides;  
        // 自旋的伪随机数
        int hash;               
        // 线程的当前数据对象
        Object item;           
        // 匹配线程的数据对象
        volatile Object match; 
        // 驻留阻塞线程
        volatile Thread parked; 
    }

    /** 参与者 */
    static final class Participant extends ThreadLocal<Node> {
        @Override
        public Node initialValue() { return new Node(); }
    }

    /**
     *  每个线程的状态
     */
    private final Participant participant;

    /**
     *  消除数组,只在出现竞争时初始化。
     */
    private volatile Node[] arena;

    /**
     *  未发生竞争时使用的 slot
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR‘ed with SEQ
     * number in high bits, incremented on each update.  The initial
     * update from 0 to SEQ is used to ensure that the arena array is
     * constructed only once.
     */
    private volatile int bound;

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        participant = new Participant();
    }

线程间交换数据

    /**
     *  阻塞等待其他线程到达交换点后执行数据交换,支持中断
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        Object v;
        // 将目标对象 v 进行编码
        final Object item = x == null ? NULL_ITEM : x; // translate null args
        /**
         * 1)arena==null,表示未出现线程竞争,则使用 slot 进行数据交换
         * 2)线程已经中断,则抛出 InterruptedException
         * 3)arena!=null,则使用 arena 中的 slot 进行数据交换
         */
        if ((arena != null ||
                (v = slotExchange(item, false, 0L)) == null) &&
                (Thread.interrupted() || // disambiguates null return
                        (v = arenaExchange(item, false, 0L)) == null)) {
            throw new InterruptedException();
        }
        // 解码目标对象
        return v == NULL_ITEM ? null : (V)v;
    }

    /**
     *  未出现竞争时的数据交换方式
     * @param item  需要交换的目标对象
     * @param timed 是否是超时模式
     * @param ns    超时的纳秒数
     * @return 
     *  1)目标线程的数据对象
     *  2)null slot 交换出现竞争、线程被中断
     *  3)TIMED_OUT 交换超时
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        // 读取参与者节点
        final Node p = participant.get();
        // 读取当前线程
        final Thread t = Thread.currentThread();
        // 线程被设置了中断标识,则返回 null
        if (t.isInterrupted()) {
            return null;
        }

        for (Node q;;) {
            // 1)已经有线程在阻塞等待交换数据
            if ((q = slot) != null) {
                // 将 slot 置为 null
                if (SLOT.compareAndSet(this, q, null)) {
                    // 读取目标对象
                    final Object v = q.item;
                    // 写入交换对象
                    q.match = item;
                    // 如果线程在阻塞等待
                    final Thread w = q.parked;
                    if (w != null) {
                        // 则唤醒交换线程
                        LockSupport.unpark(w);
                    }
                    // 返回交换到的对象
                    return v;
                }
                /**
                 * NCPU > 1 多核 CPU 才会启用竞技场 && 
                 * 设置最大有效的 arena 索引值
                 */
                if (NCPU > 1 && bound == 0 &&
                        BOUND.compareAndSet(this, 0, SEQ)) {
                    // 创建竞技场
                    arena = new Node[FULL + 2 << ASHIFT];
                }
            }
            // 2)启用了 arena
            else if (arena != null) {
                return null; // caller must reroute to arenaExchange
            // 3)slot 为空 && 未启用 arena
            } else {
                // 写入交换数据
                p.item = item;
                // 将 Node 写入 slot,成功则退出循环
                if (SLOT.compareAndSet(this, null, p)) {
                    break;
                }
                // 出现竞争,则重试
                p.item = null;
            }
        }

        // 等待释放
        int h = p.hash;
        // 计算截止时间
        final long end = timed ? System.nanoTime() + ns : 0L;
        // 计算自旋次数,多核 CPU 为 1024
        int spins = NCPU > 1 ? SPINS : 1;
        Object v;
        // 只要没有匹配的交换数据
        while ((v = p.match) == null) {
            // 1)自旋还未完成
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0) {
                    h = SPINS | (int)t.getId();
                } else if (h < 0 && (--spins & (SPINS >>> 1) - 1) == 0) {
                    Thread.yield();
                }
            }
            // 2)slot 已经更新
            else if (slot != p) {
                spins = SPINS;
            /**
             * 3)线程未中断 && 未启用竞技场 && 不是超时模式;
             *  如果是超时模式,则计算剩余时间,当前还未超时  
             */
            } else if (!t.isInterrupted() && arena == null &&
                    (!timed || (ns = end - System.nanoTime()) > 0L)) {
                // 写入驻留线程
                p.parked = t;
                // 如果 slot 未更新,没有线程来进行数据交换
                if (slot == p) {
                    // 1)阻塞等待
                    if (ns == 0L) {
                        LockSupport.park(this);
                    // 2)超时阻塞等待 
                    } else {
                        LockSupport.parkNanos(this, ns);
                    }
                }
                // 线程释放后,清空 parked
                p.parked = null;
            }
            // 如果线程被中断或已经超时,则将 slot 清空
            else if (SLOT.compareAndSet(this, p, null)) {
                // 如果是超时,则返回 TIMED_OUT;线程中断,则返回 null
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        // 清空 match
        MATCH.setRelease(p, null);
        // 清空 item
        p.item = null;
        p.hash = h;
        // 返回交换到的数据对象
        return v;
    }

    /**
     * Exchange function when arenas enabled. See above for explanation.
     *
     * @param item  需要交换的目标对象
     * @param timed 是否是超时模式
     * @param ns    超时的纳秒数
     * @return
     *  1)目标线程的数据对象
     *  2)null 线程被中断
     *  3)TIMED_OUT 交换超时
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        // 读取 arena
        final Node[] a = arena;
        // 读取数组长度
        final int alen = a.length;
        // 读取当前线程的参与者,初始值为 0
        final Node p = participant.get();
        for (int i = p.index;;) { // access slot at i
            int b, m, c;
            // 一般为 31
            int j = (i << ASHIFT) + (1 << ASHIFT) - 1;
            if (j < 0 || j >= alen) {
                j = alen - 1;
            }
            // 读取指定 slot 的 Node
            final Node q = (Node) AA.getAcquire(a, j);
            // 1)目标 slot 已经有线程在等待交换数据,则尝试清空 slot
            if (q != null && AA.compareAndSet(a, j, q, null)) {
                // 读取目标对象
                final Object v = q.item; // release
                // 写入交换对象
                q.match = item;
                final Thread w = q.parked;
                if (w != null) {
                    // 唤醒驻留线程
                    LockSupport.unpark(w);
                }
                // 返回交换到的值
                return v;
            // 2)目标索引 i 在有效索引范围内 && slot 为 null 
            } else if (i <= (m = (b = bound) & MMASK) && q == null) {
                // 写入 item
                p.item = item; // offer
                // 写入节点
                if (AA.compareAndSet(a, j, null, p)) {
                    // 计算截止时间
                    final long end = timed && m == 0 ? System.nanoTime() + ns : 0L;
                    // 读取当前线程
                    final Thread t = Thread.currentThread(); // wait
                    // 读取自旋次数 1024
                    for (int h = p.hash, spins = SPINS;;) {
                        // 读取匹配数据
                        final Object v = p.match;
                        // 1)已经有线程将交换数据写入
                        if (v != null) {
                            MATCH.setRelease(p, null);
                            p.item = null; // clear for next use
                            p.hash = h;
                            return v;
                        // 2)自旋还未结束 
                        } else if (spins > 0) {
                            h ^= h << 1;
                            h ^= h >>> 3;
                            h ^= h << 10; // xorshift
                            if (h == 0) {
                                h = SPINS | (int) t.getId();
                            } else if (h < 0 && // approx 50% true
                                    (--spins & (SPINS >>> 1) - 1) == 0) {
                                Thread.yield(); // two yields per wait
                            }
                        // 3)slot 已经更新  
                        } else if (AA.getAcquire(a, j) != p) {
                            spins = SPINS; // releaser hasn‘t set match yet
                        // 4) 线程未中断、未超时
                        } else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) {
                            // 写入驻留线程
                            p.parked = t; // minimize window
                            // 如果 slot 未更新,则线程被阻塞
                            if (AA.getAcquire(a, j) == p) {
                                if (ns == 0L) {
                                    LockSupport.park(this);
                                } else {
                                    LockSupport.parkNanos(this, ns);
                                }
                            }
                            p.parked = null;
                        // 5)slot 未更新 && 线程超时或中断,则清空 slot   
                        } else if (AA.getAcquire(a, j) == p && AA.compareAndSet(a, j, p, null)) {
                            if (m != 0) {
                                BOUND.compareAndSet(this, b, b + SEQ - 1);
                            }
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1; // descend
                            // 线程被中断
                            if (Thread.interrupted()) {
                                return null;
                            }
                            // 线程超时
                            if (timed && m == 0 && ns <= 0L) {
                                return TIMED_OUT;
                            }
                            break; // expired; restart
                        }
                    }
                // 2)写入 slot 出现竞争   
                } else {
                    p.item = null; // clear offer
                }
            } else {
                if (p.bound != b) { // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = i != m || m == 0 ? m : m - 1;
                } else if ((c = p.collides) < m || m == FULL || !BOUND.compareAndSet(this, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = i == 0 ? m : i - 1; // cyclically traverse
                } else {
                    i = m + 1; // grow
                }
                p.index = i;
            }
        }
    }

Exchanger 源码分析

标签:current   methods   false   rom   eset   spin   thread   creates   process   

原文地址:https://www.cnblogs.com/zhuxudong/p/10124099.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!