码迷,mamicode.com
首页 > 编程语言 > 详细

图灵学院Java架构师-VIP-并发编程(AQS详解)

时间:2019-08-21 15:13:48      阅读:142      评论:0      收藏:0      [点我收藏+]

标签:must   field   共享资源   void   throws   接下来   中断问题   ali   工具   

1、LockSupport

LockSupport类的核心方法其实就两个:park()和unark(),其中park()方法用来阻塞当前调用线程,unpark()方法用于唤醒指定线程
LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,可以把许可看成是一种(0,1)信号量(Semaphore),但与 Semaphore 不同的是,许可的累加上限是1。
初始时,permit为0,当调用unpark()方法时,线程的permit加1,当调用park()方法时,如果permit为0,则调用线程进入阻塞状态。

所以以下代码不会阻塞

// 初始信号量为0,调用unpark,信号量+1
LockSupport.unpark();
// 当前信号量为1,调用park,信号量-1
LockSupport.park();
// 以下代码可以继续执行
doSomething()

2、AQS

AbstractQueueSynchronizer是并发工具的核心,是一个抽象类,提供公平 / 非公平获取锁,获取可重入 / 不可重入锁,共享 / 排他等功能支持
AQS框架,分离了构建同步器时的一系列关注点,它的所有操作都围绕着资源——同步状态(synchronization state)来展开,并替用户解决了如下问题:

  1. 资源是可以被同时访问?还是在同一时间只能被一个线程访问?(共享/独占功能)
  2. 访问资源的线程如何进行并发管理?(等待队列)
  3. 如果线程等不及资源了,如何从等待队列退出?(超时/中断)

这其实是一种典型的模板方法设计模式:父类(AQS框架)定义好骨架和内部操作细节,具体规则由子类去实现。

什么是资源:

同步器 资源的定义
ReentrantLock 资源表示独占锁。State为0表示锁可用;为1表示被占用;为N表示重入的次数
CountDownLatch 资源表示倒数计数器。State为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。
Semaphore 资源表示信号量或者令牌。State≤0表示没有令牌可用,所有线程都需要阻塞;大于0表示由令牌可用,线程每获取一个令牌,State减1,线程没释放一个令牌,State加1。
ReentrantReadWriteLock 资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。

AQS-API
共享和排他

钩子方法 描述
tryAcquire 排它获取(资源数)
tryRelease 排它释放(资源数)
tryAcquireShared 共享获取(资源数)
tryReleaseShared 共享获取(资源数)
isHeldExclusively 是否排它状态
  1. 支持中断超时
  2. 支持独占和共享
  3. 支持Condition条件等待

CAS操作方法
Java中CAS操作的实现都委托给一个名为UnSafe类

方法名 修饰符 描述
compareAndSetState protected final CAS修改同步状态值
compareAndSetHead private final CAS修改等待队列的头指针
compareAndSetTail private final CAS修改等待队列的尾指针
compareAndSetWaitStatus private static final CAS修改结点的等待状态
compareAndSetNext private static final CAS修改结点的next指针

等待队列

方法名 修饰符 描述
enq private 入队操作
addWaiter private 入队操作
setHead private 设置头结点
unparkSuccessor private 唤醒后继结点
doReleaseShared private 释放共享结点
setHeadAndPropagate private 设置头结点并传播唤醒

资源获取操作

方法名 修饰符 描述
cancelAcquire private 取消获取资源
shouldParkAfterFailedAcquire private static 判断是否阻塞当前调用线程
acquireQueued final 尝试获取资源,获取失败尝试阻塞线程
doAcquireInterruptibly private 独占地获取资源(响应中断)
doAcquireNanos private 独占地获取资源(限时等待)
doAcquireShared private 共享地获取资源
doAcquireSharedInterruptibly private 共享地获取资源(响应中断)
doAcquireSharedNanos private 共享地获取资源(限时等待)
acquire public final 独占地获取资源
acquireInterruptibly public final 独占地获取资源(响应中断)
acquireInterruptibly public final 独占地获取资源(限时等待)
acquireShared public final 共享地获取资源
acquireSharedInterruptibly public final 共享地获取资源(响应中断)
tryAcquireSharedNanos public final 共享地获取资源(限时等待)

资源释放:

方法名 修饰符 描述
release public final 释放独占资源
releaseShared public final 释放共享资源

3、等待队列

CLH队列中的结点是对线程的包装,结点一共有两种类型:独占(EXCLUSIVE)和共享(SHARED)。
每种类型的结点都有一些状态,其中独占结点使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享结点使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。

结点状态 描述
CANCELLED 1 取消。表示后驱结点被中断或超时,需要移出队列
SIGNAL -1 发信号。表示后驱结点被阻塞了(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。)
CONDITION -2 Condition专用。表示当前结点在Condition队列中,因为等待某个条件而被阻塞了
PROPAGATE -3 传播。适用于共享模式(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。)
INITIAL 0 默认。新结点会处于这种状态
static final class Node {
    
    // 共享模式结点
    private static final Node SHARED = new Node();
    
    // 独占模式结点
    private?static final Node EXCLUSIVE = null;

    private?static final int CANCELLED =  1;

