码迷,mamicode.com
首页 > 其他好文 > 详细

并发编程专题五:抽象队列同步器AQS应用Lock

时间:2021-02-15 12:08:25      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:管理机   iso   应该   方法   sof   countdown   构造函数   offset   不同   

并发之父

技术图片

   生平不识Doug Lea,学懂并发也枉然

  Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。

ReentrantLock

  ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。

1 使用ReentrantLock进行同步
2 ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁
3 lock.lock() //加锁
4 lock.unlock() //解锁
  ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
  在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:
  1、FairSync 公平锁的实现
  2、NonfairSync 非公平锁的实现
  这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。
  上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现

AQS具备特性

  * 阻塞等待队列

  * 共享/独占

  * 公平/非公平

  * 可重入

  * 允许中断

  除了Lock外,Java.util.concurrent当中同步器的实现如Latch,Barrier,BlockingQueue等,都是基于AQS框架实现

  * 一般通过定义内部类Sync继承AQS

  * 将同步器所有调用都映射到Sync对应的方法

  AQS内部维护属性volatile int state (32位)

  * state表示资源的可用状态

  State三种访问方式
    getState()、setState()、compareAndSetState()
  AQS定义两种资源共享方式

  * Exclusive-独占,只有一个线程能执行,如ReentrantLock

  * Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

  AQS定义两种队列

  * 同步等待队列

  * 条件等待队列

  不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  * isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

  * tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

  * tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

  * tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

  * tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

同步等待队列

  AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

技术图片

条件等待队列

  Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁

技术图片

