标签:
ReentrantLock/CountDownLatch/Semaphore/FutureTask/ThreadPoolExecutor的源码中都会包含一个静态的内部类Sync,它继承了AbstractQueuedSynchronizer这个抽象类。
AbstractQueuedSynchronizer是java.util.concurrent包中的核心组件之一,为并发包中的其他synchronizers提供了一组公共的基础设施。
AQS会对进行acquire而被阻塞的线程进行管理,其管理方式是在AQS内部维护了一个FIFO的双向链表队列,队列的头部是一个空的结点,除此之外,每个结点持有着一个线程,结点中包含两个重要的属性waiteStatus和nextWaiter。结点的数据结构如下: Node中的属性waitStatus、prev、next、thread都使用了volatile修饰,这样直接的读写操作就具有内存可见性。 waitStatus表示了当前结点Node的状态
- static final class Node {
-
- static final int CANCELLED = 1;
-
- static final int SIGNAL = -1;
-
- static final int CONDITION = -2;
-
- static final Node SHARED = new Node();
-
- static final Node EXCLUSIVE = null;
-
- volatile int waitStatus;
-
- volatile Node prev;
-
- volatile Node next;
-
- volatile Thread thread;
-
- Node nextWaiter;
-
- final boolean isShared() {
- return nextWaiter == SHARED;
- }
-
- final Node predecessor() throws NullPointerException {
- Node p = prev;
- if (p == null)
- throw new NullPointerException();
- else
- return p;
- }
-
- Node() {
- }
-
- Node(Thread thread, Node mode) {
- this.nextWaiter = mode;
- this.thread = thread;
- }
-
- Node(Thread thread, int waitStatus) {
- this.waitStatus = waitStatus;
- this.thread = thread;
- }
- }
acquire操作
获取同步器
- if(尝试获取成功){
- return ;
- }else{
- 加入队列;park自己
- }
释放同步器
- if(尝试释放成功){
- unpark等待队列中的第一个结点
- }else{
- return false;
- }
添加结点到等待队列
首先构建一个准备入队列的结点,如果当前队列不为空,则将mode的前驱指向tail(只是指定当前结点的前驱结点,这样下面的操作一即使失败了 也不会影响整个队列的现有连接关系),compareAndSetTail成功将mode设置为tail结点,则将原先的tail结点的后继节点指向mode。如果队列为空亦或者compareAndSetTail操作失败,没关系我们还有enq(node)为我们把关。
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
-
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
-
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) {
- Node h = new Node();
- h.next = node;
- node.prev = h;
- if (compareAndSetHead(h)) {
- tail = node;
- return h;
- }
- }
- else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
acquire 取消结点
取消结点操作:首先会判断结点是否为null,若不为空,while循环查找距离当前结点最近的非取消前驱结点PN(方便GC处理取消的结点),然后取出这个前驱的后继结点指向,利用它来感知其他的取消或信号操作(例如 compareAndSetNext(pred, predNext, null)) 然后将当前结点的状态Status设置为CANCELLED
-
当前结点如果是尾结点,就删除当前结点,将找到的非取消前驱结点PN设置为tail,并原子地将其后继指向为null
-
当前结点存在后继结点SN,如果前驱结点需要signal,则将PN的后继指向SN;否则将通过unparkSuccessor(node);唤醒后继结点
- private void cancelAcquire(Node node) {
-
- if (node == null)
- return;
- node.thread = null;
-
- Node pred = node.prev;
- while (pred.waitStatus > 0)
- node.prev = pred = pred.prev;
-
- Node predNext = pred.next;
-
- node.waitStatus = Node.CANCELLED;
-
- if (node == tail && compareAndSetTail(node, pred)) {
- compareAndSetNext(pred, predNext, null);
- } else {
-
- if (pred != head
- && (pred.waitStatus == Node.SIGNAL
- || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
- && pred.thread != null) {
-
- Node next = node.next;
- if (next != null && next.waitStatus <= 0)
- compareAndSetNext(pred, predNext, next);
- } else {
-
- unparkSuccessor(node);
- }
- node.next = node;
- }
- }}
唤醒后继结点 unparkSuccessor
唤醒后继结点操作:首先会尝试清除当前结点的预期信号,这里即使操作失败亦或是信号已经被其他等待线程改变 都不影响
然后查找当前线程最近的一个非取消结点 并唤醒它
- private void unparkSuccessor(Node node) {
-
- compareAndSetWaitStatus(node, Node.SIGNAL, 0);
-
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
回过头来总结一下:
当我们调用acquire(int)时,会首先通过tryAcquire尝试获取锁,一般都是留给子类实现(例如ReetrantLock$FairSync中的实现)
- protected final boolean tryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (isFirst(current) &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
如果tryAcquire(int)返回为false,则说明没有获得到锁。 则!tryAcquire(int)为true,接着会继续调用acquireQueued(final Node node ,int arg)方法,当然这调用这个方法之前,我们需要将当前包装成Node加入到队列中(即调用addWaiter(Node mode))。
在acquireQueued()方法体中,我们会发现一个死循环,唯一跳出死循环的途径是 直到找到一个(条件1)node的前驱是傀儡head结点并且子类的tryAcquire()返回true,那么就将当前结点设置为head结点并返回结点对于线程的中断状态。如果(条件1)不成立,则执行shouldParkAfterFailuredAcquire()
在shouldParkAfterFailuredAcquire(Node pred,Node node)方法体中,
首先会判断node结点的前驱结点pred的waitStatus的值:
* 如果waitStatus>0,表明pred处于取消状态(CANCELLED)则从队列中移除pred。
* 如果waitStatus<0,表明线程需要park住
* 如果waitStatus=0,表明这是一个新建结点,需要设置成SIGNAL(-1),在下一次循环中如果不能获得锁就需要park住线程,parkAndCheckInterrupt()就是执行了park()方法来park线程并返回线程中断状态。
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
如果中间抛出RuntimeException异常,则会调用cancelAcquire(Node)方法取消获取。取消其实也很简单,首先判断node是否为空,如果不为空,找到node最近的非取消前驱结点PN,并将node的status设置为CANCELLED;
* 倘若node为tail,将node移除并将PN结点设置为tail PN的后继指向null
* 倘若node存在后继结点SN,如果前驱结点PN需要signal,则将PN后继指向SN 否则调用unparkSuccessor(Node)唤醒后继SN
AcquireShared共享锁
- public final void acquireShared(int arg) {
- if (tryAcquireShared(arg) < 0)
- doAcquireShared(arg);
- }
- private void doAcquireShared(int arg) {
- final Node node = addWaiter(Node.SHARED);
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null;
- if (interrupted)
- selfInterrupt();
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } catch (RuntimeException ex) {
- cancelAcquire(node);
- throw ex;
- }
- }
- private void setHeadAndPropagate(Node node, int propagate) {
- setHead(node);
- if (propagate > 0 && node.waitStatus != 0) {
-
- Node s = node.next;
- if (s == null || s.isShared())
- unparkSuccessor(node);
- }
- }
条件Condition
Condition是服务单个Lock,condition.await()等方法在Lock上形成一个condition等待队列
condition.signal()方法在Lock上面处理condition等待队列然后将队列中的node加入到AQS的阻塞队列中等待对应的线程被unpark
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- Node node = addConditionWaiter();
-
-
-
-
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
-
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null)
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
网上找到的一个帮助理解Condition的gif图
这个AQS存在两中链表
* 一种链表是AQS sync链表队列,可称为 横向链表
* 一种链表是Condition的wait Node链表,相对于AQS sync是结点的一个纵向链表
当纵向链表被signal通知后 会进入对应的Sync进行排队处理
- public final void signal() {
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- Node first = firstWaiter;
- if (first != null)
- doSignal(first);
- }
- private void doSignal(Node first) {
- do {
- if ( (firstWaiter = first.nextWaiter) == null)
- lastWaiter = null;
- first.nextWaiter = null;
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
- final boolean transferForSignal(Node node) {
-
- if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
- return false;
-
- Node p = enq(node);
- int c = p.waitStatus;
-
-
-
- if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
- LockSupport.unpark(node.thread);
- return true;
- }
转眼之间,2014已经与我渐行渐远 2015要开启源码研究之旅、fighting
Java Concurrent之 AbstractQueuedSynchronizer
标签:
原文地址:http://www.cnblogs.com/SoniceryD/p/4206989.html