    private?static final int SIGNAL    = -1;

    static final int CONDITION = -2;

    static final int PROPAGATE = -3;
    // 等待状态
    volatile int waitStatus;

    // 前驱指针
    volatile Node prev;

    // 后驱指针
    volatile Node next;

    // 结点所包装的线程
    volatile Thread thread;

    // Condition队列使用,存储condition队列中的后继节点
    Node nextWaiter;

    Node() {
    }

    Node(Thread thread, Node mode) { 
        this.nextWaiter = mode;
        this.thread = thread;
    }
}

4、加锁

这里以ReentrantLock为例,看看 FairSync 和 NonfairSync 的源码

  • FairSync
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897090466540L;
    
    // 加锁
    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // 如果头和尾指向了同一个对象(null)或者头节点下一个为当前节点时,说明队列没有节点,或仅有一个当前节点
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  • NonfairSync
/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

可以发现他们两个都继承自 Sync,Sync继承自AbstractQueuedSynchronizer,在ReentrantLock中的实现如下

  • Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}

public void lock() {
    sync.lock();
}

ReentrantLock的加锁直接委托了Sync的 lock,在Sync中,lock是个抽象方法,依次查看NonfairSync 和 FairSync 的实现,如上文源码注释

非公平锁:

1sync 修改 state (compareAndSetState),尝试直接获取锁,获取成功,则设置排他属性。获取失败,则执行AQS获取锁逻辑

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

2、执行获取锁,tryAcquire (NonfairSync 和 FairSync都有各自的实现),获取失败,获取等待队列,将线程放入等待队列中

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // 将当前线程包装为独占节点加入队列
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

3、NonfairSync 对 tryAcquire 的实现

  • 获取state状态
  • 如果当前线程持有锁,设置nextc, 添加acquires,并设置给state,可见Reentrant支持可重入锁
  • 如果当前线程没有持有锁,CAS尝试获取锁,获取成功,设置排他性,否则获取锁失败,返回false
// 委托给父类
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
// 父类实现
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取state状态
    int c = getState();
    // 如果当前线程持有锁,设置nextc, 添加acquires,并设置给state,可见Reentrant支持可重入锁
    // 如果当前线程没有持有锁,CAS尝试获取锁,获取成功,设置排他性,否则获取锁失败,返回false
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

3、获取锁失败的处理:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

  • 为当前线程创建排队节点(这是一个双向链表)
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // 将当前节点加入到队列尾部(算是优化吧)
        node.prev = pred;
        // 设置尾节点为当前节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果尾为null, 先初始化队列
    enq(node);
    return node;
}

// node构造函数
Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}

// 自旋将节点加入尾部,包含队列初始化
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // Must initialize
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

为已经在队列中的线程以独占不间断模式获取。 由条件等待方法使用以及获取

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果当前节点的前驱节点是头节点,当前线程再次获取锁,如果成功,进入if
            if (p == head && tryAcquire(arg)) {
                // 将当前节点设置为头节点,返回中断状态
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 尝试阻塞线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 退出等待
        if (failed)
            cancelAcquire(node);
    }
}

// 判断是否能阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 判断前驱节点状态
    if (ws == Node.SIGNAL)
        // SIGNAL:前驱节点释放锁时,会唤醒当前节点,可以阻塞
        return true;
    if (ws > 0) {
        // CANCELED:前驱节点已中断/取消,需要从队列中移除
        // 循环检查,剔除队列前面无效的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 再将当前节点放入队列
        pred.next = node;
    } else {
        // 将前驱节点修改为 SIGNAL, 自旋再次执行此方法时,将走第一条分支
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

acquireQueued方法抛出异常时会执行 cancelAcquire

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    // 再次剔除无效的前驱节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
         node.prev = pred = pred.prev;

    // 前驱节点的原后继节点,用于后续CAS操作
    Node predNext = pred.next;

    // 当前节点设置为打断
    node.waitStatus = Node.CANCELLED;

    // 如果当前节点为尾节点,设置最后一个有效节点为尾节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 有效的前驱节点属性next设置为null
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 唤醒后继节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

5、解锁

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    // 尝试解锁
    if (tryRelease(arg)) {
        Node h = head;
        // 释放锁成功唤醒后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

释放锁,本质为维护state变量,此处支持可重入锁,如果state值为0,说明释放,取消线程的独占,并更新state

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

唤醒后继节点

private void unparkSuccessor(Node node) {
    
    // 状态置为0,初始化
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 如果 s 为 null, 从后向前迭代,找到最前的未被CANCALLED的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒后继节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

后继节点被唤醒后,回到之前的逻辑,开始争夺锁,并将头节点设置为当前节点

6、其他特性

公平锁和非公平锁:

公平锁尝试加锁时,先判断队列中是否有等待线程,如果有,直接进队列

非公平锁直接获取锁,获取失败才进队列

中断特性:

如下代码中,使用了一个bool标记返回标识线程的中断状态,而中断锁会直接抛出异常

private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);//以独占模式放入队列尾部
    boolean failed = true;
    try {
        for (; ; ) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                // 中断问题,抛出异常;非中断方法中,返回中断的 bool 变量   
                throw new InterruptedException();
        }
    } finally {
        if (failed)
          cancelAcquire(node);
    }
}

