标签:method 博客 row normal operation pos 详解 shared unlock
AQS(同步阻塞队列)是concurrent包下锁机制实现的基础,相信大家在读完本篇博客后会对AQS框架有一个较为清晰的认识
这篇博客主要针对AbstractQueuedSynchronizer的源码进行分析,大致分为三个部分:
所有的分析仅基于个人的理解,若有不正之处,请谅解和批评指正,不胜感激!!!
AQS在内部维护了一个同步阻塞队列,下面简称sync queue,该队列的元素即静态内部类Node的实例
首先来看Node中涉及的常量定义,源码如下
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor‘s thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
以下两个均为Node#nextWaiter字段的可取值
SHARED:若Node#nextWaiter为SHARED,那么表明该Node节点处于共享模式
EXCLUSIVE:若Node#nextWaiter为EXCLUSIVE,那么表明该Node节点处于独占模式
以下五个均为Node#waitStatus字段的可取值
CANCELLED:用于标记一个已被取消的节点,一旦Node#waitStatus的值被设为CANCELLED,那么waitStatus的值便不再被改变
SIGNAL:标记一个节点(记为node)处于这样一种状态:当node释放资源(unlock/release)时,node节点必须唤醒其后继节点
CONDITION:用于标记一个节点位于条件变量的阻塞队列中(我称这个阻塞队列为Condition list),本篇暂不介绍Condition相关源码,因此读者可以暂时忽略
PROPAGATE:仅用于标记sync queue头节点,且为一种暂时状态,表明共享状态正在传递中(如果有剩余资源且sync queue中仍有等待的节点,那么这些节点会依次获取资源,直至资源消耗殆尽或者队列为空),仅在共享模式中出现
其次,再看Node中重要字段,源码如下
/** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn‘t need to * signal. So, most code doesn‘t need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev‘s from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter;
waitStatus:节点的状态,可取值有五种,分别是SIGNAL、CANCEL、CONDITION、PROPAGATE、0。其中独占模式仅涉及到SIGNAL、CANCEL、0三种状态,共享模式仅涉及到SIGNAL、CANCEL、PROPAGATE、0四种状态。CONDITION状态不会出现在sync queue中,而是位于条件变量的Condition list中,本篇博客暂不讨论ConditoinObject
pre:前继节点,该字段通过CAS操作进行赋值,保证可靠(现在不理解没关系,后面的方法解析会多次提到)
next:后继节点,该字段的赋值操作是非线程安全的,即next是不可靠的(Node#next为null并不代表节点不存在后继)。但是,一旦next不为null,那么next也是可靠的(现在不理解没关系,后面的方法解析会多次提到)
thread:该节点关联的线程
nextWaiter:独占模式中就是null,共享模式中就是SHARED。在ConditionObject的Condition list中指向下一个节点
注意:Condition list用nextWaiter来连接单向链表(pre与next是无用的),sync queue利用pre和next来连接双向链表(nextWaiter仅用于标记独占或者共享模式而已),不要搞混了!!!
AQS字段仅有三个,源码如下
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. */ private volatile int state;
head:sync queue队列的头节点
tail:sync queue队列的尾节点
state:资源状态
该方法是独占模式下的入口方法,可以相应interrupt,但是不会抛出InterruptedException异常
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { //首先执行tryAcquire(arg)尝试获取资源,如果成功则直接返回 //如果tryAcquire(arg)获取资源失败,则讲当前线程封装成Node节点加入到sync queue队列中,并通过acquireQueued进行获取资源直至成功(如果尚未有资源可获取,那么acquireQueued会阻塞当前线程) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
其中tryAcquire方法如下,该方法的具体含义交给AQS的子类去完成,注意,该方法的实现不可有任何耗时操作,更不可阻塞线程,仅实现是否可获取资源(换言之,是否可获取锁)的逻辑即可,源码如下
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
addWaiter的作用是:将当前线程封装成一个Node节点,并且添加到sync queue中
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { //生成指定模式的Node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //以下几行进行入队操作,如果失败,交给enq进行入队处理。其实,我认为可以直接调用enq,不知道作者设置如下几行的意图 if (pred != null) { node.prev = pred; //通过CAS操作串行化并发入队操作,仅有一个线程会成功,由于node节点的prev字段是在CAS操作之前进行的,一旦CAS操作成功,node节点的prev字段就是指向了其前继节点,因此说prev字段是安全的 if (compareAndSetTail(pred, node)) { //这里直接通过赋值操作赋值next字段,注意,可能有别的线程会在next字段赋值之前访问到next字段,因此next字段是非可靠的(一个节点的next字段为null并不代表该节点没有后继) pred.next = node; //一旦next字段赋值成功,那么next字段又变为可靠的了 return node; } } //通过enq入队 enq(node); return node; }
enq入队,源码如下
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node‘s predecessor */ private Node enq(final Node node) { //死循环进行入队操作,CAS操作常规模式 for (;;) { Node t = tail; //此时队列为空,需要初始化 if (t == null) { // Must initialize //此时可能多个线程都在执行该方法,因此只有一个线程才能初始化sync queue,此处添加的节点我称之为Dummy Node,该节点没有关联线程 if (compareAndSetHead(new Node())) tail = head; } else { //以下四行与addWaiter类似,不再赘述 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
这里抛出一个问题:在初始化sync queue中,将一个new Node()设置为了sync queue的头结点,该节点没有关联任何线程,我称之为"Dummy Node",这个头结点"Dummy Node"待会可能会被设置为SIGNAL状态,那么它是如何唤醒后继节点的呢?我会在在讲到release时进行解释
到这里,线程已被封装成节点,并且成功添加到sync queue中去了,接下来,来看最重要的acquireQueued方法
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { //用于记录是否获取成功,我现在还不清楚何时会失败= = boolean failed = true; try { //记录是否被中断过,如果被中断过,则需要在acquire方法中恢复中断现场 boolean interrupted = false; //同样的套路,CAS配合死循环 for (;;) { //获取node节点的前继节点p final Node p = node.predecessor(); //当p为sync queue头结点时,才有资格尝试获取资源,换言之,当且仅当一个节点是sync queue中第二个节点时,它才有资格获取资源 if (p == head && tryAcquire(arg)) { //一旦获取成功,以下语句都是线程安全的,所有字段直接赋值即可,不需要CAS或者加锁 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //否则,找到前继节点,并将其设置为SIGNAL状态后阻塞自己 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //如果失败了 cancelAcquire(node); } }
该方法的主要逻辑就是:不断地通过死循环执行获取资源(当且仅当节点是sync queue中第二个节点时才有资格获取资源)或者阻塞自己的操作,只有成果获取资源后才能够返回
接下来,来看shouldParkAfterFailedAcquire方法以及parkAndCheckInterrupt方法
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node‘s predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //一旦发现前继节点是SIGNAL状态,就返回true,在acquireQueued方法中会阻塞当前线程 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ //这里给出两个问题: //1.如果在当前线程阻塞之前,前继节点就唤醒了当前线程,那么当前线程不就永远阻塞下去了吗? //2.万一有别的线程更改了前继节点的状态,导致前继节点不唤醒当前线程,那么当前线程不就永远阻塞下去了吗? return true; //如果前继节点处于CANCELL状态(仅有CANCELL状态大于0) if (ws > 0) { //那么跳过那些被CANCELL的节点,先前找到第一个有效节点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //前继节点状态要么是0,要么是PROPAGATE,将其通过CAS操作设为SIGNAL,不用管是否成功,退回到上层函数acquireQueued进行再次判断 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don‘t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
该方法的主要逻辑就是:将前继节点设置为SIGNAL
关于上面提到的两个问题
1. 如果在当前线程阻塞之前,前继节点就唤醒了当前线程,那么当前线程不就永远阻塞下去了吗?--->AQS采用的是Unsafe#park以及Unsafe#unpark,这对方法能够很好的处理这类问题,可以先unpark获取一枚许可,然后执行park不会阻塞当前线程,而是消耗这个提前获取的许可,注意,多次unpark仅能获取一枚许可
2.万一有别的线程更改了前继节点的状态,导致前继节点不唤醒当前线程,那么当前线程不就永远阻塞下去了吗?--->一旦一个节点被设为SIGNAL状态,AQS框架保证,任何改变其SIGNAL状态的操作都会唤醒其后继节点,因此,只要节点看到其前继节点为SIGNAL状态,便可放心阻塞自己
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //返回是否被中断过 return Thread.interrupted(); }
至此,独占模式的acquire调用链分析完毕,总结一下:首先尝试获取锁(tryAcquire),若成功则直接返回。若失败,将当前线程封装成Node节点加入到sync queue队列中,当该节点位于第二个节点时,会重新尝试获取锁,成功则返回,失败则阻塞自己,直至前继节点唤醒自己
AQS通过死循环以及CAS操作来串行化并发操作,并且通过这种适当的自旋加阻塞,来减少频繁的加锁解锁操作
release方法是独占模式下释放资源(即解锁)的入口,源码如下
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { //调用tryRelease尝试释放资源 if (tryRelease(arg)) { Node h = head; //只要头节点不为空且状态不为0,就唤醒后继节点,对于独占模式也就只有SIGNAL状态一种,头结点在任何情况下都不可能为CANCELL状态 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在此,解释一下enq方法中提到的问题,即那个"Dummy Node"如何唤醒后继:由于"Dummy Node"不关联任何线程,因此真正的唤醒操作实际上是由外部的线程来完成的,这里的外部线程是指从未进入sync queue的线程,因此,"Dummy Node"节点设置为SIGNAL状态,也能够正常唤醒后继
同理,tryRelease也是交给AQS子类实现的方法,只需要定义释放资源的逻辑即可,该方法的实现不应该有耗时的操作,更不该阻塞
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
通过unparkSuccessor方法唤醒指定节点的后继节点
/** * Wakes up node‘s successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; //若节点状态小于0,将其通过CAS操作改为0,表明本次SIGNAL的任务已经完成,至于CAS是否成功,或者是否再次被其他线程修改,都与本次无关unparkSuccessor无关,只是该节点被赋予了新的任务而已。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //这里通过非可靠的next字段直接获取后继,如果非空,那么说明该字段可靠,如果为空,那么利用可靠的prev字段从tail向前找到当前node节点的后继节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒后继节点 if (s != null) LockSupport.unpark(s.thread); }
待续
待续
标签:method 博客 row normal operation pos 详解 shared unlock
原文地址:http://www.cnblogs.com/liuyehcf/p/7045186.html