Exchanger是双向的数据传输,2个线程在一个同步点,交换数据。先到的线程会等待第二个线程执行exchange
SynchronousQueue,是2个线程之间单向的数据传输,一个put,一个take。
先举个例子说明一下如何使用
public class ExchangerDemo { public static void main(String[] args) { Exchanger<List<Integer>> exchanger = new Exchanger<>(); new Consumer(exchanger).start(); //方便调试,让consumer先执行exchange try { Thread.sleep(1000 * 5); } catch (InterruptedException e) { e.printStackTrace(); } new Producer(exchanger).start(); } static class Consumer extends Thread { List<Integer> list = new ArrayList<>(); Exchanger<List<Integer>> exchanger = null; public Consumer(Exchanger<List<Integer>> exchanger) { super(); this.exchanger = exchanger; } @Override public void run() { for (int i = 0; i < 1; i++) { try { list = exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print(list.get(0) + ", "); System.out.print(list.get(1) + ", "); System.out.print(list.get(2) + ", "); System.out.print(list.get(3) + ", "); System.out.println(list.get(4) + ", "); } } } static class Producer extends Thread { List<Integer> list = new ArrayList<>(); Exchanger<List<Integer>> exchanger = null; public Producer(Exchanger<List<Integer>> exchanger) { super(); this.exchanger = exchanger; } @Override public void run() { Random rand = new Random(); for (int i = 0; i < 1; i++) { list.clear(); list.add(rand.nextInt(10000)); list.add(rand.nextInt(10000)); list.add(rand.nextInt(10000)); list.add(rand.nextInt(10000)); list.add(rand.nextInt(10000)); try { list = exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
再看一下内部结构
private static final class Node extends AtomicReference<Object> { /** 创建这个节点的线程提供的用于交换的数据。 */ public final Object item; /** 等待唤醒的线程 */ public volatile Thread waiter; /** * Creates node with given item and empty hole. * @param item the item */ public Node(Object item) { this.item = item; } } /** * 一个Slot就是一对线程交换数据的地方。 * 这里对Slot做了缓存行填充,能够避免伪共享问题。 * 虽然填充导致浪费了一些空间,但Slot是按需创建,一般没什么问题。 */ private static final class Slot extends AtomicReference<Object> { // Improve likelihood of isolation on <= 64 byte cache lines long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe; } /** * Slot数组,在需要时才进行初始化。 * 用volatile修饰,因为这样可以安全的使用双重锁检测方式构建。 */ private volatile Slot[] arena = new Slot[CAPACITY]; /** * arena(Slot数组)的容量。设置这个值用来避免竞争。 */ private static final int CAPACITY = 32; /** * 正在使用的slot下标的最大值。当一个线程经历了多次CAS竞争后, * 这个值会递增;当一个线程自旋等待超时后,这个值会递减。 */ private final AtomicInteger max = new AtomicInteger();
关键技术点1:CacheLine填充
交换数据的场所就是Slot,每个要进行数据交换的线程在内部会用一个Node来表示。Slot其实是一个AtomicReference
关键技术点2:锁分离
同ConcurrentHashMap类型,Exchange没有只定义一个slot,而是定义了一个slot的数组。这样在多线程调用exchange的时候,可以各自在不同的slot里面进行匹配。
exchange的基本思路如下:
(1)根据每个线程的thread id, hash计算出自己所在的slot index;
(2)如果运气好,这个slot被人占着(slot里面有node),并且有人正在等待交换,那就和它进行交换;
(3)slot为空的(slot里面没有node),自己占着,等人交换。没人交换,向前挪个位置,把当前slot里面内容取消,index减半,再看有没有交换;
(4)挪到0这个位置,还没有人交互,那就阻塞,一直等着。别的线程,也会一直挪动,直到0这个位置。
所以0这个位置,是一个交易的“终结点”位置!别的位置上找不到人交易,最后都会到0这个位置。
/** * 等待其他线程到达交换点,然后与其进行数据交换。 * * 如果其他线程到来,那么交换数据,返回。 * * 如果其他线程未到来,那么当前线程等待,知道如下情况发生: * 1.有其他线程来进行数据交换。 * 2.当前线程被中断。 */ public V exchange(V x) throws InterruptedException { if (!Thread.interrupted()) {//检测当前线程是否被中断。 //进行数据交换。 Object v = doExchange(x == null? NULL_ITEM : x, false, 0); if (v == NULL_ITEM) return null; //检测结果是否为null。 if (v != CANCEL) //检测是否被取消。 return (V)v; Thread.interrupted(); // 清除中断标记。 } throw new InterruptedException(); } /** * 等待其他线程到达交换点,然后与其进行数据交换。 * * 如果其他线程到来,那么交换数据,返回。 * * 如果其他线程未到来,那么当前线程等待,知道如下情况发生: * 1.有其他线程来进行数据交换。 * 2.当前线程被中断。 * 3.超时。 */ public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { if (!Thread.interrupted()) { Object v = doExchange(x == null? NULL_ITEM : x, true, unit.toNanos(timeout)); if (v == NULL_ITEM) return null; if (v != CANCEL) return (V)v; if (!Thread.interrupted()) throw new TimeoutException(); } throw new InterruptedException(); }
上面的方法都调用了doExchange方法,主要逻辑在这个方法里,分析下这个方法:
private Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); int index = hashIndex(); //根据thread id计算出自己要去的那个交易位置(slot) int fails = 0; for (;;) { Object y; Slot slot = arena[index]; if (slot == null) createSlot(index); //slot = null,创建一个slot,然后会回到for循环,再次开始 else if ((y = slot.get()) != null && //slot里面有人等着(有Node),则尝试和其交换 slot.compareAndSet(y, null)) { //关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点 Node you = (Node)y; if (you.compareAndSet(null, item)) {//把Node里面的东西,换成自己的 LockSupport.unpark(you.waiter); //唤醒对方 return you.item; //自己把对方的东西拿走 } //关键点2:如果你运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始 } else if (y == null && //slot里面为空(没有Node),则自己把位置占住 slot.compareAndSet(null, me)) { if (index == 0) //如果是0这个位置,自己阻塞,等待别人来交换 return timed? awaitNanos(me, slot, nanos): await(me, slot); Object v = spinWait(me, slot); //不是0这个位置,自旋等待 if (v != CANCEL) //自旋等待的时候,运气好,有人来交换了,返回 return v; me = new Node(item); //自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环 int m = max.get(); if (m > (index >>>= 1)) max.compareAndSet(m, m - 1); } else if (++fails > 1) { //失败 case1: slot有人,要交互,但被人家抢了 case2: slot没人,自己要占位置,又被人家抢了 int m = max.get(); if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; //3次匹配失败,把index扩大,再次开始for循环 else if (--index < 0) index = m; } } }
/** * 在下标为0的Slot上等待获取其他线程填充的值。 * 如果在Slot被填充之前超时或者被中断,那么操作失败。 */ private Object awaitNanos(Node node, Slot slot, long nanos) { int spins = TIMED_SPINS; long lastTime = 0; Thread w = null; for (;;) { Object v = node.get(); if (v != null) //如果已经被其他线程填充了值,那么返回这个值。 return v; long now = System.nanoTime(); if (w == null) w = Thread.currentThread(); else nanos -= now - lastTime; lastTime = now; if (nanos > 0) { if (spins > 0) --spins; //先自旋几次。 else if (node.waiter == null) node.waiter = w; //自旋阶段完毕后,将当前线程设置到node的waiter域。 else if (w.isInterrupted()) tryCancel(node, slot); //如果当前线程被中断,尝试取消node。 else LockSupport.parkNanos(node, nanos); //阻塞给定的时间。 } else if (tryCancel(node, slot) && !w.isInterrupted()) //超时后,如果当前线程没有被中断,那么从Slot数组的其他位置看看有没有等待交换数据的节点 return scanOnTimeout(node); } }
awaitNanos中的自旋次数为TIMED_SPINS,这里说明一下自旋次数:
/** * 单核处理器下这个自旋次数为0 * 多核情况下,这个值设置为大多数系统中上下文切换时间的平均值。 */ private static final int SPINS = (NCPU == 1) ? 0 : 2000; /** * 在有超时情况下阻塞等待之前自旋的次数。. * 超时等待的自旋次数之所以更少,是因为检测时间也需要耗费时间。 * 这里的值是一个经验值。 */ private static final int TIMED_SPINS = SPINS / 20;
最后看一下arena(Slot数组),默认的容量和实际使用的下标最大值:
private static final int CAPACITY = 32; /** * The value of "max" that will hold all threads without * contention. When this value is less than CAPACITY, some * otherwise wasted expansion can be avoided. */ private static final int FULL = Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
前面说过arena容量默认为32,目的是为了减少线程的竞争,但实际上对arena的使用不会超过FULL这个值(避免一些空间浪费)。这个值取的是32(默认CAPACITY)和CPU核心数量的一半,这两个数的较小值在减1的数和0的较大值.... 也就是说,如果CPU核很多的情况下,这个值最大也就是31,;如果是单核或者双核CPU,这个值就是0,也就是说只能用arena[0]。这也是为什么前面的hashIndex方法里面会做的(近似)取模操作比较复杂,因为实际的能使用的Slot数组范围可能不是2的幂。
出处:
http://blog.csdn.net/chunlongyu/article/details/52504895
http://brokendreams.iteye.com/blog/2253956