码迷,mamicode.com
首页 > 其他好文 > 详细

AQS简介

时间:2019-03-05 18:38:24      阅读:248      评论:0      收藏:0      [点我收藏+]

标签:share   des   2.3   tst   except   must   流程   作用   将不   

1 基础

AQS的类图结构如下所示:
技术图片

AQS实现共享资源的访问控制基础:

  1. state字段,即同步器状态字段。用于共享资源的访问控制
  2. CLH队列,FIFO等待队列,存放竞争失败的线程。通常CLH队列是一个自旋队列,AQS以阻塞的方式实现

CLH队列的使用:
技术图片

1.1 常用字段:

// CLH队列中的头尾节点
private transient volatile Node head;
private transient volatile Node tail;
// 同步状态
private volatile int state;

注意:多线程同步获取资源成功,则state字段会自增;若有线程释放资源,则state字段自减。

1.2 CLH队列

CLH队列有AQS的内部类Node节点构成,节点内容如下:

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    
    //节点watiStatus的值
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
}

因为其waitStatus的值是有序的,CANCELLED状态下值为正数,因此很多判断可以不使用等值比较。
数据结构中waitStatus为节点的等待状态。节点有4种状态(值也可以为0):

  • CANCELLED :终态,该节点被取消由于超时或中断
  • SIGNAL:该节点的后继节点是blocked(via park),所以当前节点release或cancels时,必须unpark它的后继节点
  • CONDITION:该节点处于条件队列中,将不会被用于sync queue,直到节点状态被设置为0
  • PROPAGATE:releaseShared应该被传播到其他节点

1.2 入队

addWaiter()方法的作用将一个Node节点放入到CLH队列的队尾。代码如下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 尝试快速入队,失败则使用enq()方式
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

注意上述代码,共分为3个步骤:

  • 第一步:首先将oldTail赋值给newNode.prev:node.prev = pred
  • 第二步:将tail赋值给newNode:compareAndSetTail(pred, node)
  • 第三步:将oldTail的next指针指向newNode(即tail):pred.next = node

这3个步骤之间会存在时间差。因此可能存在这种情况:

nodeA添加到CLH队列并执行完步骤2,尚未执行步骤3时,刚好有其他线程遍历CLH队列,此时若从CLH队列head向tail节点方向遍历,就会漏掉节点。

为解决上述情况,假设我们称:从CLH的head向tail方向称为正向遍历;从tail向head方向称为逆向遍历。则:

先正向遍历,一旦遍历的结果为空,则从tail节点逆向遍历,直到遍历到和正向遍历相同的节点,视为遍历结束。

上述代码中如果快速入队失败,就会进行自旋入队方式的enq()方法,基本和addWaiter()方法一致:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

1.3 hasQueuedPredecessors()

该方法用于查询CLH队列中是否有节点比当前线程等待的更久。

  • 因为由于中断导致的取消或超时随时可能发生,因此不能保证CLH队列中的那些比当前线程等待更久的线程能获取到资源。
  • 同样的也可能存在这种情况,由于队列为空,导致方法返回false
public final boolean hasQueuedPredecessors() {
    Node t = tail;    // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

1.4 独占锁和共享锁

AQS提供了2种获取资源的模式,独占和共享。任何实现了AQS的实现类都只能实现2种模式中的一种,而不能同时实现。

独占模式

AQS的独占模式,提供了如下对外方法:

public final void acquire(int arg)  
public final void acquireInterruptibly(int arg)  
public final boolean tryAcquireNanos(int arg, long nanosTimeout)  
public final boolean release(int arg)

AQS的实现类,需要实现如下方法:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

共享模式

AQS的共享模式,提供了如下对外方法:

public final void acquireShared(int arg)  
public final void acquireSharedInterruptibly(int arg)  
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)  
public final boolean releaseShared(int arg)

AQS的实现类,需要实现如下方法:

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

2 获取独占资源

使用AQS获取独占资源时,使用acquire()方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}

2.1 acquireQueued()

上述方法可以知道,获取资源的核心实现在tryAcquire()方法中,即AQS的实现类中。在获取资源失败的情况下,会调用acquireQueued()方法进行入队操作(入队前会进行一次尝试获取资源)。如下代码:

