标签:success sig bubuko and shared shel .com sync art
一、AQS原理
AQS(AbstractQueuedSynchronizer)队列同步器是用来构建锁、同步组件的基础框架。
AQS内部通过一个volatile int类型的成员变量state控制同步状态【0代表锁未被占用,1表示已占用】,通过内部类Node构成FIFO的同步队列实现等待获取锁的线程排队工作,通过内部类ConditionObject构建条件等待队列,来完成等待条件线程的排队工作。当线程调用Condition对象的wait方法后会被加入等待队列中,当有线程调用Condition的signal方法后,线程将从等待队列移动到同步队列进行锁竞争。AQS内部会有一个同步队列和可能多个等待队列,前者存放等待获取锁的线程,后者分别存放等待不同条件的线程。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{ //指向同步队列队头 private transient volatile Node head; //指向同步的队尾 private transient volatile Node tail; //同步状态,0代表锁未被占用,1代表锁已被占用 private volatile int state; static final class Node { //共享模式 static final Node SHARED = new Node(); //独占模式 static final Node EXCLUSIVE = null; //标识线程已处于结束状态 static final int CANCELLED = 1; //等待被唤醒状态 static final int SIGNAL = -1; //条件状态, static final int CONDITION = -2; //在共享模式中使用表示获得的同步状态会被传播 static final int PROPAGATE = -3; //等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种 volatile int waitStatus; //同步队列中前驱结点 volatile Node prev; //同步队列中后继结点 volatile Node next; //请求锁的线程 volatile Thread thread; //等待队列中的后继结点,这个与Condition有关,稍后会分析 Node nextWaiter; //判断是否为共享模式 final boolean isShared() { return nextWaiter == SHARED; } //获取前驱结点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } //..... }
//上面Node head、tail对象构建同步队列,这里用ConditionObject类的对象创建条件等待队列 class ConditionObject implements Condition, java.io.Serializable { //等待队列第一个等待结点 private transient Node firstWaiter; //等待队列最后一个等待结点 private transient Node lastWaiter; //省略其他代码....... } //AQS中的模板方法,由其子类实现 //独占模式下获取锁的方法 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //独占模式下解锁的方法 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //共享模式下获取锁的方法 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //共享模式下解锁的方法 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //判断是否为持有独占锁 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } }
二、AQS的应用
AQS通过state状态管理、同步队列、等待队列实现了多线程同步锁获取与释放,多线程并发排队、条件等待等复杂功能。
作为基础组件,它对锁的两种模式【独占模式和共享模式】都提供支持。设计上AQS采用模板方法模式构建,其内部提供了并发操作的核心方法、而将一些实现不同模式下实现可能有差异的操作定义为模板方法,让其子类实现。如ReentrantLock通过内部类Sync及其子类继承AQS实现tryAcuire()和tryRelease()方法来实现独占锁,而SemaPhore则通过内部类继承AQS实现tryAcquireShared()方法和tryReleaseShared()方法实现共享模式锁。AQS的继承关系图如下:
三、ReetrantLock非公平锁实现分析AQS的用法
ReetrantLock中非公平锁 //加锁操作 public void lock() { sync.lock(); } /** * 非公平锁实现sync.lock() */ static final class NonfairSync extends Sync { //加锁 final void lock() { //执行CAS操作,获取同步状态 if (compareAndSetState(0, 1)) //成功则将独占锁线程设置为当前线程 setExclusiveOwnerThread(Thread.currentThread()); else //否则再次请求同步状态 acquire(1); } } //acquire为AQS本身实现的方法,其实现如下: public final void acquire(int arg) { //再次尝试获取同步状态 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //AQS子类Sync的子类NonfairSync中实现的tryAcquire方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } //nonfairTryAcquire方法 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //判断同步状态是否为0,并尝试再次获取同步状态 if (c == 0) { //执行CAS操作 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果当前线程已获取锁,属于重入锁,再次获取锁后将status值加1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //设置当前同步状态,当前只有一个线程持有锁,因为不会发生线程安全问题,可以直接执行 setState(nextc); setState(nextc); return true; } return false; } //AQS本身实现的方法,将当前获取锁失败的线程构造成node结点加入的同步队列尾部,若队列为空或者并发入队失败,则调用enq方法重试。 private Node addWaiter(Node mode) { //将请求同步状态失败的线程封装成结点 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //如果是第一个结点加入肯定为空,跳过。 //如果非第一个结点则直接执行CAS入队操作,尝试在尾部快速添加 if (pred != null) { node.prev = pred; //使用CAS执行尾部结点替换,尝试在尾部快速添加 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果第一次加入或者CAS操作没有成功执行enq入队操作 enq(node); return node; } //AQS本身实现方法,通过循环CAS操作将当前线程构造的node结点入队,解决上面队列为空或者是并发入队失败的情况; private Node enq(final Node node) { //死循环 for (;;) { Node t = tail; //如果队列为null,即没有头结点 if (t == null) { // Must initialize //创建并使用CAS设置头结点 if (compareAndSetHead(new Node())) tail = head; } else {//队尾添加新结点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } //AQS本身方法,队列中结点循环观察,当自己的前驱是head结点执行的结点时尝试获取锁,若成功将其设置为头结点,否则循环尝试;若当前结点前驱结点不是头结点,则在设置其前驱结点状态后将自己挂起 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋,死循环 for (;;) { //获取前驱结点 final Node p = node.predecessor(); 当且仅当p为头结点才尝试获取同步状态 if (p == head && tryAcquire(arg)) { //将node设置为头结点 setHead(node); //清空原来头结点的引用便于GC p.next = null; // help GC failed = false; return interrupted; } //如果前驱结点不是head,判断是否挂起线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //最终都没能获取同步状态,结束该线程的请求 cancelAcquire(node); } } //设置为头结点 private void setHead(Node node) { head = node; //清空结点数据 node.thread = null; node.prev = null; } //如果前驱结点不是head,判断是否挂起线程 if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true; } //AQS本身的方法,若前驱结点状态为Node.SIGNAL则返回true,表示可以挂起当前结点,否则找到非结束状态的前驱结点,并设置其状态后,返回false private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取当前结点的等待状态 int ws = pred.waitStatus; //如果为等待唤醒(SIGNAL)状态则返回true if (ws == Node.SIGNAL) return true; //如果ws>0 则说明是结束状态, //遍历前驱结点直到找到没有结束状态的结点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果ws小于0又不是SIGNAL状态, //则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //AQS本身方法,挂起当前线程并检查其中断状态 private final boolean parkAndCheckInterrupt() { //将当前线程挂起 LockSupport.park(this); //获取线程中断状态,interrupted()是判断当前中断状态, //并非中断线程,因此可能true也可能false,并返回 return Thread.interrupted(); } //ReentrantLock类的unlock public void unlock() { sync.release(1); } //AQS类的release()方法 public final boolean release(int arg) { //尝试释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继结点的线程 unparkSuccessor(h); return true; } return false; } //ReentrantLock类中的内部类Sync实现的tryRelease(int releases) protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //判断状态是否为0,如果是则说明已释放同步状态 if (c == 0) { free = true; //设置Owner为null setExclusiveOwnerThread(null); } //设置更新同步状态 setState(c); return free; } //AQS本身方法,唤醒后续挂起的结点 private void unparkSuccessor(Node node) { //这里,node一般为当前线程所在的结点。 int ws = node.waitStatus; if (ws < 0)//置零当前线程所在的结点状态,允许失败。 compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//找到下一个需要唤醒的结点s if (s == null || s.waitStatus > 0) {//如果为空或已取消 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。 s = t; } if (s != null) LockSupport.unpark(s.thread);//唤醒 }
剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理
标签:success sig bubuko and shared shel .com sync art
原文地址:https://www.cnblogs.com/doit8791/p/9095424.html