标签:queue try 就是 next path mutex support his and
一,AQS原理
lock最常用的类就是ReentrantLock,其底层实现使用的是AbstractQueuedSynchronizer(AQS)
简单来说AQS会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态,经过调查线程的显式阻塞是通过调用LockSupport.park()完成,而LockSupport.park()则调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。
与synchronized相同的是,CLH也是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。原生的CLH队列是用于自旋锁,但Doug Lea把其改造为阻塞锁。 当有线程竞争锁时,该线程会首先尝试获得锁,这对于那些已经在队列中排队的线程来说显得不公平,这也是非公平锁的由来,与synchronized实现类似,这样会极大提高吞吐量。 如果已经存在Running线程,则新的竞争线程会被追加到队尾,具体是采用基于CAS的Lock-Free算法,因为线程并发对Tail调用CAS可能会导致其他线程CAS失败,解决办法是循环CAS直至成功。
二,加锁过程
1 /** 2 * 尝试获取独占锁,获取成功则返回 3 * 获取失败则继续自旋获取锁,并且判断中断标识,如果中断标识为true,则设置线程中断 4 * addWaiter方法把当前线程封装成Node,并添加到队列的尾部 5 */ 6 public final void acquire(int arg) { 7 if (!tryAcquire(arg) && 8 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 9 selfInterrupt(); 10 }
1,nonfairTryAcquire方法将是lock方法间接调用的第一个方法,每次请求锁时都会首先调用该方法。
1 final boolean nonfairTryAcquire(int acquires) { 2 final Thread current = Thread.currentThread(); 3 int c = getState(); 4 if (c == 0) { 5 if (compareAndSetState(0, acquires)) { 6 setExclusiveOwnerThread(current); 7 return true; 8 } 9 } 10 else if (current == getExclusiveOwnerThread()) { 11 int nextc = c + acquires; 12 if (nextc < 0) // overflow 13 throw new Error("Maximum lock count exceeded"); 14 setState(nextc); 15 return true; 16 } 17 return false; 18 }
该方法会首先判断当前状态,如果c==0说明没有线程正在竞争该锁,通过CAS设置该状态值为acquires,acquires的初始调用值为1,每次线程重入该锁都会+1,每次unlock都会-1,为0时释放锁。如果CAS设置成功,其他任何线程调用CAS都不会再成功,当前线程得到了该锁,该Running线程并未进入等待队列。
如果c !=0 但发现自己已经拥有锁,只是简单地++acquires,并修改status值,但因为没有竞争,所以通过setStatus修改,而非CAS,也就是说这段代码实现了偏向锁的功能,并且实现的较为简单。
2,addWaiter方法负责把当前无法获得锁的线程包装为一个Node添加到队尾:
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); 3 // Try the fast path of enq; backup to full enq on failure 4 Node pred = tail; 5 if (pred != null) { 6 node.prev = pred; 7 if (compareAndSetTail(pred, node)) { 8 pred.next = node; 9 return node; 10 } 11 } 12 enq(node); 13 return node; 14 }
追加到队尾的动作分两步:
如果当前队尾已经存在(tail!=null),则使用CAS把当前线程更新为tail
如果当前Tail为null或则线程调用CAS设置队尾失败,则通过enq方法继续设置tail :
1 private Node enq(final Node node) { 2 for (;;) { 3 Node t = tail; 4 if (t == null) { // Must initialize 5 Node h = new Node(); // Dummy header 6 h.next = node; 7 node.prev = h; 8 if (compareAndSetHead(h)) { 9 tail = node; 10 return h; 11 } 12 } 13 else { 14 node.prev = t; 15 if (compareAndSetTail(t, node)) { 16 t.next = node; 17 return t; 18 } 19 } 20 } 21 }
该方法就是循环调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addWaiter的目的就是通过CAS把当前线程追加到队尾,并返回包装后的Node实例。
3,acquireQueued的主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞.
但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回
1 final boolean acquireQueued(final Node node, int arg) { 2 try { 3 boolean interrupted = false; 4 for (;;) { 5 final Node p = node.predecessor(); 6 if (p == head && tryAcquire(arg)) { 7 setHead(node); 8 p.next = null; // help GC 9 return interrupted; 10 } 11 if (shouldParkAfterFailedAcquire(p, node) && 12 parkAndCheckInterrupt()) 13 interrupted = true; 14 } 15 } catch (RuntimeException ex) { 16 cancelAcquire(node); 17 throw ex; 18 } 19 }
仔细看看这个方法是个无限循环,感觉如果p == head && tryAcquire(arg)条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于第12行的parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈。
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this); 3 return Thread.interrupted(); 4 }
4,如前面所述,LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于Cancel状态则没有必要,具体的检查在shouldParkAfterFailedAcquire中:
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int ws = pred.waitStatus; 3 if (ws == Node.SIGNAL) 4 /* 5 * This node has already set status asking a release 6 * to signal it, so it can safely park 7 */ 8 return true; 9 if (ws > 0) { 10 /* 11 * Predecessor was cancelled. Skip over predecessors and 12 * indicate retry. 13 */ 14 do { 15 node.prev = pred = pred.prev; 16 } while (pred.waitStatus > 0); 17 pred.next = node; 18 } else { 19 /* 20 * waitStatus must be 0 or PROPAGATE. Indicate that we 21 * need a signal, but don‘t park yet. Caller will need to 22 * retry to make sure it cannot acquire before parking. 23 */ 24 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 25 } 26 return false; 27 }
检查原则在于:
规则1:如果前继的节点状态为SIGNAL,表明当前节点需要unpark,则返回成功,此时acquireQueued方法的第12行(parkAndCheckInterrupt)将导致线程阻塞
规则2:如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞
规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环,与规则2同
总体看来,shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。
三,最后概括下加锁流程,如图所示:
标签:queue try 就是 next path mutex support his and
原文地址:https://www.cnblogs.com/superchong/p/11254262.html