/* 若node节点的前继节点是head节点,则会再次调用tryAcquire()获取资源。 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断当前节点是否可以进入park,若可以,让线程进入等待
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果获取资源失败,则取消
        if (failed)
            cancelAcquire(node);
    }
}

上述代码中,一共有3个注意点:

  • 判断当前节点的前继节点是否为head节点。若是,则表示该节点有资格尝试获取共享资源。此处的head节点的判断在一定程度上保证资源竞争的公平性
  • shouldParkAfterFailedAcquire():判断当前节点是否可以安全进入park()
  • parkAndCheckInterrupt():让线程进入等待
/** 该方法的作用在于判断当前节点中的线程,是否可以安全的进入park()。返回true,表示进程可以进入park。若前驱节点的waitStatus为SIGNAL,则表示当前节点可以安全的park()。 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果前驱节点的waitStatus为SIGNAL,则表示当前节点可以安全的park()
    if (ws == Node.SIGNAL) { return true; }   
    // waitStatus>0,即为CANCELLED状态,此时当前节点需要找到状态不为CANCELLED状态的节点,将其设置为自己的前驱节点,并将新的前驱节点的next指向自己。
    // 注意,这样做完之后,那些当前节点的waitStatus状态为CANCELLED的前驱节点链,将成为孤链。但这个孤链仍然有指向原等待队列的prev和next指针。只是原等待队列中已经没有指向孤链的节点指针
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 走到此处,表明前驱节点的状态为0或PROPAGATE。此时可以将前驱节点的waitStatus设置为SIGNAL状态
        // 注意:这里仍然要返回false,表明当前节点不能被park。我们需要在park之前,重试确认该节点不能获取到资源
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  // 代码A。  
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

2.2 图解流程

分析代码情况:按照前面enq()方法的分析,假若有t1,t2两个线程竞争资源,最后t1获取资源;t2进入到CLH队列,然后t2开始调用acquireQueued()方法。

  1. 进入循环前如下图
    技术图片
  2. 第一次循环:当前节点的前继节点为head,tryAcquire()获取资源,t1线程获取资源,获取失败,调用shouldParkAfterFailedAcquire()之后情况如下图
    技术图片
  3. 第二次循环:当前节点的前继节点为head,tryAcquire()获取资源,t1线程占用资源,获取失败,调用shouldParkAfterFailedAcquire()直接返回成功,当前节点进入WAIT状态。情况如下图
    技术图片
  4. 假设此时t3线程前来竞争资源(t1还占着资源呢)。此时又进入到addWaiter()方法,执行之后,如下图
    技术图片
  5. 然后接着调用acquireQueued()方法,执行完毕后,将t3线程进入WAIT状态,如下图
    技术图片

2.3 取消节点

节点取消需要做一系列操作:

  1. 当前节点的前继节点不能是CANCELLED状态。因此,我们需要从当前节点逆向遍历CLH找到第一个不为CANCELLED的节点pred:正常的节点
  2. 将当前节点状态修改为CANCELLED
  3. 然后就是将pred作为正常节点,当前节点及其前继节点为CANCELLED状态的节点链,记为cancelledNodes,剔除CLH队列。该操作,需要针对特殊节点判断:
    1. 如果当前节点是tail,此时表明pred可以作为tail节点
    2. 如果当前节点不是tail

      且pred是head,尝试调用unparkSuccessor(node),尝试唤醒当前节点的后继节点
      且pred不是head,从CLH队列中剔除cancelledNodes

如果当前节点的前继节点是head,那么当前节点被取消,就说明当前节点的后继节点就是head节点的后继节点了,此时作为head节点的后继节点,可以被unpark()

private void cancelAcquire(Node node) {
    if (node == null) { 
        return; 
    }

    /* 找到适合的前继节点,当前节点的waitStatus赋值为CANCELLED */
    node.thread = null;
    Node pred = node.prev;
    /* 若前继节点是CANCELLED,则继续找前继节点,直至找到一个正常的前继节点赋值给node,作为node的新前继节点 */
    while (pred.waitStatus > 0) { 
        node.prev = pred = pred.prev; 
    }  
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    /* 特殊情况:node==tail节点,将pred作为tail节点,然后将cancelledNodes节点链从CLH队列剔除 */
    if (node == tail && compareAndSetTail(node, pred)) {  
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        /* 正常情况:则将cancelledNodes节点链从CLH队列剔除 */
        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
                && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {  
            /*  特殊情况:pred==head节点:尝试调用unparkSuccessor(node),尝试唤醒当前节点的后继节点 */
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

3 释放独占资源

资源的释放使用的是release()方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

调用tryRelease()方法释放资源:state。释放成功后,唤醒head节点的后继节点,unparkSuccessor()

/*注意:如果当前节点的后继节点为空,或者是被取消的节点。那就从tail节点逆向遍历CLH队列,直至找到一个距离当前节点node最近,且waitStatus<=0的节点,然后唤醒该节点*/
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) {
        compareAndSetWaitStatus(node, ws, 0); 
    }
    /* 若后继节点不符合唤醒标准,则逆向遍历CLH,直至找到一个距离当前节点node最近,且waitStatus<=0的节点 */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
  1. 假设之前的t1线程执行完毕,调用release()释放资源,释放前效果图:
    技术图片
  2. 调用unparkSuccessor()方法开始unpark()head节点的后继节点:

    1.将node节点waitStatus置为0
    技术图片
    2.unpark()之后会唤醒t2线程,线程会到之前的acquireQueued()方法的循环之中,尝试获取锁,获取成功,执行完毕后图2:
    技术图片

AQS简介

标签:share   des   2.3   tst   except   must   流程   作用   将不   

原文地址:https://www.cnblogs.com/wolfdriver/p/10478515.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!