标签:关联 大量 上进 div 大致 依次 values abort 定义函数
一般运用的场景是一组线程希望同一时候到达某个运行点后(先到达的会被堵塞),运行一个指定任务,然后这些线程才被唤醒继续运行其他任务。
Phaser对照起CyclicBarrier。不仅它是能够反复同步,而且parties数是能够动态注冊的,另外还提供了非堵塞的arrive方法表示先到达阶段等,大大提高了同步模型的灵活性,当然了,实现也会相对复杂。
阶段号从0開始。当全部parties都到达阶段的时候就会加一。直到Integer.MAX_VALUE后返回0。阶段号的使用同意在到达某个阶段以及在等待其他parites的时候,通过下面两类能够被不论什么注冊过的party调用的方法。运行单独的操作:
这些方法不会堵塞。但会返回一个关联的到达阶段号(arrival phase number)。也就是,phaser在到达以后所用的阶段号。
当最后的party到达一个给定的阶段,就能够运行可选的操作,而且阶段号自加一。
这些操作都会被触发阶段添加的party运行。而且会被可重写方法onAdvance(同一时候管理Phaser的终结)安排管理。重写onAdvance方法比起CyclicBarrier提供的栅栏操作非常相似。但更加灵活。
当然也有可中断和超时版本号。可是当任务等待发生中断或者超时遇到的异常也不会改变phaser的状态。假设必要,你能够在这些异常的处理器里运行关联的恢复操作,通常是在调用forceTermination之后恢复。Phaser可能也会被运行在ForkJoinPool中的任务使用,这样当其他线程在等待phaser添加被堵塞的时候,就能够确保有效平行地运行任务。
终结(Termination)
phaser可能进入一个终结状态。能够通过isTerminated来检查。当终结的时候。全部的同步方法都不会在等待下一个阶段而直接返回。返回一个负值来表示该状态。类似地,在终结的时候尝试注冊没有不论什么效果。
当onAdvance调用返回true的时候就会触发终结。
onAdvance默认实现为当一个反注冊导致注冊parties数降为0的时候返回true。当phser要控制操作在一个固定得迭代次数时。就能够非常方便地重写这种方法,当当前阶段号到达阀值得时候就返回true导致终结。forceTermination方法也时还有一个能够突然释放等待线程而且同意它们终结。
堆叠(Tiering)
Phaser能够被堆叠在一起(也就是说,以树形结构构造)来减少竞争。Phaser的parties数非常大的时候。以一组子phasers共享一个公共父亲能够减轻严重的同步竞争的成本。这样做能够大大提供吞吐量,但同一时候也会导致每一个操作的更高的成本。
在一棵堆叠的phaser树中,子phaser在父亲上的注冊和反注冊都会被自己主动管理。当子phaser的注冊parties树为非0的时候。子phaser就会注冊到父亲上。
当因为arriveAndDeregister的调用使注冊的parties数变为0时,子phaser就会从父亲中反注冊。这样就算父phaser的全部parties都到达了阶段,也必须等待子phaser的全部parties都到达了阶段并显式调用父phaser的awaitAdvance才算到达新的阶段。反之亦然。这样父phaser或者子phaser里注冊过的全部parties就能够一起互相等待到新的阶段。
另外,在这个堆叠结构的实现里,能够确保root结点必定是最先更新阶段号。然后才到其子结点,逐渐传递下去。
+------+ +------+ +------+ | root | <-- |parent| <-- | this | +------+ +------+ +------+ parties:3+1 parties:3+1 parties:3如上图所看到的,假设parties数多的时候,能够依据堆叠成为一颗树。这里假设root和parent和this都各初始化3个parties数。然后假设当前结点this有注冊parties数,则会在parent上注冊一个parties,因此其实root和parent都注冊了4个parties数。这样,假设this结点的3个parties数都到达了。就会调用parent的arrive,把parties数减去一,然后parent等待自己3个parties数都到达,就会调用root来减去一,这样root的3个parties数都到达就会一同释放全部等待结点,就实现了整棵树parties之间同步等待的功能。另外这个结构也非常easy看到root结点是最快进行阶段增长的。
这样做最大的优点就是降低对同一个state变量的CAS竞争带来的性能下降。只是同一时候每一个同步操作也会添加对应的负担(每次获取状态都要和root进行阶段同步),所以一般在高并发下造成的性能下降才考虑。
监控(Monitoring)
同步方法仅仅能被注冊的parties调用时,phaser的当前状态能够被不论什么调用者监控。在不论什么时刻。有getRegisteredParties总共的parties。当中,有getArrivedParties个parites到达getPhase的当前阶段。当剩下getUnarrivedParties个parties到达。phase添加。这些方法的返回值可能反映短暂的状态,因此一般在同步控制中不太实用。toString方法以一种能够方便信息监控的格式返回这些状态的快照。
Phaser的详细使用方法能够參考样例:这里private volatile long state; private static final int MAX_PARTIES = 0xffff; private static final int MAX_PHASE = Integer.MAX_VALUE; private static final int PARTIES_SHIFT = 16; private static final int PHASE_SHIFT = 32; private static final int UNARRIVED_MASK = 0xffff; // to mask ints private static final long PARTIES_MASK = 0xffff0000L; // to mask longs private static final long COUNTS_MASK = 0xffffffffL; private static final long TERMINATION_BIT = 1L << 63; // some special values private static final int ONE_ARRIVAL = 1; private static final int ONE_PARTY = 1 << PARTIES_SHIFT; private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY; private static final int EMPTY = 1; //内部状态辅助方法 private static int unarrivedOf(long s) { int counts = (int)s; return (counts == EMPTY) ?state变量为long类型,长度为64位。当中:0 : (counts & UNARRIVED_MASK); } private static int partiesOf(long s) { return (int)s >>> PARTIES_SHIFT; } private static int phaseOf(long s) { return (int)(s >>> PHASE_SHIFT); } private static int arrivedOf(long s) { int counts = (int)s; return (counts == EMPTY) ? 0 : (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK); }
public int register() { return doRegister(1); } public int bulkRegister(int parties) { if (parties < 0) throw new IllegalArgumentException(); if (parties == 0) return getPhase(); return doRegister(parties); }两者实现都非常easy,bulkRegister方法中加入了对parties数的检查。两个方法都调用了doRegister方法实现。
private int doRegister(int registrations) { // adjustment to state long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { long s = (parent == null) ?doRegister方法做了下面事情:state : reconcileState(); int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) break; if (counts != EMPTY) { // not 1st registration if (parent == null || reconcileState() == s) { if (unarrived == 0) // wait out advance root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) break; } } else if (parent == null) { // 1st root registration long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } else { synchronized (this) { // 1st sub registration if (state == s) { // recheck under lock phase = parent.doRegister(1); if (phase < 0) break; // finish registration whenever parent registration // succeeded, even when racing with termination, // since these are part of the same "transaction". while (!UNSAFE.compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int)(root.state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } break; } } } } return phase; }
假设CAS成功就能够break推出循环同步。
其实,这里的unarrived推断相当重要。假设这样的情况,假设此时仅仅剩下最后一个未到达的parties数,而它刚好调用了arrive到达阶段,由于考虑到最后一个到达的party必须运行onAdvance函数,假设此时有新的注冊party,则要又一次等待party完毕,但已经造成错误的onAdvance调用,因此必需要在最后的party到达的时候禁止注冊。到达函数doArrive中有两次CAS的调用(具体实如今以下具体说明),第一次会把当前的未到达状态数变为0(这个CAS同一时候也是表示最后一个party已经到达)。第二次在调用onAdvance后。会又一次设置新的状态值。在doRegister函数里。unarrived的推断能够防止第一个CAS以后(即最后一个party到达)时会运行CAS来注冊新的party(由于此时unarrived==0。doRegister会进入堵塞等待doArrive完毕第二个CAS)。
获取synchronized锁后,再又一次检查一次状态是否发生变化,然后就调用parent.doRegister(1)向父phaser注冊自己。然后假设父phaser注冊成功(返回的phase>=0),就要利用自旋CAS把当前状态加入adjust,注意这个自旋就是强制当前状态值必需要成功注冊,这是由于这个和父phaser注冊都属于同一个原子事务,要在锁里面完毕。否则可能会状态不一致。
private long reconcileState() { final Phaser root = this.root; long s = state; if (root != this) { int phase, p; // CAS to root phase with current parties, tripping unarrived while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) && !UNSAFE.compareAndSwapLong (this, stateOffset, s, s = (((long)phase << PHASE_SHIFT) | ((phase < 0) ?reconcileState主要目的是和根结点保持阶段号同步。前面说过。假设出现堆叠情况,根结点是最先进行阶段号添加,尽管阶段号添加的操作会逐渐传递到子phaser,但某些同步操作,如动态注冊等,须要立即获悉整棵树的阶段号状态避免多余的CAS,因此就须要显式和根结点保持同步。reconcileState实现就是如此,假设root!=this。即发生堆叠。就利用自旋CAS把当前改动状态值,要注意的是因为阶段号添加。会同一时候会把未到达的parties数设置为原来的注冊parties数。主要实现都是移位和掩位操作,就不再赘述。(s & COUNTS_MASK) : (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : ((s & PARTIES_MASK) | p)))))) s = state; } return s; }
int awaitAdvance(int phase) int awaitAdvanceInterruptibly(int phase) throws InterruptedException int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException这三个方法的实现事实上都大同小异,主要是添加来对中断和超时的控制。详细实现例如以下:
public int awaitAdvance(int phase) { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) return root.internalAwaitAdvance(phase, null); return p; } public int awaitAdvanceInterruptibly(int phase) throws InterruptedException { //省略一样的代码 if (p == phase) { QNode node = new QNode(this, phase, true, false, 0L); p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) throw new InterruptedException(); } return p; } public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long nanos = unit.toNanos(timeout); //省略一样的代码 if (p == phase) { QNode node = new QNode(this, phase, true, true, nanos); p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) throw new InterruptedException(); else if (p == phase) throw new TimeoutException(); } return p; }三者实现大致结构都一样。首先获取当前状态值。假设堆叠则调用reconcileState获取根结点同步后的状态值。然后假设当前阶段号与请求等待的阶段号相等,则调用根结点的internalAwaitAdvance方法(根结点是最先进行阶段号增长)。
static final class QNode implements ForkJoinPool.ManagedBlocker { //省略其他成员变量以及构造函数 QNode next; public boolean isReleasable() { if (thread == null) return true; if (phaser.getPhase() != phase) { thread = null; return true; } if (Thread.interrupted()) wasInterrupted = true; if (wasInterrupted && interruptible) { thread = null; return true; } if (timed) { if (nanos > 0L) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } if (nanos <= 0L) { thread = null; return true; } } return false; } public boolean block() { if (isReleasable()) return true; else if (!timed) LockSupport.park(this); else if (nanos > 0) LockSupport.parkNanos(this, nanos); return isReleasable(); } }Phaser的等待队列使用的是Treiber无锁算法的栈操作。
样例实现能够參考这里。
首先能够注意到QNode类是实现了ForkJoinPool.ManagedBlocker接口,这个接口能够确保假设使用ForkJoinWorkerThread的时候就能够保持并发运行任务。
//NCPU是当前CPU数量 static final int SPINS_PER_ARRIVAL = (NCPU < 2) ?函数做了下面事情:1 : 1 << 8; private int internalAwaitAdvance(int phase, QNode node) { // assert root == this; releaseWaiters(phase-1); // ensure old queue clean boolean queued = false; // true when node is enqueued int lastUnarrived = 0; // to increase spins upon change int spins = SPINS_PER_ARRIVAL; long s; int p; while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { if (node == null) { // spinning in noninterruptible mode int unarrived = (int)s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; boolean interrupted = Thread.interrupted(); if (interrupted || --spins < 0) { // need node to record intr node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } } else if (node.isReleasable()) // done or aborted break; else if (!queued) { // push onto queue AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); if ((q == null || q.phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq queued = head.compareAndSet(q, node); } else { try { ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { node.wasInterrupted = true; } } } if (node != null) { if (node.thread != null) node.thread = null; // avoid need for unpark() if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) return abortWait(phase); // possibly clean up on abort } releaseWaiters(phase); return p; }
因为这两者都非常相似,因此一并剖析。
private void releaseWaiters(int phase) { QNode q; // first element of queue Thread t; // its thread AtomicReference<QNode> head = (phase & 1) == 0 ?releaseWaiters方法主要利用自旋从head结点起把队列里的结点出队,假设结点的thread引用为非null。则顺便唤醒。evenQ : oddQ; while ((q = head.get()) != null && q.phase != (int)(root.state >>> PHASE_SHIFT)) { if (head.compareAndSet(q, q.next) && (t = q.thread) != null) { q.thread = null; LockSupport.unpark(t); } } } private int abortWait(int phase) { AtomicReference<QNode> head = (phase & 1) == 0 ?
evenQ : oddQ; for (;;) { Thread t; QNode q = head.get(); int p = (int)(root.state >>> PHASE_SHIFT); if (q == null || ((t = q.thread) != null && q.phase == p)) return p; if (head.compareAndSet(q, q.next) && t != null) { q.thread = null; LockSupport.unpark(t); } } }
另外注意的是。每次出队前都会推断当前结点的阶段号是否与状态的阶段号相等,这里的状态阶段号用的是root.state,这是考虑到堆叠的情况。
int arrive() //一个party到达 int arriveAndDeregister() //一个party到达而且反注冊这个party这两个函数的实现都非常easy:
public int arrive() { return doArrive(ONE_ARRIVAL); } public int arriveAndDeregister() { return doArrive(ONE_DEREGISTER); }主要是调用doArrive实现,doArrive实现例如以下
private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ?doArrive看上去非常复杂,但事实上逻辑并不算太复杂。0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { if (unarrived == 1) { long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; UNSAFE.compareAndSwapLong(this, stateOffset, s, n); releaseWaiters(phase); } else if (nextUnarrived == 0) { // propagate deregistration phase = parent.doArrive(ONE_DEREGISTER); UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else phase = parent.doArrive(ONE_ARRIVAL); } return phase; } } }
public int arriveAndAwaitAdvance() { final Phaser root = this.root; for (;;) { long s = (root == this) ?函数的大致结构和doArrive几乎相同,在CAS之后假设unarrived大于1,则须要调用根结点的internalAwaitAdvance进行堵塞等待直到阶段号增长。假设unarrived小于等于1,则假设有堆叠发生(root != this)则调用父phaser的arriveAndAwaitAdvance。否则的话调用onAdvance,而且调用CAS把状态更新,然后调用releaseWaiters把之前的阶段等待队列释放。该函数对照起先调用arrive和awaitAdvance。更加方便而且因为降低了一些多余的变量读取和逻辑。速度更加快。state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ?
0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { if (unarrived > 1) return root.internalAwaitAdvance(phase, null); if (root != this) return parent.arriveAndAwaitAdvance(); long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) return (int)(state >>> PHASE_SHIFT); // terminated releaseWaiters(phase); return nextPhase; } } }
考虑到高并发条件下的CAS竞争,也提供了不同机制去优化性能(堆叠,自旋等)。
标签:关联 大量 上进 div 大致 依次 values abort 定义函数
原文地址:http://www.cnblogs.com/lytwajue/p/7258278.html