标签:share des 2.3 tst except must 流程 作用 将不
目录
AQS的类图结构如下所示:
AQS实现共享资源的访问控制基础:
state
字段,即同步器状态字段。用于共享资源的访问控制CLH
队列,FIFO等待队列,存放竞争失败的线程。通常CLH
队列是一个自旋队列,AQS以阻塞的方式实现CLH队列的使用:
// CLH队列中的头尾节点
private transient volatile Node head;
private transient volatile Node tail;
// 同步状态
private volatile int state;
注意:多线程同步获取资源成功,则state
字段会自增;若有线程释放资源,则state
字段自减。
CLH
队列有AQS的内部类Node节点构成,节点内容如下:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//节点watiStatus的值
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
因为其waitStatus的值是有序的,CANCELLED状态下值为正数,因此很多判断可以不使用等值比较。
数据结构中waitStatus为节点的等待状态。节点有4种状态(值也可以为0):
addWaiter()
方法的作用将一个Node
节点放入到CLH
队列的队尾。代码如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队,失败则使用enq()方式
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
注意上述代码,共分为3个步骤:
这3个步骤之间会存在时间差。因此可能存在这种情况:
nodeA添加到CLH队列并执行完
步骤2
,尚未执行步骤3
时,刚好有其他线程遍历CLH队列,此时若从CLH队列head向tail节点方向遍历,就会漏掉节点。
为解决上述情况,假设我们称:从CLH的head向tail方向称为正向遍历;从tail向head方向称为逆向遍历。则:
先正向遍历,一旦遍历的结果为空,则从tail节点逆向遍历,直到遍历到和正向遍历相同的节点,视为遍历结束。
上述代码中如果快速入队失败,就会进行自旋入队方式的enq()
方法,基本和addWaiter()
方法一致:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
该方法用于查询CLH队列中是否有节点比当前线程等待的更久。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
AQS提供了2种获取资源的模式,独占和共享。任何实现了AQS的实现类都只能实现2种模式中的一种,而不能同时实现。
独占模式
AQS的独占模式,提供了如下对外方法:
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
AQS的实现类,需要实现如下方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
共享模式
AQS的共享模式,提供了如下对外方法:
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
AQS的实现类,需要实现如下方法:
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
使用AQS获取独占资源时,使用acquire()
方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
上述方法可以知道,获取资源的核心实现在tryAcquire()
方法中,即AQS的实现类中。在获取资源失败的情况下,会调用acquireQueued()
方法进行入队操作(入队前会进行一次尝试获取资源)。如下代码:
/* 若node节点的前继节点是head节点,则会再次调用tryAcquire()获取资源。 */
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断当前节点是否可以进入park,若可以,让线程进入等待
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果获取资源失败,则取消
if (failed)
cancelAcquire(node);
}
}
上述代码中,一共有3个注意点:
/** 该方法的作用在于判断当前节点中的线程,是否可以安全的进入park()。返回true,表示进程可以进入park。若前驱节点的waitStatus为SIGNAL,则表示当前节点可以安全的park()。 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前驱节点的waitStatus为SIGNAL,则表示当前节点可以安全的park()
if (ws == Node.SIGNAL) { return true; }
// waitStatus>0,即为CANCELLED状态,此时当前节点需要找到状态不为CANCELLED状态的节点,将其设置为自己的前驱节点,并将新的前驱节点的next指向自己。
// 注意,这样做完之后,那些当前节点的waitStatus状态为CANCELLED的前驱节点链,将成为孤链。但这个孤链仍然有指向原等待队列的prev和next指针。只是原等待队列中已经没有指向孤链的节点指针
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 走到此处,表明前驱节点的状态为0或PROPAGATE。此时可以将前驱节点的waitStatus设置为SIGNAL状态
// 注意:这里仍然要返回false,表明当前节点不能被park。我们需要在park之前,重试确认该节点不能获取到资源
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 代码A。
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
分析代码情况:按照前面enq()
方法的分析,假若有t1,t2两个线程竞争资源,最后t1获取资源;t2进入到CLH队列,然后t2开始调用acquireQueued()方法。
节点取消需要做一系列操作:
如果当前节点不是tail
且pred是head,尝试调用unparkSuccessor(node),尝试唤醒当前节点的后继节点
且pred不是head,从CLH队列中剔除cancelledNodes
如果当前节点的前继节点是head,那么当前节点被取消,就说明当前节点的后继节点就是head节点的后继节点了,此时作为head节点的后继节点,可以被unpark()
private void cancelAcquire(Node node) {
if (node == null) {
return;
}
/* 找到适合的前继节点,当前节点的waitStatus赋值为CANCELLED */
node.thread = null;
Node pred = node.prev;
/* 若前继节点是CANCELLED,则继续找前继节点,直至找到一个正常的前继节点赋值给node,作为node的新前继节点 */
while (pred.waitStatus > 0) {
node.prev = pred = pred.prev;
}
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
/* 特殊情况:node==tail节点,将pred作为tail节点,然后将cancelledNodes节点链从CLH队列剔除 */
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
/* 正常情况:则将cancelledNodes节点链从CLH队列剔除 */
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
&& pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
/* 特殊情况:pred==head节点:尝试调用unparkSuccessor(node),尝试唤醒当前节点的后继节点 */
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
资源的释放使用的是release()
方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
调用tryRelease()
方法释放资源:state。释放成功后,唤醒head节点的后继节点,unparkSuccessor()
:
/*注意:如果当前节点的后继节点为空,或者是被取消的节点。那就从tail节点逆向遍历CLH队列,直至找到一个距离当前节点node最近,且waitStatus<=0的节点,然后唤醒该节点*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
/* 若后继节点不符合唤醒标准,则逆向遍历CLH,直至找到一个距离当前节点node最近,且waitStatus<=0的节点 */
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
调用unparkSuccessor()方法开始unpark()head节点的后继节点:
1.将node节点waitStatus置为0
2.unpark()之后会唤醒t2线程,线程会到之前的acquireQueued()方法的循环之中,尝试获取锁,获取成功,执行完毕后图2:
标签:share des 2.3 tst except must 流程 作用 将不
原文地址:https://www.cnblogs.com/wolfdriver/p/10478515.html