标签:actor private 返回 try monitor 文章 signal int exce
文章部分代码和照片来自参考资料
ConditionObject 继承 Condition 这个接口, 看一下这个接口的注解说明 :
Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.
如果Lock替换了synchronized方法和语句的使用,则Condition将替换Object监视方法的使用。
Condition 经常可以用在生产者-消费者的场景中,ArrayBlockingQueue 采用这种方式实现了生产者-消费者.
ConditionObject 是 AQS里的一个对象,继承Condition 接口,上一节我们提到AQS 通过同步队列(sync queue )和 等待队列(wait queue )还有 状态变量(statue)进行并发控制。这节我们要讲的就是在等待队列的操作。
下面是 wait queue 和 sync queue 的图例。
这两个方法可以用下面两种两张图来描述。其中await 是将节点加入到 wait queue ,然后等待唤醒。 signal 方法是从wait queue 移动到 sync queue 中,然后唤醒。
图一. await 方法
图二. signal 方法
Conditon 的方法实现 基于 ReetranLock 。下面源码分析会涉及到。
Condition 的await 方法 包括的操作有 :
1 // 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly() 2 // 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断 3 public final void await() throws InterruptedException { 4 if (Thread.interrupted()) 5 throw new InterruptedException(); 6 // 添加到 condition 的条件队列中 7 Node node = addConditionWaiter(); 8 // 释放锁,返回值是释放锁之前的 state 值 9 int savedState = fullyRelease(node); 10 int interruptMode = 0; 11 // 这里退出循环有两种情况,之后再仔细分析 12 // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了 13 // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断 14 while (!isOnSyncQueue(node)) { 15 LockSupport.park(this); 16 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 17 break; 18 } 19 // 被唤醒后,将进入阻塞队列,等待获取锁 20 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 21 interruptMode = REINTERRUPT; 22 if (node.nextWaiter != null) // clean up if cancelled 23 unlinkCancelledWaiters(); 24 if (interruptMode != 0) 25 reportInterruptAfterWait(interruptMode); 26 }
1 // 将当前线程对应的节点入队,插入队尾 2 private Node addConditionWaiter() { 3 Node t = lastWaiter; 4 // 如果条件队列的最后一个节点取消了,将其清除出去 5 if (t != null && t.waitStatus != Node.CONDITION) { 6 // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列 7 unlinkCancelledWaiters(); 8 t = lastWaiter; 9 } 10 Node node = new Node(Thread.currentThread(), Node.CONDITION); 11 // 如果队列为空 12 if (t == null) 13 firstWaiter = node; 14 else 15 t.nextWaiter = node; 16 lastWaiter = node; 17 return node; 18 } 19 在addWaiter 方法中,有一个 unlinkCancelledWaiters() 方法,该方法用于清除队列中已经取消等待的节点。 20 21 当 await 的时候如果发生了取消操作(这点之后会说),或者是在节点入队的时候,发现最后一个节点是被取消的,会调用一次这个方法。 22 23 // 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去 24 // 纯属链表操作,很好理解,看不懂多看几遍就可以了 25 private void unlinkCancelledWaiters() { 26 Node t = firstWaiter; 27 Node trail = null; 28 while (t != null) { 29 Node next = t.nextWaiter; 30 // 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的 31 if (t.waitStatus != Node.CONDITION) { 32 t.nextWaiter = null; 33 if (trail == null) 34 firstWaiter = next; 35 else 36 trail.nextWaiter = next; 37 if (next == null) 38 lastWaiter = trail; 39 } 40 else 41 trail = t; 42 t = next; 43 } 44 }
ReentranLock 是可重入的,所以释放所有的锁。
1 // 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值 2 // 对于最简单的操作:先 lock.lock(),然后 condition1.await()。 3 // 那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1 4 // 相应的,如果 lock 重入了 n 次,savedState == n 5 // 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException 6 final int fullyRelease(Node node) { 7 boolean failed = true; 8 try { 9 int savedState = getState(); 10 // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0 11 if (release(savedState)) { 12 failed = false; 13 return savedState; 14 } else { 15 throw new IllegalMonitorStateException(); 16 } 17 } finally { 18 if (failed) 19 node.waitStatus = Node.CANCELLED; 20 } 21 }
1 // 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION 2 // 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列, 3 // 这个方法就是判断 node 是否已经移动到阻塞队列了 4 final boolean isOnSyncQueue(Node node) { 5 // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到 6 // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中 7 // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列 8 if (node.waitStatus == Node.CONDITION || node.prev == null) 9 return false; 10 // 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了 11 if (node.next != null) 12 return true; 13 14 // 这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列 15 16 // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。 17 // 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail, 18 // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。 19 20 // 调用这个方法的时候,往往我们需要的就在队尾的部分,所以一般都不需要完全遍历整个队列的 21 return findNodeFromTail(node); 22 } 23 24 // 从同步队列的队尾往前遍历,如果找到,返回 true 25 private boolean findNodeFromTail(Node node) { 26 Node t = tail; 27 for (;;) { 28 if (t == node) 29 return true; 30 if (t == null) 31 return false; 32 t = t.prev; 33 } 34 } 35 }
1 int interruptMode = 0; 2 while (!isOnSyncQueue(node)) { 3 // 线程挂起 4 LockSupport.park(this); 5 6 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 7 break; 8 }
回到前面的循环,isOnSyncQueue(node) 返回 false 的话,那么进到 LockSupport.park(this); 这里线程挂起。
接下来就是 signal 唤醒线程,转移到阻塞队列为了大家理解,这里我们先看唤醒操作,因为刚刚到 LockSupport.park(this); 把线程挂起了,等待唤醒。唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费。
1 2 // 唤醒等待了最久的线程 3 // 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列 4 public final void signal() { 5 // 调用 signal 方法的线程必须持有当前的独占锁 6 if (!isHeldExclusively()) 7 throw new IllegalMonitorStateException(); 8 Node first = firstWaiter; 9 if (first != null) 10 doSignal(first); 11 } 12 13 // 从条件队列队头往后遍历,找出第一个需要转移的 node 14 // 因为前面我们说过,有些线程会取消排队,但是还在队列中 15 private void doSignal(Node first) { 16 do { 17 // 将 firstWaiter 指向 first 节点后面的第一个 18 // 如果将队头移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null 19 if ( (firstWaiter = first.nextWaiter) == null) 20 lastWaiter = null; 21 // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉 22 first.nextWaiter = null; 23 } while (!transferForSignal(first) && 24 (first = firstWaiter) != null); 25 // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推 26 } 27 28 // 将节点从条件队列转移到阻塞队列 29 // true 代表成功转移 30 // false 代表在 signal 之前,节点已经取消了 31 final boolean transferForSignal(Node node) { 32 33 // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消, 34 // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点 35 // 否则,将 waitStatus 置为 0 36 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 37 return false; 38 39 // enq(node): 自旋进入阻塞队列的队尾 40 // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点 41 Node p = enq(node); 42 int ws = p.waitStatus; 43 // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释 44 // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1) 45 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) 46 // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节 47 LockSupport.unpark(node.thread); 48 return true; 49
接下来唤醒后检查中断状态
1 int interruptMode = 0; 2 while (!isOnSyncQueue(node)) { 3 // 线程挂起 4 LockSupport.park(this); 5 6 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 7 break; 8 }
1 // 1. 如果在 signal 之前已经中断,返回 THROW_IE 2 // 2. 如果是 signal 之后中断,返回 REINTERRUPT 3 // 3. 没有发生中断,返回 0 4 private int checkInterruptWhileWaiting(Node node) { 5 return Thread.interrupted() ? 6 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 7 0; 8 }
1 // 只有线程处于中断状态,才会调用此方法 2 // 如果需要的话,将这个已经取消等待的节点转移到阻塞队列 3 // 返回 true:如果此线程在 signal 之前被取消, 4 final boolean transferAfterCancelledWait(Node node) { 5 // 用 CAS 将节点状态设置为 0 6 // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0 7 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 8 // 将节点放入阻塞队列 9 // 这里我们看到,即使中断了,依然会转移到阻塞队列 10 enq(node); 11 return true; 12 } 13 14 // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0 15 // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成 16 // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断 17 while (!isOnSyncQueue(node)) 18 Thread.yield(); 19 return false; 20 }
ConditionObject 里的await方法会(假如这个节点已经存在sync queue)释放锁移入 wait queue , signal 方法则是重新移入到 sync queue ,那么我们就可以知道了 wait queue 的作用是临时存放节点,移除在 sync queue 的节点(假如存在),再插入到 sync queue 的队尾。它的作用我们可以在阅读ArrayBlockingQueue源码时就可以感受到了。
java 并发(五)---AbstractQueuedSynchronizer(2)
标签:actor private 返回 try monitor 文章 signal int exce
原文地址:https://www.cnblogs.com/Benjious/p/10161633.html