码迷,mamicode.com
首页 > 编程语言 > 详细

java 并发(五)---AbstractQueuedSynchronizer(2)

时间:2018-12-22 17:27:02      阅读:246      评论:0      收藏:0      [点我收藏+]

标签:actor   private   返回   try   monitor   文章   signal   int   exce   

 

       文章部分代码和照片来自参考资料

 

ConditonObject

         ConditionObject 继承 Condition 这个接口, 看一下这个接口的注解说明 :

 

Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

       如果Lock替换了synchronized方法和语句的使用,则Condition将替换Object监视方法的使用。

 

         Condition 经常可以用在生产者-消费者的场景中,ArrayBlockingQueue 采用这种方式实现了生产者-消费者.

         ConditionObject 是 AQS里的一个对象,继承Condition 接口,上一节我们提到AQS 通过同步队列(sync queue )和 等待队列(wait queue )还有 状态变量(statue)进行并发控制。这节我们要讲的就是在等待队列的操作。

         下面是 wait queue 和 sync queue 的图例。

 

技术分享图片

 

await 和 signal 方法

 

      代码分析来自于 一行一行源码分析清楚 AbstractQueuedSynchronizer (二)

         

        这两个方法可以用下面两种两张图来描述。其中await 是将节点加入到 wait queue ,然后等待唤醒。 signal 方法是从wait queue 移动到 sync queue 中,然后唤醒。

       

        技术分享图片      

                                                                            图一. await 方法

 

