码迷,mamicode.com
首页 > 其他好文 > 详细

Phaser实现源码剖析

时间:2014-07-19 23:14:19      阅读:437      评论:0      收藏:0      [点我收藏+]

标签:android   concurrent   多线程   并发   java   

    在这里首先说明一下,由于Phaser在4.3代码里是存在,但并没有被开放出来供使用,但已经被本人大致研究了,因此也一并进行剖析。
    Phaser是一个可以重复利用的同步栅栏,功能上与CyclicBarrier和CountDownLatch相似,不过提供更加灵活的用法。也就是说,Phaser的同步模型与它们差不多。一般运用的场景是一组线程希望同时到达某个执行点后(先到达的会被阻塞),执行一个指定任务,然后这些线程才被唤醒继续执行其它任务。
    Phaser一般是定义一个parties数(parties一般代指需要进行同步的线程),当这些parties到达某个执行点,就会调用await方法,表示到达某个阶段(phase),然后就会阻塞直到有足够的parites数(也就是线程数)都调用过await方法后,这些线程才会被逐个唤醒,另外,在唤醒之前,可以选择性地执行某个定制的任务。Phaser对比起CyclicBarrier,不仅它是可以重复同步,并且parties数是可以动态注册的,另外还提供了非阻塞的arrive方法表示先到达阶段等,大大提高了同步模型的灵活性,当然了,实现也会相对复杂。

注册(Registration)
    和其它栅栏不一样,在phaser上进行同步的注册parites数可能会随着时间改变而不同。可以在任何时间注册任务(调用register,bulkRegister方法,或者可以初始化parties数的构造函数形式),另外还可以在任何到达栅栏的时候反注册(arriveAndDeregister方法)。作为大部分同步结构的基础,注册和反注册只会影响内部计数;它们没有做任何额外的内部记录,所以任务不能够查询它们是否被注册。(不过你可以继承这个类以引入这样的记录)

同步(Synchronization)
    就像CyclicBarrier一样,Phaser可以重复地等待。arriveAndAwaitAdvance方法与CyclicBarrier.await作用类似。每一代(generation)的phaser有一个关联的阶段号(phase number)。阶段号从0开始,当所有parties都到达阶段的时候就会加一,直到Integer.MAX_VALUE后返回0。阶段号的使用允许在到达某个阶段以及在等待其它parites的时候,通过以下两类可以被任何注册过的party调用的方法,执行单独的操作:
    1.到达(Arrival)
        arrive方法和arriveAndDeregister方法记录到达。这些方法不会阻塞,但会返回一个关联的到达阶段号(arrival phase number),也就是,phaser在到达以后所用的阶段号。当最后的party到达一个给定的阶段,就可以执行可选的操作,并且阶段号自加一。这些操作都会被触发阶段增加的party执行,并且会被可重写方法onAdvance(同时管理Phaser的终结)安排管理。重写onAdvance方法比起CyclicBarrier提供的栅栏操作很相似,但更加灵活。
    2.等待(Waiting)
        awaitAdvance方法需要一个表示到达阶段号的参数,并在phaser增加(或者已经在)不同的阶段的时候返回。与CyclicBarrier类似结构不同,awaitAdvance方法会在等待线程被中断的时候继续等待。当然也有可中断和超时版本,但是当任务等待发生中断或者超时遇到的异常也不会改变phaser的状态。如果必要,你可以在这些异常的处理器里执行关联的恢复操作,一般是在调用forceTermination之后恢复。Phaser可能也会被执行在ForkJoinPool中的任务使用,这样当其它线程在等待phaser增加被阻塞的时候,就可以确保有效平行地执行任务。

终结(Termination)

