码迷,mamicode.com
首页 > 数据库 > 详细

[源码]Condition的原理,简单案例(ArrayBlockingQueue),复杂案例(LinkedBlockingQueue).

时间:2014-06-27 09:10:37      阅读:505      评论:0      收藏:0      [点我收藏+]

标签:线程   concurrent   condition   reentrantlocker   locker   

  源代码解析
Re‘entrantLock lock = new ReentrantLock(fair);
  Condition   notEmpty = lock.newCondition(); //返回内部类 AbstractQueuedSyncronizer.ConditionObject
   各自维护了两个队列.一个是阻塞同步队列 syncQueue 双向队列,一个是条件等待队列.
   Condition.await两个作用.1.放入同步队列 park 2.realse锁,3等待别人获取锁acquire(),并且.signal .unlock()之后调用acquiredQueue()从阻塞同步队列里复活出来.
   Condition..signal 1.在父类Locker.lock()获取锁之后,从条件队列迁移到阻塞同步队列.2.等待之后的unlcok 释放锁,并唤醒next线程.
能够park .signal 完成线程加入到阻塞队列中( 因为signal必须在对应的lock()后操作. 所以从条件队列中迁移出不可能获得锁,只能加入到线程队列中.)
  之前的误区: 何时await的线程被唤醒?和正在syncQueue中的线程优先级哪个高?
   我理解为signal之后唤醒await线程.
   正确理解: signal只是转移线程,并不是唤醒await队列的地方.真正唤醒await线程的地方在持有Locker.unlock的时候.(见LinkedBlockingQueue中的signalNotFull()方法.) await线程被转移到syncQueue时,已经有线程在排队,那么只好放在队尾.
  下面有LinkedBlockingQueue的先take,await, 然后被put.signal的时序图.

  private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal(); // phil:必须在lock()后面,锁已经被线程获取了.
        } finally {
            takeLock.unlock();
        }
    }
   ReentrantLock.unlock完成下一个线程(可能刚好是signal加入的)的unpark.

所以总结: signal后不一定是之前的那个await的线程.  获得锁执行..


 /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException  if it fails.
         * <li> Block until signalled or interrupted
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw exception
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {                //phi注:不在AbstractQueuedSyncronizer的线程阻塞队列里.就park();
condition.signal后,迁移线程到父类的阻塞线程队列,如果父类ReentrantLocker执行release()操作,唤醒队列头线程,但线程还在队列里. 所以不用死循环. 所以说: Lock的孩子的条件队列和Lock自己的阻塞队列是互斥的.
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 如果是第一个,那么就获取锁,从队列中剔除并执行,如果不是就重新park()
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }


/**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)  //这一步做的操作重新设置队列头.
                    lastWaiter = null;
                first.nextWaiter = null;                       //隔断老的队列头和原队列的连接.
            } while (!transferForSignal(first) &&             //transferForSignal把线程从条件队列中转移到阻塞队列中.但没有unpark操作
                     (first = firstWaiter) != null);
        }

/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal).
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))   
            LockSupport.unpark(node.thread);             //正常情况不会进到这里. 那么何时进行unpark呢? unpark是上一个线程释放锁时进行的操作. unpark阻塞队列列头的线程.
        return true;
    }


何时unpark()呢?
ArrayBlockingQueue. 内部只有一个Locker.
所以所有的.await都在这个locker.lock()和unlock()之间.
在.signal()和父类locker的unlocker() 两个动作先后执行之后 unpark.

LinkedBlockingQueue 中
由于 take的Condition的signal是在put()内执行的.
所以特意在.signal()的外面封装了 takeLocker.lock()和 unLock();
这两个操作(signal和unlock() )才能成为一个整体 ,实现线程从条件队列迁移到阻塞队列,然后线程的唤醒,然后从阻塞队列的剔除这个线程.
bubuko.com,布布扣

  LinkedBlockingQueue的先take,await, 然后被put.signal的时序图.高清版右键下载
https://www.gliffy.com fei3342@画出来
bubuko.com,布布扣

bubuko.com,布布扣

[源码]Condition的原理,简单案例(ArrayBlockingQueue),复杂案例(LinkedBlockingQueue).,布布扣,bubuko.com

[源码]Condition的原理,简单案例(ArrayBlockingQueue),复杂案例(LinkedBlockingQueue).

标签:线程   concurrent   condition   reentrantlocker   locker   

原文地址:http://blog.csdn.net/fei33423/article/details/34854579

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!