技术分享图片

                                                                            图二. signal 方法

 

              Conditon 的方法实现 基于 ReetranLock 。下面源码分析会涉及到。

              Condition 的await 方法 包括的操作有 :

 

  • If current thread is interrupted, throw InterruptedException.
  • Save lock state returned by getState.
  • Invoke release with saved state as argument, throwing IllegalMonitorStateException if it fails.
  • Block until signalled or interrupted.
  • Reacquire by invoking specialized version of acquire with saved state as argument.
    If interrupted while blocked in step 4, throw InterruptedException.

 

  1 // 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
  2 // 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
  3 public final void await() throws InterruptedException {
  4     if (Thread.interrupted())
  5         throw new InterruptedException();
  6     // 添加到 condition 的条件队列中
  7     Node node = addConditionWaiter();
  8     // 释放锁,返回值是释放锁之前的 state 值
  9     int savedState = fullyRelease(node);
 10     int interruptMode = 0;
 11     // 这里退出循环有两种情况,之后再仔细分析
 12     // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了
 13     // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断
 14     while (!isOnSyncQueue(node)) {
 15         LockSupport.park(this);
 16         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 17             break;
 18     }
 19     // 被唤醒后,将进入阻塞队列,等待获取锁
 20     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 21         interruptMode = REINTERRUPT;
 22     if (node.nextWaiter != null) // clean up if cancelled
 23         unlinkCancelledWaiters();
 24     if (interruptMode != 0)
 25         reportInterruptAfterWait(interruptMode);
 26 }

       

  1 // 将当前线程对应的节点入队,插入队尾
  2 private Node addConditionWaiter() {
  3     Node t = lastWaiter;
  4     // 如果条件队列的最后一个节点取消了,将其清除出去
  5     if (t != null && t.waitStatus != Node.CONDITION) {
  6         // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
  7         unlinkCancelledWaiters();
  8         t = lastWaiter;
  9     }
 10     Node node = new Node(Thread.currentThread(), Node.CONDITION);
 11     // 如果队列为空
 12     if (t == null)
 13         firstWaiter = node;
 14     else
 15         t.nextWaiter = node;
 16     lastWaiter = node;
 17     return node;
 18 }
 19 在addWaiter 方法中,有一个 unlinkCancelledWaiters() 方法,该方法用于清除队列中已经取消等待的节点。
 20 
 21 当 await 的时候如果发生了取消操作(这点之后会说),或者是在节点入队的时候,发现最后一个节点是被取消的,会调用一次这个方法。
 22 
 23 // 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去
 24 // 纯属链表操作,很好理解,看不懂多看几遍就可以了
 25 private void unlinkCancelledWaiters() {
 26     Node t = firstWaiter;
 27     Node trail = null;
 28     while (t != null) {
 29         Node next = t.nextWaiter;
 30         // 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的
 31         if (t.waitStatus != Node.CONDITION) {
 32             t.nextWaiter = null;
 33             if (trail == null)
 34                 firstWaiter = next;
 35             else
 36                 trail.nextWaiter = next;
 37             if (next == null)
 38                 lastWaiter = trail;
 39         }
 40         else
 41             trail = t;
 42         t = next;
 43     }
 44 }

          ReentranLock 是可重入的,所以释放所有的锁。

  1 // 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值
  2 // 对于最简单的操作:先 lock.lock(),然后 condition1.await()。
  3 //         那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1
  4 //         相应的,如果 lock 重入了 n 次,savedState == n
  5 // 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException
  6 final int fullyRelease(Node node) {
  7     boolean failed = true;
  8     try {
  9         int savedState = getState();
 10         // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0
 11         if (release(savedState)) {
 12             failed = false;
 13             return savedState;
 14         } else {
 15             throw new IllegalMonitorStateException();
 16         }
 17     } finally {
 18         if (failed)
 19             node.waitStatus = Node.CANCELLED;
 20     }
 21 }
  1 // 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
  2 // 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,
  3 // 这个方法就是判断 node 是否已经移动到阻塞队列了
  4 final boolean isOnSyncQueue(Node node) {
  5     // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
  6     // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
  7     // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列
  8     if (node.waitStatus == Node.CONDITION || node.prev == null)
  9         return false;
 10     // 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了
 11     if (node.next != null)
 12         return true;
 13 
 14     // 这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列
 15 
 16     // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。
 17     // 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail,
 18     // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。
 19 
 20     // 调用这个方法的时候,往往我们需要的就在队尾的部分,所以一般都不需要完全遍历整个队列的
 21     return findNodeFromTail(node);
 22 }
 23 
 24 // 从同步队列的队尾往前遍历,如果找到,返回 true
 25 private boolean findNodeFromTail(Node node) {
 26     Node t = tail;
 27     for (;;) {
 28         if (t == node)
 29             return true;
 30         if (t == null)
 31             return false;
 32         t = t.prev;
 33     }
 34 }
 35 }
           等待唤醒。
  1 int interruptMode = 0;
  2 while (!isOnSyncQueue(node)) {
  3     // 线程挂起
  4     LockSupport.park(this);
  5 
  6     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  7         break;
  8 }

         回到前面的循环,isOnSyncQueue(node) 返回 false 的话,那么进到 LockSupport.park(this); 这里线程挂起。

         接下来就是 signal 唤醒线程,转移到阻塞队列为了大家理解,这里我们先看唤醒操作,因为刚刚到 LockSupport.park(this); 把线程挂起了,等待唤醒。唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费。

  1 
  2 // 唤醒等待了最久的线程
  3 // 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列
  4 public final void signal() {
  5     // 调用 signal 方法的线程必须持有当前的独占锁
  6     if (!isHeldExclusively())
  7         throw new IllegalMonitorStateException();
  8     Node first = firstWaiter;
  9     if (first != null)
 10         doSignal(first);
 11 }
 12 
 13 // 从条件队列队头往后遍历,找出第一个需要转移的 node
 14 // 因为前面我们说过,有些线程会取消排队,但是还在队列中
 15 private void doSignal(Node first) {
 16     do {
 17           // 将 firstWaiter 指向 first 节点后面的第一个
 18         // 如果将队头移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
 19         if ( (firstWaiter = first.nextWaiter) == null)
 20             lastWaiter = null;
 21         // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
 22         first.nextWaiter = null;
 23     } while (!transferForSignal(first) &&
 24              (first = firstWaiter) != null);
 25       // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推
 26 }
 27 
 28 // 将节点从条件队列转移到阻塞队列
 29 // true 代表成功转移
 30 // false 代表在 signal 之前,节点已经取消了
 31 final boolean transferForSignal(Node node) {
 32 
 33     // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,
 34     // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点
 35     // 否则,将 waitStatus 置为 0
 36     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 37         return false;
 38 
 39     // enq(node): 自旋进入阻塞队列的队尾
 40     // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点
 41     Node p = enq(node);
 42     int ws = p.waitStatus;
 43     // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释
 44     // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
 45     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 46         // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节
 47         LockSupport.unpark(node.thread);
 48     return true;
 49 

        接下来唤醒后检查中断状态

  1 int interruptMode = 0;
  2 while (!isOnSyncQueue(node)) {
  3     // 线程挂起
  4     LockSupport.park(this);
  5 
  6     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  7         break;
  8 }

 

  1 // 1. 如果在 signal 之前已经中断,返回 THROW_IE
  2 // 2. 如果是 signal 之后中断,返回 REINTERRUPT
  3 // 3. 没有发生中断,返回 0
  4 private int checkInterruptWhileWaiting(Node node) {
  5     return Thread.interrupted() ?
  6         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  7         0;
  8 }

 

  1 // 只有线程处于中断状态,才会调用此方法
  2 // 如果需要的话,将这个已经取消等待的节点转移到阻塞队列
  3 // 返回 true:如果此线程在 signal 之前被取消,
  4 final boolean transferAfterCancelledWait(Node node) {
  5     // 用 CAS 将节点状态设置为 0 
  6     // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
  7     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  8         // 将节点放入阻塞队列
  9         // 这里我们看到,即使中断了,依然会转移到阻塞队列
 10         enq(node);
 11         return true;
 12     }
 13 
 14     // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
 15     // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
 16     // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
 17     while (!isOnSyncQueue(node))
 18         Thread.yield();
 19     return false;
 20 }

 

为什么有了sync queue 还需要 wait queue ?

          ConditionObject  里的await方法会(假如这个节点已经存在sync queue)释放锁移入 wait queue , signal 方法则是重新移入到 sync queue ,那么我们就可以知道了 wait queue 的作用是临时存放节点,移除在 sync queue 的节点(假如存在),再插入到 sync queue 的队尾。它的作用我们可以在阅读ArrayBlockingQueue源码时就可以感受到了。

 

 

参考资料

java 并发(五)---AbstractQueuedSynchronizer(2)

标签:actor   private   返回   try   monitor   文章   signal   int   exce   

原文地址:https://www.cnblogs.com/Benjious/p/10161633.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!