phaser可能进入一个终结状态,可以通过isTerminated来检查。当终结的时候,所有的同步方法都不会在等待下一个阶段而直接返回,返回一个负值来表示该状态。类似地,在终结的时候尝试注册没有任何效果。当onAdvance调用返回true的时候就会触发终结。onAdvance默认实现为当一个反注册导致注册parties数降为0的时候返回true。当phser要控制操作在一个固定得迭代次数时,就可以很方便地重写这个方法,当当前阶段号到达阀值得时候就返回true导致终结。forceTermination方法也时另一个可以突然释放等待线程并且允许它们终结。


堆叠(Tiering)

Phaser可以被堆叠在一起(也就是说,以树形结构构造)来降低竞争。Phaser的parties数很大的时候,以一组子phasers共享一个公共父亲能够减轻严重的同步竞争的成本。这样做可以大大提供吞吐量,但同时也会导致每个操作的更高的成本。

    在一棵堆叠的phaser树中,子phaser在父亲上的注册和反注册都会被自动管理。当子phaser的注册parties树为非0的时候,子phaser就会注册到父亲上。当由于arriveAndDeregister的调用使注册的parties数变为0时,子phaser就会从父亲中反注册。这样就算父phaser的所有parties都到达了阶段,也必须等待子phaser的所有parties都到达了阶段并显式调用父phaser的awaitAdvance才算到达新的阶段。反之亦然。这样父phaser或者子phaser里注册过的所有parties就可以一起互相等待到新的阶段。另外,在这个堆叠结构的实现里,可以确保root结点必然是最先更新阶段号,然后才到其子结点,逐渐传递下去。

    +------+      +------+      +------+
    | root |  <-- |parent| <--  | this |
    +------+      +------+      +------+


   parties:3+1   parties:3+1    parties:3

    如上图所示,如果parties数多的时候,可以根据堆叠成为一颗树,这里假设root和parent和this都各初始化3个parties数,然后如果当前结点this有注册parties数,则会在parent上注册一个parties,因此事实上root和parent都注册了4个parties数。这样,如果this结点的3个parties数都到达了,就会调用parent的arrive,把parties数减去一,然后parent等待自己3个parties数都到达,就会调用root来减去一,这样root的3个parties数都到达就会一同释放所有等待结点,就实现了整棵树parties之间同步等待的功能。另外这个结构也很容易看到root结点是最快进行阶段增长的。这样做最大的好处就是减少对同一个state变量的CAS竞争带来的性能下降,不过同时每个同步操作也会增加相应的负担(每次获取状态都要和root进行阶段同步),所以一般在高并发下造成的性能下降才考虑。

监控(Monitoring)

同步方法只能被注册的parties调用时,phaser的当前状态可以被任何调用者监控。在任何时刻,有getRegisteredParties总共的parties,其中,有getArrivedParties个parites到达getPhase的当前阶段。当剩下getUnarrivedParties个parties到达,phase增加。这些方法的返回值可能反映短暂的状态,因此一般在同步控制中不太有用。toString方法以一种可以方便信息监控的格式返回这些状态的快照。

    Phaser的具体用法可以参考例子:这里

源码剖析

