标签:关联 大量 上进 div 大致 依次 values abort 定义函数
一般运用的场景是一组线程希望同一时候到达某个运行点后(先到达的会被堵塞),运行一个指定任务,然后这些线程才被唤醒继续运行其他任务。
Phaser对照起CyclicBarrier。不仅它是能够反复同步,而且parties数是能够动态注冊的,另外还提供了非堵塞的arrive方法表示先到达阶段等,大大提高了同步模型的灵活性,当然了,实现也会相对复杂。
阶段号从0開始。当全部parties都到达阶段的时候就会加一。直到Integer.MAX_VALUE后返回0。阶段号的使用同意在到达某个阶段以及在等待其他parites的时候,通过下面两类能够被不论什么注冊过的party调用的方法。运行单独的操作:
这些方法不会堵塞。但会返回一个关联的到达阶段号(arrival phase number)。也就是,phaser在到达以后所用的阶段号。
当最后的party到达一个给定的阶段,就能够运行可选的操作,而且阶段号自加一。
这些操作都会被触发阶段添加的party运行。而且会被可重写方法onAdvance(同一时候管理Phaser的终结)安排管理。重写onAdvance方法比起CyclicBarrier提供的栅栏操作非常相似。但更加灵活。
当然也有可中断和超时版本号。可是当任务等待发生中断或者超时遇到的异常也不会改变phaser的状态。假设必要,你能够在这些异常的处理器里运行关联的恢复操作,通常是在调用forceTermination之后恢复。Phaser可能也会被运行在ForkJoinPool中的任务使用,这样当其他线程在等待phaser添加被堵塞的时候,就能够确保有效平行地运行任务。
终结(Termination)
phaser可能进入一个终结状态。能够通过isTerminated来检查。当终结的时候。全部的同步方法都不会在等待下一个阶段而直接返回。返回一个负值来表示该状态。类似地,在终结的时候尝试注冊没有不论什么效果。
当onAdvance调用返回true的时候就会触发终结。
onAdvance默认实现为当一个反注冊导致注冊parties数降为0的时候返回true。当phser要控制操作在一个固定得迭代次数时。就能够非常方便地重写这种方法,当当前阶段号到达阀值得时候就返回true导致终结。forceTermination方法也时还有一个能够突然释放等待线程而且同意它们终结。
堆叠(Tiering)
Phaser能够被堆叠在一起(也就是说,以树形结构构造)来减少竞争。Phaser的parties数非常大的时候。以一组子phasers共享一个公共父亲能够减轻严重的同步竞争的成本。这样做能够大大提供吞吐量,但同一时候也会导致每一个操作的更高的成本。
在一棵堆叠的phaser树中,子phaser在父亲上的注冊和反注冊都会被自己主动管理。当子phaser的注冊parties树为非0的时候。子phaser就会注冊到父亲上。
当因为arriveAndDeregister的调用使注冊的parties数变为0时,子phaser就会从父亲中反注冊。这样就算父phaser的全部parties都到达了阶段,也必须等待子phaser的全部parties都到达了阶段并显式调用父phaser的awaitAdvance才算到达新的阶段。反之亦然。这样父phaser或者子phaser里注冊过的全部parties就能够一起互相等待到新的阶段。
另外,在这个堆叠结构的实现里,能够确保root结点必定是最先更新阶段号。然后才到其子结点,逐渐传递下去。
+------+ +------+ +------+
| root | <-- |parent| <-- | this |
+------+ +------+ +------+
parties:3+1 parties:3+1 parties:3
如上图所看到的,假设parties数多的时候,能够依据堆叠成为一颗树。这里假设root和parent和this都各初始化3个parties数。然后假设当前结点this有注冊parties数,则会在parent上注冊一个parties,因此其实root和parent都注冊了4个parties数。这样,假设this结点的3个parties数都到达了。就会调用parent的arrive,把parties数减去一,然后parent等待自己3个parties数都到达,就会调用root来减去一,这样root的3个parties数都到达就会一同释放全部等待结点,就实现了整棵树parties之间同步等待的功能。另外这个结构也非常easy看到root结点是最快进行阶段增长的。这样做最大的优点就是降低对同一个state变量的CAS竞争带来的性能下降。只是同一时候每一个同步操作也会添加对应的负担(每次获取状态都要和root进行阶段同步),所以一般在高并发下造成的性能下降才考虑。
监控(Monitoring)
同步方法仅仅能被注冊的parties调用时,phaser的当前状态能够被不论什么调用者监控。在不论什么时刻。有getRegisteredParties总共的parties。当中,有getArrivedParties个parites到达getPhase的当前阶段。当剩下getUnarrivedParties个parties到达。phase添加。这些方法的返回值可能反映短暂的状态,因此一般在同步控制中不太实用。toString方法以一种能够方便信息监控的格式返回这些状态的快照。
Phaser的详细使用方法能够參考样例:这里 private volatile long state;
private static final int MAX_PARTIES = 0xffff;
private static final int MAX_PHASE = Integer.MAX_VALUE;
private static final int PARTIES_SHIFT = 16;
private static final int PHASE_SHIFT = 32;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
private static final long COUNTS_MASK = 0xffffffffL;
private static final long TERMINATION_BIT = 1L << 63;
// some special values
private static final int ONE_ARRIVAL = 1;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
private static final int EMPTY = 1;
//内部状态辅助方法
private static int unarrivedOf(long s) {
int counts = (int)s;
return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
private static int partiesOf(long s) {
return (int)s >>> PARTIES_SHIFT;
}
private static int phaseOf(long s) {
return (int)(s >>> PHASE_SHIFT);
}
private static int arrivedOf(long s) {
int counts = (int)s;
return (counts == EMPTY) ? 0 :
(counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
state变量为long类型,长度为64位。当中: public int register() {
return doRegister(1);
}
public int bulkRegister(int parties) {
if (parties < 0)
throw new IllegalArgumentException();
if (parties == 0)
return getPhase();
return doRegister(parties);
} 两者实现都非常easy,bulkRegister方法中加入了对parties数的检查。两个方法都调用了doRegister方法实现。 private int doRegister(int registrations) {
// adjustment to state
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
break;
if (counts != EMPTY) { // not 1st registration
if (parent == null || reconcileState() == s) {
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))
break;
}
}
else if (parent == null) { // 1st root registration
long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
phase = parent.doRegister(1);
if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
break;
}
}
}
}
return phase;
}
doRegister方法做了下面事情:假设CAS成功就能够break推出循环同步。
其实,这里的unarrived推断相当重要。假设这样的情况,假设此时仅仅剩下最后一个未到达的parties数,而它刚好调用了arrive到达阶段,由于考虑到最后一个到达的party必须运行onAdvance函数,假设此时有新的注冊party,则要又一次等待party完毕,但已经造成错误的onAdvance调用,因此必需要在最后的party到达的时候禁止注冊。到达函数doArrive中有两次CAS的调用(具体实如今以下具体说明),第一次会把当前的未到达状态数变为0(这个CAS同一时候也是表示最后一个party已经到达)。第二次在调用onAdvance后。会又一次设置新的状态值。在doRegister函数里。unarrived的推断能够防止第一个CAS以后(即最后一个party到达)时会运行CAS来注冊新的party(由于此时unarrived==0。doRegister会进入堵塞等待doArrive完毕第二个CAS)。
获取synchronized锁后,再又一次检查一次状态是否发生变化,然后就调用parent.doRegister(1)向父phaser注冊自己。然后假设父phaser注冊成功(返回的phase>=0),就要利用自旋CAS把当前状态加入adjust,注意这个自旋就是强制当前状态值必需要成功注冊,这是由于这个和父phaser注冊都属于同一个原子事务,要在锁里面完毕。否则可能会状态不一致。
private long reconcileState() {
final Phaser root = this.root;
long s = state;
if (root != this) {
int phase, p;
// CAS to root phase with current parties, tripping unarrived
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase << PHASE_SHIFT) |
((phase < 0) ? (s & COUNTS_MASK) :
(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
((s & PARTIES_MASK) | p))))))
s = state;
}
return s;
}
reconcileState主要目的是和根结点保持阶段号同步。前面说过。假设出现堆叠情况,根结点是最先进行阶段号添加,尽管阶段号添加的操作会逐渐传递到子phaser,但某些同步操作,如动态注冊等,须要立即获悉整棵树的阶段号状态避免多余的CAS,因此就须要显式和根结点保持同步。reconcileState实现就是如此,假设root!=this。即发生堆叠。就利用自旋CAS把当前改动状态值,要注意的是因为阶段号添加。会同一时候会把未到达的parties数设置为原来的注冊parties数。主要实现都是移位和掩位操作,就不再赘述。 int awaitAdvance(int phase)
int awaitAdvanceInterruptibly(int phase) throws InterruptedException
int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException 这三个方法的实现事实上都大同小异,主要是添加来对中断和超时的控制。详细实现例如以下: public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException {
//省略一样的代码
if (p == phase) {
QNode node = new QNode(this, phase, true, false, 0L);
p = root.internalAwaitAdvance(phase, node);
if (node.wasInterrupted)
throw new InterruptedException();
}
return p;
}
public int awaitAdvanceInterruptibly(int phase,
long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
long nanos = unit.toNanos(timeout);
//省略一样的代码
if (p == phase) {
QNode node = new QNode(this, phase, true, true, nanos);
p = root.internalAwaitAdvance(phase, node);
if (node.wasInterrupted)
throw new InterruptedException();
else if (p == phase)
throw new TimeoutException();
}
return p;
}
三者实现大致结构都一样。首先获取当前状态值。假设堆叠则调用reconcileState获取根结点同步后的状态值。然后假设当前阶段号与请求等待的阶段号相等,则调用根结点的internalAwaitAdvance方法(根结点是最先进行阶段号增长)。 static final class QNode implements ForkJoinPool.ManagedBlocker {
//省略其他成员变量以及构造函数
QNode next;
public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed) {
if (nanos > 0L) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
if (nanos <= 0L) {
thread = null;
return true;
}
}
return false;
}
public boolean block() {
if (isReleasable())
return true;
else if (!timed)
LockSupport.park(this);
else if (nanos > 0)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
}
Phaser的等待队列使用的是Treiber无锁算法的栈操作。样例实现能够參考这里。
首先能够注意到QNode类是实现了ForkJoinPool.ManagedBlocker接口,这个接口能够确保假设使用ForkJoinWorkerThread的时候就能够保持并发运行任务。
//NCPU是当前CPU数量
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // spinning in noninterruptible mode
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) { // need node to record intr
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
else if (node.isReleasable()) // done or aborted
break;
else if (!queued) { // push onto queue
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}
函数做了下面事情:因为这两者都非常相似,因此一并剖析。
private void releaseWaiters(int phase) {
QNode q; // first element of queue
Thread t; // its thread
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
private int abortWait(int phase) {
AtomicReference<QNode> head = (phase & 1) == 0 ?
evenQ : oddQ;
for (;;) {
Thread t;
QNode q = head.get();
int p = (int)(root.state >>> PHASE_SHIFT);
if (q == null || ((t = q.thread) != null && q.phase == p))
return p;
if (head.compareAndSet(q, q.next) && t != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
releaseWaiters方法主要利用自旋从head结点起把队列里的结点出队,假设结点的thread引用为非null。则顺便唤醒。另外注意的是。每次出队前都会推断当前结点的阶段号是否与状态的阶段号相等,这里的状态阶段号用的是root.state,这是考虑到堆叠的情况。
int arrive() //一个party到达
int arriveAndDeregister() //一个party到达而且反注冊这个party 这两个函数的实现都非常easy: public int arrive() {
return doArrive(ONE_ARRIVAL);
}
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
} 主要是调用doArrive实现,doArrive实现例如以下 private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
doArrive看上去非常复杂,但事实上逻辑并不算太复杂。 public int arriveAndAwaitAdvance() {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ?
0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT); // terminated
releaseWaiters(phase);
return nextPhase;
}
}
}
函数的大致结构和doArrive几乎相同,在CAS之后假设unarrived大于1,则须要调用根结点的internalAwaitAdvance进行堵塞等待直到阶段号增长。假设unarrived小于等于1,则假设有堆叠发生(root != this)则调用父phaser的arriveAndAwaitAdvance。否则的话调用onAdvance,而且调用CAS把状态更新,然后调用releaseWaiters把之前的阶段等待队列释放。该函数对照起先调用arrive和awaitAdvance。更加方便而且因为降低了一些多余的变量读取和逻辑。速度更加快。考虑到高并发条件下的CAS竞争,也提供了不同机制去优化性能(堆叠,自旋等)。
标签:关联 大量 上进 div 大致 依次 values abort 定义函数
原文地址:http://www.cnblogs.com/lytwajue/p/7258278.html