限时等待:

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

具体实现:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);// 加入队列
    boolean failed = true;
    try {
        for (; ; ) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 自旋时更新剩余等待时间
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                // 超时直接返回获取失败
                return false;
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                // 阻塞指定时长,超时则线程自动被唤醒,自旋时,将在上一个if块退出
                // 底层通过 unsafe 实现
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())// 当前线程中断状态
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

7、条件队列

当线程在指定Condition对象上等待的时候,是将线程包装成结点,加入了条件队列,然后阻塞。当线程被通知唤醒时,则是将条件队列中的结点转换成等待队列中的结点,之后的处理就和独占功能完全一样。

J.U.C包提供了Conditon接口,用以对原生的Object.wait()Object.notify()进行增强。

public Condition newCondition() {
        return sync.newCondition();
}
final ConditionObject newCondition() {
    return new ConditionObject();
}

在ReentrantLock中,通过内部ConditionObject实现了Condition接口,提供对条件队列的支持

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /**
         * First node of condition queue.
     */
    private transient Node firstWaiter;
    /**
     * Last node of condition queue.
     */
    private transient Node lastWaiter;
    /**
     * Creates a new {@code ConditionObject} instance.
     */
    public ConditionObject() {
    }
    ...
}

条件队列操作:

1、加入条件队列等待,条件队列入口

public final void await() throws InterruptedException {
    // 如果当前线程被中断则直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 把当前节点加入条件队列
    Node node = addConditionWaiter();
    // 释放掉已经获取的独占锁资源
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果不在同步队列中则不断挂起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 这里被唤醒可能是正常的signal操作也可能是中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    /**
     * 走到这里说明节点已经条件满足被加入到了同步队列中或者中断了
     * 和独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁
     * 在处理中断之前首先要做的是从同步队列中成功获取锁资源
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作
    // 删除条件队列中被取消的节点
    // clean up if cancelled
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    // 根据不同模式处理中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter:将当前线程包装为节点加入条件队列

/**
 * 1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter
 * 2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    //如果最后一个节点被取消,则删除队列中被取消的节点
    //至于为啥是最后一个节点后面会分析
    if (t != null && t.waitStatus != Node.CONDITION) {
        //删除所有被取消的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

删除条件队列当中被取消的节点

/**
 * 删除条件队列当中被取消的节点
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            // 判断中间变量
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        } else
            // 保存的是最靠前的有效条件节点
            trail = t;
        t = next;
    }
}

释放所有资源

/**
 * 入参就是新创建的节点,即当前节点
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            //如果这里释放失败,则抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        /**
         * 如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中
         * 只需要判断最后一个节点是否被取消就可以了
         */
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

判断节点是否在同步队列中

/**
 * 判断节点是否在同步队列中
 */
final boolean isOnSyncQueue(Node node) {
    //快速判断1:节点状态或者节点没有前置节点
    //注:同步队列是有头节点的,而条件队列没有
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //上面如果无法判断则进入复杂判断
    return findNodeFromTail(node);
}

2、唤醒等待队列

/**
 * 通知条件队列当中节点到同步队列当中去排队
 */
public final void signal() {
    // 节点不能已经持有独占锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    /**
     * 发信号通知条件队列的节点准备到同步队列当中去排队
     */
            doSignal(first);
}

排队过程

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

transferForSignal方法会将CONDITON结点转换为初始结点,并插入【等待队列】

final boolean transferForSignal(Node node) {
    // 尝试转化为初始节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
        
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

8、共享锁

AQS的共享功能,通过钩子方法tryAcquireShared暴露,与独占功能最主要的区别就是:

共享功能的结点,一旦被唤醒,会向队列后部传播(Propagate)状态,以实现共享结点的连续唤醒。这也是共享的含义,当锁被释放时,所有持有该锁的共享线程都会被唤醒,并从等待队列移除。

以CountDownLatch为例:CountDownLatch内部继承了AQS,覆盖了共享获取和释放锁的方法

构造:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) 
        // 只要 state == 0 就获取锁成功
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

await():

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 响应中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取共享锁,取决于state变量是否为0(1:获取成功;-1:获取失败)
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 以共享节点形式加入等待队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驱节点为头节点
            if (p == head) {
                // 再次获取锁
                int r = tryAcquireShared(arg);
                // 获取成功设置当前节点为头节点
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 判断阻塞条件,响应中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 移除当前节点
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);
    // 判断头节点等待状态
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

doReleaseShared:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 如果头节点等待状态为 SIGN, 设置头节点状态归0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 传递唤醒后继节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 退出循环的条件是head并未改变
        if (h == head)                   // loop if head changed
            break;
    }
}

countDown():

public void countDown() {
    // 释放资源
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 释放资源成功后执行
        doReleaseShared();
        return true;
    }
    return false;
}

图灵学院Java架构师-VIP-并发编程(AQS详解)

标签:must   field   共享资源   void   throws   接下来   中断问题   ali   工具   

原文地址:https://www.cnblogs.com/zuier/p/11388795.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!