内部状态表示

    Phaser内部对于状态(包括parties注册数、阶段号、未到达的parties数等)管理都使用一个volatile long型变量,同时利用CAS进行更新。这样做就可以保证在状态改变时,保持所有状态一致改变,这是实现无锁算法的基础之一。
    private volatile long state;


    private static final int  MAX_PARTIES     = 0xffff;
    private static final int  MAX_PHASE       = Integer.MAX_VALUE;
    private static final int  PARTIES_SHIFT   = 16;
    private static final int  PHASE_SHIFT     = 32;
    private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
    private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
    private static final long COUNTS_MASK     = 0xffffffffL;
    private static final long TERMINATION_BIT = 1L << 63;


    // some special values
    private static final int  ONE_ARRIVAL     = 1;
    private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
    private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
    private static final int  EMPTY           = 1;


    //内部状态辅助方法
    private static int unarrivedOf(long s) {
        int counts = (int)s;
        return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
    }


    private static int partiesOf(long s) {
        return (int)s >>> PARTIES_SHIFT;
    }


    private static int phaseOf(long s) {
        return (int)(s >>> PHASE_SHIFT);
    }


    private static int arrivedOf(long s) {
        int counts = (int)s;
        return (counts == EMPTY) ? 0 :
            (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
    }
    state变量为long类型,长度为64位,其中:
    未达到parties数     (0-15位)
    注册的parties数     (16-31位)
    当前阶段号          (32-62位)
    结束状态标识        (63位 符号位)
    把符号位设置位结束状态可以简单判断state是否为负表示是否结束。另外,如果当phaser没有任何注册parties数,则会用一个无效状态EMPTY(0个已注册和1个未到达parites数)来区分其它状态。
    除此之外,phaser定义了一些静态常量方便对state变量进行移位解析,如*_SHIFT移位和*_MASK掩位。另外还有一些特殊值方便计算。还有一些辅助方法能够从state提取某些状态值。

动态注册
    接着看看动态注册parties的实现。动态注册有两个可用的接口,register方法和bulkRegister方法,其中register方法默认注册一个party数,bulkRegister方法可以注册数加上已经注册的,最大不超过MAX_PARTIES。
    public int register() {
        return doRegister(1);
    }


    public int bulkRegister(int parties) {
        if (parties < 0)
            throw new IllegalArgumentException();
        if (parties == 0)
            return getPhase();
        return doRegister(parties);
    }
    两者实现都很简单,bulkRegister方法中添加了对parties数的检查。两个方法都调用了doRegister方法实现。
    private int doRegister(int registrations) {
        // adjustment to state
        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        int phase;
        for (;;) {
            long s = (parent == null) ? state : reconcileState();
            int counts = (int)s;
            int parties = counts >>> PARTIES_SHIFT;
            int unarrived = counts & UNARRIVED_MASK;
            if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                break;
            if (counts != EMPTY) {                  // not 1st registration
                if (parent == null || reconcileState() == s) {
                    if (unarrived == 0)             // wait out advance
                        root.internalAwaitAdvance(phase, null);
                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                       s, s + adjust))
                        break;
                }
            }
            else if (parent == null) {              // 1st root registration
                long next = ((long)phase << PHASE_SHIFT) | adjust;
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                    break;
            }
            else {
                synchronized (this) {               // 1st sub registration
                    if (state == s) {               // recheck under lock
                        phase = parent.doRegister(1);
                        if (phase < 0)
                            break;
                        // finish registration whenever parent registration
                        // succeeded, even when racing with termination,
                        // since these are part of the same "transaction".
                        while (!UNSAFE.compareAndSwapLong
                               (this, stateOffset, s,
                                ((long)phase << PHASE_SHIFT) | adjust)) {
                            s = state;
                            phase = (int)(root.state >>> PHASE_SHIFT);
                            // assert (int)s == EMPTY;
                        }
                        break;
                    }
                }
            }
        }
        return phase;
    }
    doRegister方法做了以下事情:
    首先计算注册后要当前state要调整的值adjust,注意adjust把未到达的parties数和注册的parties数都设为registrations;接着就进入自循环,首先考虑到堆叠的情况(parent不为null),就要调用reconcileState方法与parent的阶段号同步,并计算出未注册前正确的state值,然后再依次计算注册parties数parties,未到达数unarrived,阶段号phase,并且判断注册后是否超过MAX_PARTIES等一系列的准备工作,接下来就是三个判断区分三种不同的情况:
    (1)如果counts!=EMPTY(也即已经有parties注册),则此时如果parent不为null,则要调用reconcileState方法和当前的状态s对比,这是因为要保证parent和子phaser的阶段号保持一致。如果一致或者parent为null(没有堆叠),此时可以先拿unarrived值与0判断,如果为0,则表示所有parties已经到达,阶段号增加的情况,因此调用root.internalAwaitAdvance(在没有堆叠的情况下,root为this,如果发生堆叠,root则为整棵树的根节点)来等待增加完成并再次循环同步,如果unarrived非0,则直接利用CAS把当前state添加adjust。如果CAS成功就可以break推出循环同步。事实上,这里的unarrived判断相当重要,假设这种情况,如果此时只剩下最后一个未到达的parties数,而它刚好调用了arrive到达阶段,由于考虑到最后一个到达的party必须执行onAdvance函数,如果此时有新的注册party,则要重新等待party完成,但已经造成错误的onAdvance调用,因此必须要在最后的party到达的时候禁止注册。到达函数doArrive中有两次CAS的调用(具体实现在下面详细说明),第一次会把当前的未到达状态数变为0(这个CAS同时也是表示最后一个party已经到达),第二次在调用onAdvance后,会重新设置新的状态值。在doRegister函数里,unarrived的判断可以防止第一个CAS以后(即最后一个party到达)时会执行CAS来注册新的party(因为此时unarrived==0,doRegister会进入阻塞等待doArrive完成第二个CAS)。
    (2)如果(1)判断失败,并且parent==null时,则表示这次是第一个注册,因此直接算出新的state值,并且CAS即可,注意这里没有unarrived判断的优化,因为这次是第一个注册,所以不会出现所有parties已经到达,阶段号增加的情况。
    (3)如果(1)和(2)判断失败,则表示这次是子phaser的第一次注册。这时我们使用来synchronized的内置锁来防止并发出现,这时因为子phaser第一次注册,堆叠结构就必须要向parent注册一次并只有一次。获取synchronized锁后,再重新检查一次状态是否发生变化,然后就调用parent.doRegister(1)向父phaser注册自己。然后如果父phaser注册成功(返回的phase>=0),就要利用自旋CAS把当前状态添加adjust,注意这个自旋就是强制当前状态值必须要成功注册,这是因为这个和父phaser注册都属于同一个原子事务,要在锁里面完成,否则可能会状态不一致。
    doRegister大致逻辑如上,接着我们来看看其中的一些方法实现,首先是reconcileState。
    private long reconcileState() {
        final Phaser root = this.root;
        long s = state;
        if (root != this) {
            int phase, p;
            // CAS to root phase with current parties, tripping unarrived
            while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
                   (int)(s >>> PHASE_SHIFT) &&
                   !UNSAFE.compareAndSwapLong
                   (this, stateOffset, s,
                    s = (((long)phase << PHASE_SHIFT) |
                         ((phase < 0) ? (s & COUNTS_MASK) :
                          (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                           ((s & PARTIES_MASK) | p))))))
                s = state;
        }
        return s;
    }
    reconcileState主要目的是和根结点保持阶段号同步。前面说过,如果出现堆叠情况,根结点是最先进行阶段号增加,虽然阶段号增加的操作会逐渐传递到子phaser,但某些同步操作,如动态注册等,需要马上获悉整棵树的阶段号状态避免多余的CAS,因此就需要显式和根结点保持同步。reconcileState实现就是如此,如果root!=this,即发生堆叠,就利用自旋CAS把当前修改状态值,要注意的是由于阶段号增加,会同时会把未到达的parties数设置为原来的注册parties数。主要实现都是移位和掩位操作,就不再赘述。