AQS源码分析

   1 public abstract class AbstractQueuedSynchronizer
   2         extends AbstractOwnableSynchronizer
   3         implements java.io.Serializable {
   4     private static final long serialVersionUID = 7373984972572414691L;
   5 
   6     /**
   7      * Creates a new {@code AbstractQueuedSynchronizer} instance
   8      * with initial synchronization state of zero.
   9      */
  10     protected AbstractQueuedSynchronizer() { }
  11 
  12     /**
  13      * Wait queue node class.
  14      *
  15      * 不管是条件队列,还是CLH等待队列
  16      * 都是基于Node类
  17      * 
  18      * AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人
  19      * 发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的
  20      * CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
  21      */
  22     static final class Node {
  23         /**
  24          * 标记节点未共享模式
  25          * */
  26         static final Node SHARED = new Node();
  27         /**
  28          *  标记节点为独占模式
  29          */
  30         static final Node EXCLUSIVE = null;
  31 
  32         /**
  33          * 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
  34          * */
  35         static final int CANCELLED =  1;
  36         /**
  37          *  后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,
  38          *  将会通知后继节点,使后继节点的线程得以运行。
  39          */
  40         static final int SIGNAL    = -1;
  41         /**
  42          *  节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,
  43          *  该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
  44          */
  45         static final int CONDITION = -2;
  46         /**
  47          * 表示下一次共享式同步状态获取将会被无条件地传播下去
  48          */
  49         static final int PROPAGATE = -3;
  50 
  51         /**
  52          * 标记当前节点的信号量状态 (1,0,-1,-2,-3)5种状态
  53          * 使用CAS更改状态,volatile保证线程可见性,高并发场景下,
  54          * 即被一个线程修改后,状态会立马让其他线程可见。
  55          */
  56         volatile int waitStatus;
  57 
  58         /**
  59          * 前驱节点,当前节点加入到同步队列中被设置
  60          */
  61         volatile Node prev;
  62 
  63         /**
  64          * 后继节点
  65          */
  66         volatile Node next;
  67 
  68         /**
  69          * 节点同步状态的线程
  70          */
  71         volatile Thread thread;
  72 
  73         /**
  74          * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,
  75          * 也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段。
  76          */
  77         Node nextWaiter;
  78 
  79         /**
  80          * Returns true if node is waiting in shared mode.
  81          */
  82         final boolean isShared() {
  83             return nextWaiter == SHARED;
  84         }
  85 
  86         /**
  87          * 返回前驱节点
  88          */
  89         final Node predecessor() throws NullPointerException {
  90             Node p = prev;
  91             if (p == null)
  92                 throw new NullPointerException();
  93             else
  94                 return p;
  95         }
  96         //空节点,用于标记共享模式
  97         Node() {    // Used to establish initial head or SHARED marker
  98         }
  99         //用于同步队列CLH
 100         Node(Thread thread, Node mode) {     // Used by addWaiter
 101             this.nextWaiter = mode;
 102             this.thread = thread;
 103         }
 104         //用于条件队列
 105         Node(Thread thread, int waitStatus) { // Used by Condition
 106             this.waitStatus = waitStatus;
 107             this.thread = thread;
 108         }
 109     }
 110     
 111     /**
 112      * 指向同步等待队列的头节点
 113      */
 114     private transient volatile Node head;
 115 
 116     /**
 117      * 指向同步等待队列的尾节点
 118      */
 119     private transient volatile Node tail;
 120 
 121     /**
 122      * 同步资源状态
 123      */
 124     private volatile int state;
 125 
 126     /**
 127      * 
 128      * @return current state value
 129      */
 130     protected final int getState() {
 131         return state;
 132     }
 133 
 134     protected final void setState(int newState) {
 135         state = newState;
 136     }
 137 
 138     /**
 139      * Atomically sets synchronization state to the given updated
 140      * value if the current state value equals the expected value.
 141      * This operation has memory semantics of a {@code volatile} read
 142      * and write.
 143      *
 144      * @param expect the expected value
 145      * @param update the new value
 146      * @return {@code true} if successful. False return indicates that the actual
 147      *         value was not equal to the expected value.
 148      */
 149     protected final boolean compareAndSetState(int expect, int update) {
 150         // See below for intrinsics setup to support this
 151         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 152     }
 153 
 154     // Queuing utilities
 155 
 156     /**
 157      * The number of nanoseconds for which it is faster to spin
 158      * rather than to use timed park. A rough estimate suffices
 159      * to improve responsiveness with very short timeouts.
 160      */
 161     static final long spinForTimeoutThreshold = 1000L;
 162 
 163     /**
 164      * 节点加入CLH同步队列
 165      */
 166     private Node enq(final Node node) {
 167         for (;;) {
 168             Node t = tail;
 169             if (t == null) { // Must initialize
 170                 //队列为空需要初始化,创建空的头节点
 171                 if (compareAndSetHead(new Node()))
 172                     tail = head;
 173             } else {
 174                 node.prev = t;
 175                 //set尾部节点
 176                 if (compareAndSetTail(t, node)) {//当前节点置为尾部
 177                     t.next = node; //前驱节点的next指针指向当前节点
 178                     return t;
 179                 }
 180             }
 181         }
 182     }
 183 
 184     /**
 185      * Creates and enqueues node for current thread and given mode.
 186      *
 187      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 188      * @return the new node
 189      */
 190     private Node addWaiter(Node mode) {
 191         // 1. 将当前线程构建成Node类型
 192         Node node = new Node(Thread.currentThread(), mode);
 193         // Try the fast path of enq; backup to full enq on failure
 194         Node pred = tail;
 195         // 2. 1当前尾节点是否为null?
 196         if (pred != null) {
 197             // 2.2 将当前节点尾插入的方式
 198             node.prev = pred;
 199             // 2.3 CAS将节点插入同步队列的尾部
 200             if (compareAndSetTail(pred, node)) {
 201                 pred.next = node;
 202                 return node;
 203             }
 204         }
 205         enq(node);
 206         return node;
 207     }
 208 
 209     /**
 210      * Sets head of queue to be node, thus dequeuing. Called only by
 211      * acquire methods.  Also nulls out unused fields for sake of GC
 212      * and to suppress unnecessary signals and traversals.
 213      *
 214      * @param node the node
 215      */
 216     private void setHead(Node node) {
 217         head = node;
 218         node.thread = null;
 219         node.prev = null;
 220     }
 221 
 222     /**
 223      *
 224      */
 225     private void unparkSuccessor(Node node) {
 226         //获取wait状态
 227         int ws = node.waitStatus;
 228         if (ws < 0)
 229             compareAndSetWaitStatus(node, ws, 0);// 将等待状态waitStatus设置为初始值0
 230 
 231         /**
 232          * 若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点
 233          * 进行唤醒
 234          */
 235         Node s = node.next; //head.next = Node1 ,thread = T3
 236         if (s == null || s.waitStatus > 0) {
 237             s = null;
 238             for (Node t = tail; t != null && t != node; t = t.prev)
 239                 if (t.waitStatus <= 0)
 240                     s = t;
 241         }
 242         if (s != null)
 243             LockSupport.unpark(s.thread);//唤醒线程,T3唤醒
 244     }
 245 
 246     /**
 247      * 把当前结点设置为SIGNAL或者PROPAGATE
 248      * 唤醒head.next(B节点),B节点唤醒后可以竞争锁,成功后head->B,然后又会唤醒B.next,一直重复直到共享节点都唤醒
 249      * head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁
 250      * head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示需要将状态向后继节点传播
 251      */
 252     private void doReleaseShared() {
 253         for (;;) {
 254             Node h = head;
 255             if (h != null && h != tail) {
 256                 int ws = h.waitStatus;
 257                 if (ws == Node.SIGNAL) {//head是SIGNAL状态
 258                     /* head状态是SIGNAL,重置head节点waitStatus为0,E这里不直接设为Node.PROPAGAT,
 259                      * 是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
 260                      * 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
 261                      */
 262                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 263                         continue; //设置失败,重新循环
 264                     /* head状态为SIGNAL,且成功设置为0之后,唤醒head.next节点线程
 265                      * 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后head会指向获取锁的节点,
 266                      * 也就是head发生了变化。看最底下一行代码可知,head发生变化后会重新循环,继续唤醒head的下一个节点
 267                      */
 268                     unparkSuccessor(h);
 269                     /*
 270                      * 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。
 271                      * 意味着需要将状态向后一个节点传播
 272                      */
 273                 }
 274                 else if (ws == 0 &&
 275                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
 276                     continue;                // loop on failed CAS
 277             }
 278             if (h == head) //如果head变了,重新循环
 279                 break;
 280         }
 281     }
 282 
 283     /**
 284      * 把node节点设置成head节点,且Node.waitStatus->Node.PROPAGATE
 285      */
 286     private void setHeadAndPropagate(Node node, int propagate) {
 287         Node h = head; //h用来保存旧的head节点
 288         setHead(node);//head引用指向node节点
 289         /* 这里意思有两种情况是需要执行唤醒操作
 290          * 1.propagate > 0 表示调用方指明了后继节点需要被唤醒
 291          * 2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
 292          */
 293         if (propagate > 0 || h == null || h.waitStatus < 0 ||
 294                 (h = head) == null || h.waitStatus < 0) {
 295             Node s = node.next;
 296             if (s == null || s.isShared())//node是最后一个节点或者 node的后继节点是共享节点
 297                 /* 如果head节点状态为SIGNAL,唤醒head节点线程,重置head.waitStatus->0
 298                  * head节点状态为0(第一次添加时是0),设置head.waitStatus->Node.PROPAGATE表示状态需要向后继节点传播
 299                  */
 300                 doReleaseShared();
 301         }
 302     }
 303 
 304     // Utilities for various versions of acquire
 305 
 306     /**
 307      * 终结掉正在尝试去获取锁的节点
 308      * @param node the node
 309      */
 310     private void cancelAcquire(Node node) {
 311         // Ignore if node doesn‘t exist
 312         if (node == null)
 313             return;
 314 
 315         node.thread = null;
 316 
 317         // 剔除掉一件被cancel掉的节点
 318         Node pred = node.prev;
 319         while (pred.waitStatus > 0)
 320             node.prev = pred = pred.prev;
 321 
 322         // predNext is the apparent node to unsplice. CASes below will
 323         // fail if not, in which case, we lost race vs another cancel
 324         // or signal, so no further action is necessary.
 325         Node predNext = pred.next;
 326 
 327         // Can use unconditional write instead of CAS here.
 328         // After this atomic step, other Nodes can skip past us.
 329         // Before, we are free of interference from other threads.
 330         node.waitStatus = Node.CANCELLED;
 331 
 332         // If we are the tail, remove ourselves.
 333         if (node == tail && compareAndSetTail(node, pred)) {
 334             compareAndSetNext(pred, predNext, null);
 335         } else {
 336             // If successor needs signal, try to set pred‘s next-link
 337             // so it will get one. Otherwise wake it up to propagate.
 338             int ws;
 339             if (pred != head &&
 340                     ((ws = pred.waitStatus) == Node.SIGNAL ||
 341                             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
 342                     pred.thread != null) {
 343                 Node next = node.next;
 344                 if (next != null && next.waitStatus <= 0)
 345                     compareAndSetNext(pred, predNext, next);
 346             } else {
 347                 unparkSuccessor(node);
 348             }
 349 
 350             node.next = node; // help GC
 351         }
 352     }
 353 
 354     /**
 355      * 
 356      */
 357     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 358         int ws = pred.waitStatus;
 359         if (ws == Node.SIGNAL)
 360             /*
 361              * 若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park
 362              */
 363             return true;
 364         if (ws > 0) {
 365             /*
 366              * 前驱节点状态如果被取消状态,将被移除出队列
 367              */
 368             do {
 369                 node.prev = pred = pred.prev;
 370             } while (pred.waitStatus > 0);
 371             pred.next = node;
 372         } else {
 373             /*
 374              * 当前驱节点waitStatus为 0 or PROPAGATE状态时
 375              * 将其设置为SIGNAL状态,然后当前结点才可以可以被安全地park
 376              */
 377             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 378         }
 379         return false;
 380     }
 381 
 382     /**
 383      * 中断当前线程
 384      */
 385     static void selfInterrupt() {
 386         Thread.currentThread().interrupt();
 387     }
 388 
 389     /**
 390      * 阻塞当前节点,返回当前Thread的中断状态
 391      * LockSupport.park 底层实现逻辑调用系统内核功能 pthread_mutex_lock 阻塞线程
 392      */
 393     private final boolean parkAndCheckInterrupt() {
 394         LockSupport.park(this);//阻塞
 395         return Thread.interrupted();
 396     }
 397 
 398     /**
 399      * 已经在队列当中的Thread节点,准备阻塞等待获取锁
 400      */
 401     final boolean acquireQueued(final Node node, int arg) {
 402         boolean failed = true;
 403         try {
 404             boolean interrupted = false;
 405             for (;;) {//死循环
 406                 final Node p = node.predecessor();//找到当前结点的前驱结点
 407                 if (p == head && tryAcquire(arg)) {//如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。
 408                     setHead(node);//获取同步状态成功,将当前结点设置为头结点。
 409                     p.next = null; // help GC
 410                     failed = false;
 411                     return interrupted;
 412                 }
 413                 /**
 414                  * 如果前驱节点不是Head,通过shouldParkAfterFailedAcquire判断是否应该阻塞
 415                  * 前驱节点信号量为-1,当前线程可以安全被parkAndCheckInterrupt用来阻塞线程
 416                  */
 417                 if (shouldParkAfterFailedAcquire(p, node) &&
 418                         parkAndCheckInterrupt())
 419                     interrupted = true;
 420             }
 421         } finally {
 422             if (failed)
 423                 cancelAcquire(node);
 424         }
 425     }
 426 
 427     /**
 428      * 与acquireQueued逻辑相似,唯一区别节点还不在队列当中需要先进行入队操作
 429      */
 430     private void doAcquireInterruptibly(int arg)
 431             throws InterruptedException {
 432         final Node node = addWaiter(Node.EXCLUSIVE);//以独占模式放入队列尾部
 433         boolean failed = true;
 434         try {
 435             for (;;) {
 436                 final Node p = node.predecessor();
 437                 if (p == head && tryAcquire(arg)) {
 438                     setHead(node);
 439                     p.next = null; // help GC
 440                     failed = false;
 441                     return;
 442                 }
 443                 if (shouldParkAfterFailedAcquire(p, node) &&
 444                         parkAndCheckInterrupt())
 445                     throw new InterruptedException();
 446             }
 447         } finally {
 448             if (failed)
 449                 cancelAcquire(node);
 450         }
 451     }
 452 
 453     /**
 454      * 独占模式定时获取
 455      */
 456     private boolean doAcquireNanos(int arg, long nanosTimeout)
 457             throws InterruptedException {
 458         if (nanosTimeout <= 0L)
 459             return false;
 460         final long deadline = System.nanoTime() + nanosTimeout;
 461         final Node node = addWaiter(Node.EXCLUSIVE);//加入队列
 462         boolean failed = true;
 463         try {
 464             for (;;) {
 465                 final Node p = node.predecessor();
 466                 if (p == head && tryAcquire(arg)) {
 467                     setHead(node);
 468                     p.next = null; // help GC
 469                     failed = false;
 470                     return true;
 471                 }
 472                 nanosTimeout = deadline - System.nanoTime();
 473                 if (nanosTimeout <= 0L)
 474                     return false;//超时直接返回获取失败
 475                 if (shouldParkAfterFailedAcquire(p, node) &&
 476                         nanosTimeout > spinForTimeoutThreshold)
 477                     //阻塞指定时长,超时则线程自动被唤醒
 478                     LockSupport.parkNanos(this, nanosTimeout);
 479                 if (Thread.interrupted())//当前线程中断状态
 480                     throw new InterruptedException();
 481             }
 482         } finally {
 483             if (failed)
 484                 cancelAcquire(node);
 485         }
 486     }
 487 
 488     /**
 489      * 尝试获取共享锁
 490      */
 491     private void doAcquireShared(int arg) {
 492         final Node node = addWaiter(Node.SHARED);//入队
 493         boolean failed = true;
 494         try {
 495             boolean interrupted = false;
 496             for (;;) {
 497                 final Node p = node.predecessor();//前驱节点
 498                 if (p == head) {
 499                     int r = tryAcquireShared(arg); //非公平锁实现,再尝试获取锁
 500                     //state==0时tryAcquireShared会返回>=0(CountDownLatch中返回的是1)。
 501                     // state为0说明共享次数已经到了,可以获取锁了
 502                     if (r >= 0) {//r>0表示state==0,前继节点已经释放锁,锁的状态为可被获取
 503                         //这一步设置node为head节点设置node.waitStatus->Node.PROPAGATE,然后唤醒node.thread
 504                         setHeadAndPropagate(node, r);
 505                         p.next = null; // help GC
 506                         if (interrupted)
 507                             selfInterrupt();
 508                         failed = false;
 509                         return;
 510                     }
 511                 }
 512                 //前继节点非head节点,将前继节点状态设置为SIGNAL,通过park挂起node节点的线程
 513                 if (shouldParkAfterFailedAcquire(p, node) &&
 514                         parkAndCheckInterrupt())
 515                     interrupted = true;
 516             }
 517         } finally {
 518             if (failed)
 519                 cancelAcquire(node);
 520         }
 521     }
 522 
 523     /**
 524      * Acquires in shared interruptible mode.
 525      * @param arg the acquire argument
 526      */
 527     private void doAcquireSharedInterruptibly(int arg)
 528             throws InterruptedException {
 529         final Node node = addWaiter(Node.SHARED);
 530         boolean failed = true;
 531         try {
 532             for (;;) {
 533                 final Node p = node.predecessor();
 534                 if (p == head) {
 535                     int r = tryAcquireShared(arg);
 536                     if (r >= 0) {
 537                         setHeadAndPropagate(node, r);
 538                         p.next = null; // help GC
 539                         failed = false;
 540                         return;
 541                     }
 542                 }
 543                 if (shouldParkAfterFailedAcquire(p, node) &&
 544                         parkAndCheckInterrupt())
 545                     throw new InterruptedException();
 546             }
 547         } finally {
 548             if (failed)
 549                 cancelAcquire(node);
 550         }
 551     }
 552 
 553     /**
 554      * Acquires in shared timed mode.
 555      *
 556      * @param arg the acquire argument
 557      * @param nanosTimeout max wait time
 558      * @return {@code true} if acquired
 559      */
 560     private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
 561             throws InterruptedException {
 562         if (nanosTimeout <= 0L)
 563             return false;
 564         final long deadline = System.nanoTime() + nanosTimeout;
 565         final Node node = addWaiter(Node.SHARED);
 566         boolean failed = true;
 567         try {
 568             for (;;) {
 569                 final Node p = node.predecessor();
 570                 if (p == head) {
 571                     int r = tryAcquireShared(arg);
 572                     if (r >= 0) {
 573                         setHeadAndPropagate(node, r);
 574                         p.next = null; // help GC
 575                         failed = false;
 576                         return true;
 577                     }
 578                 }
 579                 nanosTimeout = deadline - System.nanoTime();
 580                 if (nanosTimeout <= 0L)
 581                     return false;
 582                 if (shouldParkAfterFailedAcquire(p, node) &&
 583                         nanosTimeout > spinForTimeoutThreshold)
 584                     LockSupport.parkNanos(this, nanosTimeout);
 585                 if (Thread.interrupted())
 586                     throw new InterruptedException();
 587             }
 588         } finally {
 589             if (failed)
 590                 cancelAcquire(node);
 591         }
 592     }
 593 
 594     // Main exported methods
 595 
 596     /**
 597      * 尝试获取独占锁,可指定锁的获取数量
 598      */
 599     protected boolean tryAcquire(int arg) {
 600         throw new UnsupportedOperationException();
 601     }
 602 
 603     /**
 604      * 尝试释放独占锁,在子类当中实现
 605      */
 606     protected boolean tryRelease(int arg) {
 607         throw new UnsupportedOperationException();
 608     }
 609 
 610     /**
 611      * 共享式:共享式地获取同步状态。对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,
 612      * 其他线程都得去排队等待,其待重写的尝试获取同步状态的方法tryAcquire返回值为boolean,这很容易理解;
 613      * 对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是“共享”的意义所在。
 614      * 本方法待被之类覆盖实现具体逻辑
 615      *  1.当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
 616      *
 617      * 2.当返回值等于0时,表示获取同步状态成功,但没有可用同步状态了;
 618 
 619      * 3.当返回值小于0时,表示获取同步状态失败。
 620      */
 621     protected int tryAcquireShared(int arg) {
 622         throw new UnsupportedOperationException();
 623     }
 624 
 625     /**
 626      * 释放共享锁,具体实现在子类当中实现
 627      */
 628     protected boolean tryReleaseShared(int arg) {
 629         throw new UnsupportedOperationException();
 630     }
 631 
 632     /**
 633      * 当前线程是否持有独占锁
 634      */
 635     protected boolean isHeldExclusively() {
 636         throw new UnsupportedOperationException();
 637     }
 638 
 639     /**
 640      * 获取独占锁
 641      */
 642     public final void acquire(int arg) {
 643         //尝试获取锁
 644         if (!tryAcquire(arg) &&
 645                 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//独占模式
 646             selfInterrupt();
 647     }
 648 
 649     /**
 650      * 
 651      */
 652     public final void acquireInterruptibly(int arg)
 653             throws InterruptedException {
 654         if (Thread.interrupted())
 655             throw new InterruptedException();
 656         if (!tryAcquire(arg))
 657             doAcquireInterruptibly(arg);
 658     }
 659 
 660     /**
 661      * 获取独占锁,设置最大等待时间
 662      */
 663     public final boolean tryAcquireNanos(int arg, long nanosTimeout)
 664             throws InterruptedException {
 665         if (Thread.interrupted())
 666             throw new InterruptedException();
 667         return tryAcquire(arg) ||
 668                 doAcquireNanos(arg, nanosTimeout);
 669     }
 670 
 671     /**
 672      * 释放独占模式持有的锁
 673      */
 674     public final boolean release(int arg) {
 675         if (tryRelease(arg)) {//释放一次锁
 676             Node h = head;
 677             if (h != null && h.waitStatus != 0)
 678                 unparkSuccessor(h);//唤醒后继结点
 679             return true;
 680         }
 681         return false;
 682     }
 683 
 684     /**
 685      * 请求获取共享锁
 686      */
 687     public final void acquireShared(int arg) {
 688         if (tryAcquireShared(arg) < 0)//返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。
 689             doAcquireShared(arg);
 690     }
 691 
 692 
 693     /**
 694      * Releases in shared mode.  Implemented by unblocking one or more
 695      * threads if {@link #tryReleaseShared} returns true.
 696      *
 697      * @param arg the release argument.  This value is conveyed to
 698      *        {@link #tryReleaseShared} but is otherwise uninterpreted
 699      *        and can represent anything you like.
 700      * @return the value returned from {@link #tryReleaseShared}
 701      */
 702     public final boolean releaseShared(int arg) {
 703         if (tryReleaseShared(arg)) {
 704             doReleaseShared();
 705             return true;
 706         }
 707         return false;
 708     }
 709 
 710     // Queue inspection methods
 711 
 712     public final boolean hasQueuedThreads() {
 713         return head != tail;
 714     }
 715 
 716     public final boolean hasContended() {
 717         return head != null;
 718     }
 719 
 720     public final Thread getFirstQueuedThread() {
 721         // handle only fast path, else relay
 722         return (head == tail) ? null : fullGetFirstQueuedThread();
 723     }
 724 
 725     /**
 726      * Version of getFirstQueuedThread called when fastpath fails
 727      */
 728     private Thread fullGetFirstQueuedThread() {
 729         Node h, s;
 730         Thread st;
 731         if (((h = head) != null && (s = h.next) != null &&
 732                 s.prev == head && (st = s.thread) != null) ||
 733                 ((h = head) != null && (s = h.next) != null &&
 734                         s.prev == head && (st = s.thread) != null))
 735             return st;
 736 
 737         Node t = tail;
 738         Thread firstThread = null;
 739         while (t != null && t != head) {
 740             Thread tt = t.thread;
 741             if (tt != null)
 742                 firstThread = tt;
 743             t = t.prev;
 744         }
 745         return firstThread;
 746     }
 747 
 748     /**
 749      * 判断当前线程是否在队列当中
 750      */
 751     public final boolean isQueued(Thread thread) {
 752         if (thread == null)
 753             throw new NullPointerException();
 754         for (Node p = tail; p != null; p = p.prev)
 755             if (p.thread == thread)
 756                 return true;
 757         return false;
 758     }
 759 
 760     final boolean apparentlyFirstQueuedIsExclusive() {
 761         Node h, s;
 762         return (h = head) != null &&
 763                 (s = h.next)  != null &&
 764                 !s.isShared()         &&
 765                 s.thread != null;
 766     }
 767 
 768     /**
 769      * 判断当前节点是否有前驱节点
 770      */
 771     public final boolean hasQueuedPredecessors() {
 772         Node t = tail; // Read fields in reverse initialization order
 773         Node h = head;
 774         Node s;
 775         return h != t &&
 776                 ((s = h.next) == null || s.thread != Thread.currentThread());
 777     }
 778 
 779 
 780     // Instrumentation and monitoring methods
 781 
 782     /**
 783      * 同步队列长度
 784      */
 785     public final int getQueueLength() {
 786         int n = 0;
 787         for (Node p = tail; p != null; p = p.prev) {
 788             if (p.thread != null)
 789                 ++n;
 790         }
 791         return n;
 792     }
 793 
 794     /**
 795      * 获取队列等待thread集合
 796      */
 797     public final Collection<Thread> getQueuedThreads() {
 798         ArrayList<Thread> list = new ArrayList<Thread>();
 799         for (Node p = tail; p != null; p = p.prev) {
 800             Thread t = p.thread;
 801             if (t != null)
 802                 list.add(t);
 803         }
 804         return list;
 805     }
 806 
 807     /**
 808      * 获取独占模式等待thread线程集合
 809      */
 810     public final Collection<Thread> getExclusiveQueuedThreads() {
 811         ArrayList<Thread> list = new ArrayList<Thread>();
 812         for (Node p = tail; p != null; p = p.prev) {
 813             if (!p.isShared()) {
 814                 Thread t = p.thread;
 815                 if (t != null)
 816                     list.add(t);
 817             }
 818         }
 819         return list;
 820     }
 821 
 822     /**
 823      * 获取共享模式等待thread集合
 824      */
 825     public final Collection<Thread> getSharedQueuedThreads() {
 826         ArrayList<Thread> list = new ArrayList<Thread>();
 827         for (Node p = tail; p != null; p = p.prev) {
 828             if (p.isShared()) {
 829                 Thread t = p.thread;
 830                 if (t != null)
 831                     list.add(t);
 832             }
 833         }
 834         return list;
 835     }
 836 
 837 
 838     // Internal support methods for Conditions
 839 
 840     /**
 841      * 判断节点是否在同步队列中
 842      */
 843     final boolean isOnSyncQueue(Node node) {
 844         //快速判断1:节点状态或者节点没有前置节点
 845         //注:同步队列是有头节点的,而条件队列没有
 846         if (node.waitStatus == Node.CONDITION || node.prev == null)
 847             return false;
 848         //快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
 849         if (node.next != null) // If has successor, it must be on queue
 850             return true;
 851         //上面如果无法判断则进入复杂判断
 852         return findNodeFromTail(node);
 853     }
 854 
 855     private boolean findNodeFromTail(Node node) {
 856         Node t = tail;
 857         for (;;) {
 858             if (t == node)
 859                 return true;
 860             if (t == null)
 861                 return false;
 862             t = t.prev;
 863         }
 864     }
 865 
 866     /**
 867      * 将节点从条件队列当中移动到同步队列当中,等待获取锁
 868      */
 869     final boolean transferForSignal(Node node) {
 870         /*
 871          * 修改节点信号量状态为0,失败直接返回false
 872          */
 873         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 874             return false;
 875 
 876         /*
 877          * 加入同步队列尾部当中,返回前驱节点
 878          */
 879         Node p = enq(node);
 880         int ws = p.waitStatus;
 881         //前驱节点不可用 或者 修改信号量状态失败
 882         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 883             LockSupport.unpark(node.thread); //唤醒当前节点
 884         return true;
 885     }
 886 
 887     final boolean transferAfterCancelledWait(Node node) {
 888         if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
 889             enq(node);
 890             return true;
 891         }
 892         /*
 893          * If we lost out to a signal(), then we can‘t proceed
 894          * until it finishes its enq().  Cancelling during an
 895          * incomplete transfer is both rare and transient, so just
 896          * spin.
 897          */
 898         while (!isOnSyncQueue(node))
 899             Thread.yield();
 900         return false;
 901     }
 902 
 903     /**
 904      * 入参就是新创建的节点,即当前节点
 905      */
 906     final int fullyRelease(Node node) {
 907         boolean failed = true;
 908         try {
 909             //这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁
 910             //可以考虑下这个逻辑放在共享锁下面会发生什么?
 911             int savedState = getState();
 912             if (release(savedState)) {
 913                 failed = false;
 914                 return savedState;
 915             } else {
 916                 //如果这里释放失败,则抛出异常
 917                 throw new IllegalMonitorStateException();
 918             }
 919         } finally {
 920             /**
 921              * 如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中
 922              * 只需要判断最后一个节点是否被取消就可以了
 923              */
 924             if (failed)
 925                 node.waitStatus = Node.CANCELLED;
 926         }
 927     }
 928 
 929     // Instrumentation methods for conditions
 930 
 931     public final boolean hasWaiters(ConditionObject condition) {
 932         if (!owns(condition))
 933             throw new IllegalArgumentException("Not owner");
 934         return condition.hasWaiters();
 935     }
 936 
 937     /**
 938      * 获取条件队列长度
 939      */
 940     public final int getWaitQueueLength(ConditionObject condition) {
 941         if (!owns(condition))
 942             throw new IllegalArgumentException("Not owner");
 943         return condition.getWaitQueueLength();
 944     }
 945 
 946     /**
 947      * 获取条件队列当中所有等待的thread集合
 948      */
 949     public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
 950         if (!owns(condition))
 951             throw new IllegalArgumentException("Not owner");
 952         return condition.getWaitingThreads();
 953     }
 954 
 955     /**
 956      * 条件对象,实现基于条件的具体行为
 957      */
 958     public class ConditionObject implements Condition, java.io.Serializable {
 959         private static final long serialVersionUID = 1173984872572414699L;
 960         /** First node of condition queue. */
 961         private transient Node firstWaiter;
 962         /** Last node of condition queue. */
 963         private transient Node lastWaiter;
 964 
 965         /**
 966          * Creates a new {@code ConditionObject} instance.
 967          */
 968         public ConditionObject() { }
 969 
 970         // Internal methods
 971 
 972         /**
 973          * 1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter
 974          * 2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发
 975          */
 976         private Node addConditionWaiter() {
 977             Node t = lastWaiter;
 978             //如果最后一个节点被取消,则删除队列中被取消的节点
 979             //至于为啥是最后一个节点后面会分析
 980             if (t != null && t.waitStatus != Node.CONDITION) {
 981                 //删除所有被取消的节点
 982                 unlinkCancelledWaiters();
 983                 t = lastWaiter;
 984             }
 985             //创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制
 986             Node node = new Node(Thread.currentThread(), Node.CONDITION);
 987             if (t == null)
 988                 firstWaiter = node;
 989             else
 990                 t.nextWaiter = node;
 991             lastWaiter = node;
 992             return node;
 993         }
 994 
 995         /**
 996          * 发信号,通知遍历条件队列当中的节点转移到同步队列当中,准备排队获取锁
 997          */
 998         private void doSignal(Node first) {
 999             do {
1000                 if ( (firstWaiter = first.nextWaiter) == null)
1001                     lastWaiter = null;
1002                 first.nextWaiter = null;
1003             } while (!transferForSignal(first) && //转移节点
1004                     (first = firstWaiter) != null);
1005         }
1006 
1007         /**
1008          * 通知所有节点移动到同步队列当中,并将节点从条件队列删除
1009          */
1010         private void doSignalAll(Node first) {
1011             lastWaiter = firstWaiter = null;
1012             do {
1013                 Node next = first.nextWaiter;
1014                 first.nextWaiter = null;
1015                 transferForSignal(first);
1016                 first = next;
1017             } while (first != null);
1018         }
1019 
1020         /**
1021          * 删除条件队列当中被取消的节点
1022          */
1023         private void unlinkCancelledWaiters() {
1024             Node t = firstWaiter;
1025             Node trail = null;
1026             while (t != null) {
1027                 Node next = t.nextWaiter;
1028                 if (t.waitStatus != Node.CONDITION) {
1029                     t.nextWaiter = null;
1030                     if (trail == null)
1031                         firstWaiter = next;
1032                     else
1033                         trail.nextWaiter = next;
1034                     if (next == null)
1035                         lastWaiter = trail;
1036                 }
1037                 else
1038                     trail = t;
1039                 t = next;
1040             }
1041         }
1042 
1043         // public methods
1044 
1045         /**
1046          * 发新号,通知条件队列当中节点到同步队列当中去排队
1047          */
1048         public final void signal() {
1049             if (!isHeldExclusively())//节点不能已经持有独占锁
1050                 throw new IllegalMonitorStateException();
1051             Node first = firstWaiter;
1052             if (first != null)
1053                 /**
1054                  * 发信号通知条件队列的节点准备到同步队列当中去排队
1055                  */
1056                 doSignal(first);
1057         }
1058 
1059         /**
1060          * 唤醒所有条件队列的节点转移到同步队列当中
1061          */
1062             public final void signalAll() {
1063             if (!isHeldExclusively())
1064                 throw new IllegalMonitorStateException();
1065             Node first = firstWaiter;
1066             if (first != null)
1067                 doSignalAll(first);
1068         }
1069 
1070         /**
1071          * Implements uninterruptible condition wait.
1072          * <ol>
1073          * <li> Save lock state returned by {@link #getState}.
1074          * <li> Invoke {@link #release} with saved state as argument,
1075          *      throwing IllegalMonitorStateException if it fails.
1076          * <li> Block until signalled.
1077          * <li> Reacquire by invoking specialized version of
1078          *      {@link #acquire} with saved state as argument.
1079          * </ol>
1080          */
1081         public final void awaitUninterruptibly() {
1082             Node node = addConditionWaiter();
1083             int savedState = fullyRelease(node);
1084             boolean interrupted = false;
1085             while (!isOnSyncQueue(node)) {
1086                 LockSupport.park(this);
1087                 if (Thread.interrupted())
1088                     interrupted = true;
1089             }
1090             if (acquireQueued(node, savedState) || interrupted)
1091                 selfInterrupt();
1092         }
1093 
1094         /** 该模式表示在退出等待时重新中断 */
1095         private static final int REINTERRUPT =  1;
1096         /** 异常中断 */
1097         private static final int THROW_IE    = -1;
1098 
1099         /**
1100          * 这里的判断逻辑是:
1101          * 1.如果现在不是中断的,即正常被signal唤醒则返回0
1102          * 2.如果节点由中断加入同步队列则返回THROW_IE,由signal加入同步队列则返回REINTERRUPT
1103          */
1104         private int checkInterruptWhileWaiting(Node node) {
1105             return Thread.interrupted() ?
1106                     (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
1107                     0;
1108         }
1109 
1110         /**
1111          * 根据中断时机选择抛出异常或者设置线程中断状态
1112          */
1113         private void reportInterruptAfterWait(int interruptMode)
1114                 throws InterruptedException {
1115             if (interruptMode == THROW_IE)
1116                 throw new InterruptedException();
1117             else if (interruptMode == REINTERRUPT)
1118                 selfInterrupt();
1119         }
1120 
1121         /**
1122          * 加入条件队列等待,条件队列入口
1123          */
1124         public final void await() throws InterruptedException {
1125 
1126             //T2进来
1127             //如果当前线程被中断则直接抛出异常
1128             if (Thread.interrupted())
1129                 throw new InterruptedException();
1130             //把当前节点加入条件队列
1131             Node node = addConditionWaiter();
1132             //释放掉已经获取的独占锁资源
1133             int savedState = fullyRelease(node);//T2释放锁
1134             int interruptMode = 0;
1135             //如果不在同步队列中则不断挂起
1136             while (!isOnSyncQueue(node)) {
1137                 LockSupport.park(this);//T1被阻塞
1138                 //这里被唤醒可能是正常的signal操作也可能是中断
1139                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1140                     break;
1141             }
1142             /**
1143              * 走到这里说明节点已经条件满足被加入到了同步队列中或者中断了
1144              * 这个方法很熟悉吧?就跟独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁
1145              * 在处理中断之前首先要做的是从同步队列中成功获取锁资源
1146              */
1147             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1148                 interruptMode = REINTERRUPT;
1149             //走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作
1150             //删除条件队列中被取消的节点
1151             if (node.nextWaiter != null) // clean up if cancelled
1152                 unlinkCancelledWaiters();
1153             //根据不同模式处理中断
1154             if (interruptMode != 0)
1155                 reportInterruptAfterWait(interruptMode);
1156         }
1157 
1158 
1159         /**
1160          * Implements timed condition wait.
1161          * <ol>
1162          * <li> If current thread is interrupted, throw InterruptedException.
1163          * <li> Save lock state returned by {@link #getState}.
1164          * <li> Invoke {@link #release} with saved state as argument,
1165          *      throwing IllegalMonitorStateException if it fails.
1166          * <li> Block until signalled, interrupted, or timed out.
1167          * <li> Reacquire by invoking specialized version of
1168          *      {@link #acquire} with saved state as argument.
1169          * <li> If interrupted while blocked in step 4, throw InterruptedException.
1170          * <li> If timed out while blocked in step 4, return false, else true.
1171          * </ol>
1172          */
1173         public final boolean await(long time, TimeUnit unit)
1174                 throws InterruptedException {
1175             long nanosTimeout = unit.toNanos(time);
1176             if (Thread.interrupted())
1177                 throw new InterruptedException();
1178             Node node = addConditionWaiter();
1179             int savedState = fullyRelease(node);
1180             final long deadline = System.nanoTime() + nanosTimeout;
1181             boolean timedout = false;
1182             int interruptMode = 0;
1183             while (!isOnSyncQueue(node)) {
1184                 if (nanosTimeout <= 0L) {
1185                     timedout = transferAfterCancelledWait(node);
1186                     break;
1187                 }
1188                 if (nanosTimeout >= spinForTimeoutThreshold)
1189                     LockSupport.parkNanos(this, nanosTimeout);
1190                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1191                     break;
1192                 nanosTimeout = deadline - System.nanoTime();
1193             }
1194             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1195                 interruptMode = REINTERRUPT;
1196             if (node.nextWaiter != null)
1197                 unlinkCancelledWaiters();
1198             if (interruptMode != 0)
1199                 reportInterruptAfterWait(interruptMode);
1200             return !timedout;
1201         }
1202 
1203 
1204         final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
1205             return sync == AbstractQueuedSynchronizer.this;
1206         }
1207 
1208         /**
1209          * Queries whether any threads are waiting on this condition.
1210          * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
1211          *
1212          * @return {@code true} if there are any waiting threads
1213          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1214          *         returns {@code false}
1215          */
1216         protected final boolean hasWaiters() {
1217             if (!isHeldExclusively())
1218                 throw new IllegalMonitorStateException();
1219             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1220                 if (w.waitStatus == Node.CONDITION)
1221                     return true;
1222             }
1223             return false;
1224         }
1225 
1226         /**
1227          * Returns an estimate of the number of threads waiting on
1228          * this condition.
1229          * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
1230          *
1231          * @return the estimated number of waiting threads
1232          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1233          *         returns {@code false}
1234          */
1235         protected final int getWaitQueueLength() {
1236             if (!isHeldExclusively())
1237                 throw new IllegalMonitorStateException();
1238             int n = 0;
1239             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1240                 if (w.waitStatus == Node.CONDITION)
1241                     ++n;
1242             }
1243             return n;
1244         }
1245 
1246         /**
1247          * 得到同步队列当中所有在等待的Thread集合
1248          */
1249         protected final Collection<Thread> getWaitingThreads() {
1250             if (!isHeldExclusively())
1251                 throw new IllegalMonitorStateException();
1252             ArrayList<Thread> list = new ArrayList<Thread>();
1253             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1254                 if (w.waitStatus == Node.CONDITION) {
1255                     Thread t = w.thread;
1256                     if (t != null)
1257                         list.add(t);
1258                 }
1259             }
1260             return list;
1261         }
1262     }
1263 
1264     /**
1265      * Setup to support compareAndSet. We need to natively implement
1266      * this here: For the sake of permitting future enhancements, we
1267      * cannot explicitly subclass AtomicInteger, which would be
1268      * efficient and useful otherwise. So, as the lesser of evils, we
1269      * natively implement using hotspot intrinsics API. And while we
1270      * are at it, we do the same for other CASable fields (which could
1271      * otherwise be done with atomic field updaters).
1272      * unsafe魔法类,直接绕过虚拟机内存管理机制,修改内存
1273      */
1274     private static final Unsafe unsafe = Unsafe.getUnsafe();
1275     //偏移量
1276     private static final long stateOffset;
1277     private static final long headOffset;
1278     private static final long tailOffset;
1279     private static final long waitStatusOffset;
1280     private static final long nextOffset;
1281 
1282     static {
1283         try {
1284             //状态偏移量
1285             stateOffset = unsafe.objectFieldOffset
1286                     (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
1287             //head指针偏移量,head指向CLH队列的头部
1288             headOffset = unsafe.objectFieldOffset
1289                     (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
1290             tailOffset = unsafe.objectFieldOffset
1291                     (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
1292             waitStatusOffset = unsafe.objectFieldOffset
1293                     (Node.class.getDeclaredField("waitStatus"));
1294             nextOffset = unsafe.objectFieldOffset
1295                     (Node.class.getDeclaredField("next"));
1296 
1297         } catch (Exception ex) { throw new Error(ex); }
1298     }
1299 
1300     /**
1301      * CAS 修改头部节点指向. 并发入队时使用.
1302      */
1303     private final boolean compareAndSetHead(Node update) {
1304         return unsafe.compareAndSwapObject(this, headOffset, null, update);
1305     }
1306 
1307     /**
1308      * CAS 修改尾部节点指向. 并发入队时使用.
1309      */
1310     private final boolean compareAndSetTail(Node expect, Node update) {
1311         return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
1312     }
1313 
1314     /**
1315      * CAS 修改信号量状态.
1316      */
1317     private static final boolean compareAndSetWaitStatus(Node node,
1318                                                          int expect,
1319                                                          int update) {
1320         return unsafe.compareAndSwapInt(node, waitStatusOffset,
1321                 expect, update);
1322     }
1323 
1324     /**
1325      * 修改节点的后继指针.
1326      */
1327     private static final boolean compareAndSetNext(Node node,
1328                                                    Node expect,
1329                                                    Node update) {
1330         return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
1331     }
1332 }
1333 
1334 
1335 AQS框架具体实现-独占锁实现ReentrantLock
1336 
1337 public class ReentrantLock implements Lock, java.io.Serializable {
1338     private static final long serialVersionUID = 7373984872572414699L;
1339     /**
1340      * 内部调用AQS的动作,都基于该成员属性实现
1341      */
1342     private final Sync sync;
1343 
1344     /**
1345      * ReentrantLock锁同步操作的基础类,继承自AQS框架.
1346      * 该类有两个继承类,1、NonfairSync 非公平锁,2、FairSync公平锁
1347      */
1348         abstract static class Sync extends AbstractQueuedSynchronizer {
1349         private static final long serialVersionUID = -5179523762034025860L;
1350 
1351         /**
1352          * 加锁的具体行为由子类实现
1353          */
1354         abstract void lock();
1355 
1356         /**
1357          * 尝试获取非公平锁
1358          */
1359         final boolean nonfairTryAcquire(int acquires) {
1360             //acquires = 1
1361             final Thread current = Thread.currentThread();
1362             int c = getState();
1363             /**
1364              * 不需要判断同步队列(CLH)中是否有排队等待线程
1365              * 判断state状态是否为0,不为0可以加锁
1366              */
1367             if (c == 0) {
1368                 //unsafe操作,cas修改state状态
1369                 if (compareAndSetState(0, acquires)) {
1370                     //独占状态锁持有者指向当前线程
1371                     setExclusiveOwnerThread(current);
1372                     return true;
1373                 }
1374             }
1375             /**
1376              * state状态不为0,判断锁持有者是否是当前线程,
1377              * 如果是当前线程持有 则state+1
1378              */
1379             else if (current == getExclusiveOwnerThread()) {
1380                 int nextc = c + acquires;
1381                 if (nextc < 0) // overflow
1382                     throw new Error("Maximum lock count exceeded");
1383                 setState(nextc);
1384                 return true;
1385             }
1386             //加锁失败
1387             return false;
1388         }
1389 
1390         /**
1391          * 释放锁
1392          */
1393         protected final boolean tryRelease(int releases) {
1394             int c = getState() - releases;
1395             if (Thread.currentThread() != getExclusiveOwnerThread())
1396                 throw new IllegalMonitorStateException();
1397             boolean free = false;
1398             if (c == 0) {
1399                 free = true;
1400                 setExclusiveOwnerThread(null);
1401             }
1402             setState(c);
1403             return free;
1404         }
1405 
1406         /**
1407          * 判断持有独占锁的线程是否是当前线程
1408          */
1409         protected final boolean isHeldExclusively() {
1410             return getExclusiveOwnerThread() == Thread.currentThread();
1411         }
1412 
1413         //返回条件对象
1414         final ConditionObject newCondition() {
1415             return new ConditionObject();
1416         }
1417 
1418 
1419         final Thread getOwner() {
1420             return getState() == 0 ? null : getExclusiveOwnerThread();
1421         }
1422 
1423         final int getHoldCount() {
1424             return isHeldExclusively() ? getState() : 0;
1425         }
1426 
1427         final boolean isLocked() {
1428             return getState() != 0;
1429         }
1430 
1431         /**
1432          * Reconstitutes the instance from a stream (that is, deserializes it).
1433          */
1434         private void readObject(java.io.ObjectInputStream s)
1435                 throws java.io.IOException, ClassNotFoundException {
1436             s.defaultReadObject();
1437             setState(0); // reset to unlocked state
1438         }
1439     }
1440 
1441     /**
1442      * 非公平锁
1443      */
1444     static final class NonfairSync extends Sync {
1445         private static final long serialVersionUID = 7316153563782823691L;
1446         /**
1447          * 加锁行为
1448          */
1449         final void lock() {
1450             /**
1451              * 第一步:直接尝试加锁
1452              * 与公平锁实现的加锁行为一个最大的区别在于,此处不会去判断同步队列(CLH队列)中
1453              * 是否有排队等待加锁的节点,上来直接加锁(判断state是否为0,CAS修改state为1)
1454              * ,并将独占锁持有者 exclusiveOwnerThread 属性指向当前线程
1455              * 如果当前有人占用锁,再尝试去加一次锁
1456              */
1457             if (compareAndSetState(0, 1))
1458                 setExclusiveOwnerThread(Thread.currentThread());
1459             else
1460                 //AQS定义的方法,加锁
1461                 acquire(1);
1462         }
1463 
1464         /**
1465          * 父类AbstractQueuedSynchronizer.acquire()中调用本方法
1466          */
1467         protected final boolean tryAcquire(int acquires) {
1468             return nonfairTryAcquire(acquires);
1469         }
1470     }
1471 
1472     /**
1473      * 公平锁
1474      */
1475     static final class FairSync extends Sync {
1476         private static final long serialVersionUID = -3000897897090466540L;
1477         final void lock() {
1478             acquire(1);
1479         }
1480         /**
1481          * 重写aqs中的方法逻辑
1482          * 尝试加锁,被AQS的acquire()方法调用
1483          */
1484         protected final boolean tryAcquire(int acquires) {
1485             final Thread current = Thread.currentThread();
1486             int c = getState();
1487             if (c == 0) {
1488                 /**
1489                  * 与非公平锁中的区别,需要先判断队列当中是否有等待的节点
1490                  * 如果没有则可以尝试CAS获取锁
1491                  */
1492                 if (!hasQueuedPredecessors() &&
1493                         compareAndSetState(0, acquires)) {
1494                     //独占线程指向当前线程
1495                     setExclusiveOwnerThread(current);
1496                     return true;
1497                 }
1498             }
1499             else if (current == getExclusiveOwnerThread()) {
1500                 int nextc = c + acquires;
1501                 if (nextc < 0)
1502                     throw new Error("Maximum lock count exceeded");
1503                 setState(nextc);
1504                 return true;
1505             }
1506             return false;
1507         }
1508     }
1509 
1510     /**
1511      * 默认构造函数,创建非公平锁对象
1512      */
1513     public ReentrantLock() {
1514         sync = new NonfairSync();
1515     }
1516 
1517     /**
1518      * 根据要求创建公平锁或非公平锁
1519      */
1520     public ReentrantLock(boolean fair) {
1521         sync = fair ? new FairSync() : new NonfairSync();
1522     }
1523 
1524     /**
1525      * 加锁
1526      */
1527     public void lock() {
1528         sync.lock();
1529     }
1530 
1531     /**
1532      * 尝试获去取锁,获取失败被阻塞,线程被中断直接抛出异常
1533      */
1534     public void lockInterruptibly() throws InterruptedException {
1535         sync.acquireInterruptibly(1);
1536     }
1537 
1538     /**
1539      * 尝试加锁
1540      */
1541     public boolean tryLock() {
1542         return sync.nonfairTryAcquire(1);
1543     }
1544 
1545     /**
1546      * 指定等待时间内尝试加锁
1547      */
1548     public boolean tryLock(long timeout, TimeUnit unit)
1549             throws InterruptedException {
1550         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
1551     }
1552 
1553     /**
1554      * 尝试去释放锁
1555      */
1556     public void unlock() {
1557         sync.release(1);
1558     }
1559 
1560     /**
1561      * 返回条件对象
1562      */
1563     public Condition newCondition() {
1564         return sync.newCondition();
1565     }
1566 
1567     /**
1568      * 返回当前线程持有的state状态数量
1569      */
1570     public int getHoldCount() {
1571         return sync.getHoldCount();
1572     }
1573 
1574     /**
1575      * 查询当前线程是否持有锁
1576      */
1577     public boolean isHeldByCurrentThread() {
1578         return sync.isHeldExclusively();
1579     }
1580 
1581     /**
1582      * 状态表示是否被Thread加锁持有
1583      */
1584     public boolean isLocked() {
1585         return sync.isLocked();
1586     }
1587 
1588     /**
1589      * 是否公平锁?是返回true 否则返回 false
1590      */
1591     public final boolean isFair() {
1592         return sync instanceof FairSync;
1593     }
1594 
1595     /**
1596      * 获取持有锁的当前线程
1597      */
1598     protected Thread getOwner() {
1599         return sync.getOwner();
1600     }
1601 
1602     /**
1603      * 判断队列当中是否有在等待获取锁的Thread节点
1604      */
1605     public final boolean hasQueuedThreads() {
1606         return sync.hasQueuedThreads();
1607     }
1608 
1609     /**
1610      * 当前线程是否在同步队列中等待
1611      */
1612     public final boolean hasQueuedThread(Thread thread) {
1613         return sync.isQueued(thread);
1614     }
1615 
1616     /**
1617      * 获取同步队列长度
1618      */
1619     public final int getQueueLength() {
1620         return sync.getQueueLength();
1621     }
1622 
1623     /**
1624      * 返回Thread集合,排队中的所有节点Thread会被返回
1625      */
1626     protected Collection<Thread> getQueuedThreads() {
1627         return sync.getQueuedThreads();
1628     }
1629 
1630     /**
1631      * 条件队列当中是否有正在等待的节点
1632      */
1633     public boolean hasWaiters(Condition condition) {
1634         if (condition == null)
1635             throw new NullPointerException();
1636         if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
1637             throw new IllegalArgumentException("not owner");
1638         return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
1639     }
1640 
1641 }

并发编程专题五:抽象队列同步器AQS应用Lock

标签:管理机   iso   应该   方法   sof   countdown   构造函数   offset   不同   

原文地址:https://www.cnblogs.com/Mapi/p/14397300.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!