awaitAdvance实现
    Phaser的一个重要的同步操作就是awaitAdvance系列方法。awaitAdvance是阻塞等待指定阶段号增长的一系列方法,包括
    int awaitAdvance(int phase)
    int awaitAdvanceInterruptibly(int phase) throws InterruptedException
    int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
    这三个方法的实现其实都大同小异,主要是增加来对中断和超时的控制,具体实现如下:
    public int awaitAdvance(int phase) {
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase)
            return root.internalAwaitAdvance(phase, null);
        return p;
    }

    public int awaitAdvanceInterruptibly(int phase)
        throws InterruptedException {
        //省略一样的代码

        if (p == phase) {
            QNode node = new QNode(this, phase, true, false, 0L);
            p = root.internalAwaitAdvance(phase, node);
            if (node.wasInterrupted)
                throw new InterruptedException();
        }
        return p;
    }

     public int awaitAdvanceInterruptibly(int phase,
                                         long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);

        //省略一样的代码
        if (p == phase) {
            QNode node = new QNode(this, phase, true, true, nanos);
            p = root.internalAwaitAdvance(phase, node);
            if (node.wasInterrupted)
                throw new InterruptedException();
            else if (p == phase)
                throw new TimeoutException();
        }
        return p;
    }
    三者实现大致结构都一样,首先获取当前状态值,如果堆叠则调用reconcileState获取根结点同步后的状态值。然后如果当前阶段号与请求等待的阶段号相等,则调用根结点的internalAwaitAdvance方法(根结点是最先进行阶段号增长)。
    internalAwaitAdvance有两个参数,一个是指定等待的阶段号,另外一个是等待结点QNode,如果这个参数为null则会在内部创建一个不会被中断也不会超时的结点来加入队列进行等待,否则就会把参数结点加入队列,因此可以看到awaitAdvance的中断和超时版本都会自己创建对应的结点传入。先来看看结点QNode的实现:
    static final class QNode implements ForkJoinPool.ManagedBlocker {
        //省略其它成员变量以及构造函数
        QNode next;


        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (phaser.getPhase() != phase) {
                thread = null;
                return true;
            }
            if (Thread.interrupted())
                wasInterrupted = true;
            if (wasInterrupted && interruptible) {
                thread = null;
                return true;
            }
            if (timed) {
                if (nanos > 0L) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
                if (nanos <= 0L) {
                    thread = null;
                    return true;
                }
            }
            return false;
        }


        public boolean block() {
            if (isReleasable())
                return true;
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > 0)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }
    Phaser的等待队列使用的是Treiber无锁算法的栈操作。例子实现可以参考这里。首先可以注意到QNode类是实现了ForkJoinPool.ManagedBlocker接口,这个接口可以确保如果使用ForkJoinWorkerThread的时候就可以保持并发执行任务。
    首先看看isReleaseable的实现,接口定义函数返回true,就不需要接下来的block操作。因此如果当前阶段号已经和指定阶段号不相等,则返回true,另外在判断中断的时候,如果interruptible值(构造函数的时候)为false,则会忽略中断。接着就是一个典型的超时判断逻辑。注意这里在返回true之前都会把thread设为null,表示不需要等待取消,不需要进行唤醒。
    block的实现也是很简单,如果非超时就调用LockSupport.part,否则就调用超时版本parkNanos。
    接下来看看internalAwaitAdvance实现。
  //NCPU是当前CPU数量
    static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;


    private int internalAwaitAdvance(int phase, QNode node) {
        // assert root == this;
        releaseWaiters(phase-1);          // ensure old queue clean
        boolean queued = false;           // true when node is enqueued
        int lastUnarrived = 0;            // to increase spins upon change
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            if (node == null) {           // spinning in noninterruptible mode
                int unarrived = (int)s & UNARRIVED_MASK;
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                    spins += SPINS_PER_ARRIVAL;
                boolean interrupted = Thread.interrupted();
                if (interrupted || --spins < 0) { // need node to record intr
                    node = new QNode(this, phase, false, false, 0L);
                    node.wasInterrupted = interrupted;
                }
            }
            else if (node.isReleasable()) // done or aborted
                break;
            else if (!queued) {           // push onto queue
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                QNode q = node.next = head.get();
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    node.wasInterrupted = true;
                }
            }
        }


        if (node != null) {
            if (node.thread != null)
                node.thread = null;       // avoid need for unpark()
            if (node.wasInterrupted && !node.interruptible)
                Thread.currentThread().interrupt();
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
                return abortWait(phase); // possibly clean up on abort
        }
        releaseWaiters(phase);
        return p;
    }
    函数做了以下事情:
    1、调用releaseWaiters释放上一个阶段的等待队列;
    2、进入while循环,判断条件是保证指定的阶段和当前阶段保持一致,然后有4个判断分支,代表不同情况:
    (1)如果参数node为null,即请求的是非阻塞超时等待,接着是一个有关自旋等待的逻辑,考虑到在多核CPU上短时间大量线程唤醒相当慢,因此在这里准备阻塞先添加一个简单的自旋逻辑。具体是这样:初始化一个自旋次数spins为SPINS_PER_ARRIVAL,当此前的未到达parties数unarrived和上次记录的未到达parties数lastUnarrived不相等的时候,并且parites数少于当前CPU数量,则会给当前自旋次数添加一个SPINS_PER_ARRIVAL常量,这样在下一次到达之前都会自旋spins次,如果此时出现阶段号增长,则会退出自旋,就可以避免接下来的阻塞逻辑,但如果在自旋spins次阶段号仍然没有递增(如果此时发生中断则取消自旋进入阻塞),则创建一个非中断超时结点,准备进入等待队列。
    (2)如果(1)判断失败,则调用已经创建node的isReleaseable方法,判断是否是否因为中断或者超时等取消当前等待;
    (3)这里是一个入队操作,用queued变量保证只入队一次。另外,考虑到上一阶段里如果有结点在释放的时候,刚好当前阶段有入队操作的话会有竞争产生,因此这里采用了两个队列,偶数队列(evenQ)和奇数队列(oddQ)。接着按照Treiber算法的模型,把请求结点入队,注意入队前需要再次判断当前阶段是否已经增加。
    (4)最后就是进入阻塞状态,这里调用ForkJoinPool.managedBlock方法把结点阻塞,直到阶段号增长被唤醒,或者发生中断或超时等取消等待。
    3、while循环退出后,接着就是结点node状态清理,包括清空thread引用,以及必要的中断状态复位,另外如果在中断和超时的情况下还需要调用abortWait释放队列里面同样是中断和超时的结点。
    4、最后再次调用releaseWaiters释放当前阶段号的等待队列。

    这里顺便看看releaseWaiters和abortWait的实现。由于这两者都很相似,因此一并剖析。
  private void releaseWaiters(int phase) {
        QNode q;   // first element of queue
        Thread t;  // its thread
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        while ((q = head.get()) != null &&
               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
            if (head.compareAndSet(q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }


    private int abortWait(int phase) {
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        for (;;) {
            Thread t;
            QNode q = head.get();
            int p = (int)(root.state >>> PHASE_SHIFT);
            if (q == null || ((t = q.thread) != null && q.phase == p))
                return p;
            if (head.compareAndSet(q, q.next) && t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }
    releaseWaiters方法主要利用自旋从head结点起把队列里的结点出队,如果结点的thread引用为非null,则顺便唤醒。另外注意的是,每次出队前都会判断当前结点的阶段号是否与状态的阶段号相等,这里的状态阶段号用的是root.state,这是考虑到堆叠的情况。
    abortWait的实现是releaseWaiters的变种,从头结点开始,如果遇到没有被取消等待(thread引用是否为null)并且阶段号与当前相等的正常阻塞结点,就会退出,否则一直释放结点。

arrive实现
    Phaser提供了单独表示到达阶段的非阻塞函数,即
    int arrive()                    //一个party到达
    int arriveAndDeregister()       //一个party到达并且反注册这个party
    这两个函数的实现都很简单:
    public int arrive() {
        return doArrive(ONE_ARRIVAL);
    }


    public int arriveAndDeregister() {
        return doArrive(ONE_DEREGISTER);
    }
    主要是调用doArrive实现,doArrive实现如下
  private int doArrive(int adjust) {
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
                if (unarrived == 1) {
                    long n = s & PARTIES_MASK;  // base of next state
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase << PHASE_SHIFT;
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                        releaseWaiters(phase);
                    }
                    else if (nextUnarrived == 0) { // propagate deregistration
                        phase = parent.doArrive(ONE_DEREGISTER);
                        UNSAFE.compareAndSwapLong(this, stateOffset,
                                                  s, s | EMPTY);
                    }
                    else
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }
    doArrive看上去很复杂,但其实逻辑并不算太复杂。
    1、首先依然是进入一个自旋结构,然后根据是否有堆叠(root == this)来获取正确状态值,接着计算阶段号phase,未到达parties数unarrived,此时如果unarrived少于等于0,则必须抛出异常,表示这次到达是非法的(因为所有的注册parties数已经到达);
    2、接着就是利用CAS把状态值进行更改,这里是减去参数adjust值,arrive的传入参数是ONE_ARRIVE,也就是1,arriveAndDeregister是ONE_ARRIVAL|ONE_PARTY,减去之后刚好把未到达数和注册数都减去一。
    3、如果CAS成功,如果CAS之前的unarrived刚好为1,则表示此次到达是最后一个未到达party,然后重新开始计算下一个阶段值n,接着需要根据是否堆叠进行判断:
    (1)如果没有堆叠(root == this)则按照定义我们调用onAdvance,传入相对参数,此时如果onAdvance返回true,我们给n添加终结标识,如果onAdvance返回false,但下阶段的未到达parties数(同时也是当前注册的parties数)为0(可能由于反注册造成),因此要给n添加EMPTY值,否则就给n添加新的未到达parties数,接着就调用CAS把当前状态值更改为n,然后调用releaseWaiters释放上一阶段号的等待队列。注意这里第二个CAS的返回值可以忽略,因为这里与doRegister的冲突已经由doRegister的unarrived判断解决。
    (2)如果(1)判断失败,则出现了堆叠,另外如果此时新的未到达数为0(所有之前的注册parties数都被反注册),根据堆叠的结构,我们必须向parent表示已经到达一个party并且反注册自己,并且同时把当前状态CAS为EMPTY,同样,这里的CAS可以忽略返回值。
    (3)如果(1)(2)判断都失败,则只需要简单地把向parent调用doArrive(ONE_ARRIVE)表示自己当前所有已经注册的parties数都到达了,然后parent就会减去一个代表这个子phaser的到达parties数。

    这里顺便介绍一个比较方便的函数arriveAndAwaitAdvance,从名字上就可以看出,这个函数把arrive和awaitAdvance两个效果都合成在一起。具体实现如下:
    public int arriveAndAwaitAdvance() {
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                          s -= ONE_ARRIVAL)) {
                if (unarrived > 1)
                    return root.internalAwaitAdvance(phase, null);
                if (root != this)
                    return parent.arriveAndAwaitAdvance();
                long n = s & PARTIES_MASK;  // base of next state
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived;
                int nextPhase = (phase + 1) & MAX_PHASE;
                n |= (long)nextPhase << PHASE_SHIFT;
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                releaseWaiters(phase);
                return nextPhase;
            }
        }
    }
    函数的大致结构和doArrive差不多,在CAS之后如果unarrived大于1,则需要调用根结点的internalAwaitAdvance进行阻塞等待直到阶段号增长,如果unarrived小于等于1,则如果有堆叠发生(root != this)则调用父phaser的arriveAndAwaitAdvance,否则的话调用onAdvance,并且调用CAS把状态更新,然后调用releaseWaiters把之前的阶段等待队列释放。该函数对比起先调用arrive和awaitAdvance,更加方便并且由于减少了一些多余的变量读取和逻辑,速度更加快。

总结
    Phaser的同步模型于CountDownLatch、CyclicBarrier类似,但提供了更加灵活的同步支持,另外由于实现采用了无锁算法,整个同步操作的实现变得更加复杂。考虑到高并发条件下的CAS竞争,也提供了不同机制去优化性能(堆叠,自旋等)。虽然Android暂时没有开放此类,但整个算法以及思想还是很值得我们去研究学习的。

Phaser实现源码剖析

标签:android   concurrent   多线程   并发   java   

原文地址:http://blog.csdn.net/pun_c/article/details/37965813

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!