标签:lin declared nal 子类 nsa failed public bst this
概述
从使用者的角度,AQS的功能可分为两类:独占功能和共享功能。它的子类中,要么实现并使用了它独占功能的API,要么使用了共享锁的功能,而不会同时使用两套API,即使是它的子类ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套API来实现的。
// AQS主要的属性,接口
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 同步状态值 private volatile int state;
// 队列头结点,一般获取锁的线程在头结点 private transient volatile Node head;
// 队尾节点 private transient volatile Node tail;
// 队列缓存对象 static final class Node {}
// 获取独占资源,模板方法 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
// 子类实现 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
// 释放独占资源,模板方法 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
// 释放资源,子类实现 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
// 获取共享资源,模板方法 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
// 获取共享资源,子类实现 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
// 释放共享资源,模板方法 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
// 释放共享资源,子类实现 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
// 是否独占?子类实现 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } }
AQS是一个典型的模板方法模式:AQS定义模板方法,子类实现具体的方法。
模板方法模式:定义一个操作中算法的框架,而将一些步骤延迟到子类中。模板方法模式使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。
模板方法模式是一种基于继承的代码复用技术,它是一种类行为型模式。
类图
本文从AQS两个典型的应用类ReeantrantLock和CountDownLatch分别介绍独占和共享两个模式。
CAS操作
JDK中很多的线程安全操作依赖于CAS + 循环的无锁模式,AQS中大量使用。
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } /** * 设置头结点,期望值为null */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * 设置尾节点 */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * 设置一个节点的waitStatus */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * 设置一个节点的next */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); }
ReentrantLock
ReentrantLock会保证在同一时间只有一个线程在执行这段代码,或者说同一时刻只有一个线程的lock方法会返回,其他线程会被挂起,直到获取锁。其实ReentrantLock实现的就是一个独占锁的功能。
ReentranctLock内部有一个抽象静态类Sync,继承自AQS,重写了tryRelease和isHeldExclusively方法。
ReentranctLock内部还有FairSync和NonfairSync两个类分别代表公平锁和非公平锁。
公平锁:每个线程抢占锁的顺序为先后调用lock方法的顺序依次获取锁,类似于排队。
非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁。
公平锁
调用lock方法,由子类调用对应的lock方法,这个lock为Sync类里面的抽象方法,由子类FairSync和NonfairSync实现。
对于FairSync实现如下:
acquire方法其实是AQS里面的模板方法:
tryAcquire是子类需要实现的方法,addWaiter方法是将没有获取到锁的线程包装成Node对象入队列。
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果state == 0 标示锁未被占用 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 如果队列中没有其他线程,说明没有线程正在占用锁,此处acquire=1 // 通过CAS操作将状态更新,如果成功标示获取锁,后续再有调用都会因为期望值不为0而失败。
setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 可重入锁,如果当前线程已经获取了锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
如果成功获取锁返回true,否则返回false并将当前的线程放到队列中调用addWaiter(Node.EXCLUSIVE), arg),在此之前,先看看Node数据结构:
static final class Node { // 节点类型:EXCLUSIVE和SHARED // 标示是共享节点 static final Node SHARED = new Node(); // 标示以独占节点等待 static final Node EXCLUSIVE = null; // 节点状态: SIGNAL, CANCELLED, CONDITION, PROPAGATE // 线程(节点)已经取消,由于超时或interrupt,节点到此状态就不在变化,同时线程也不会再次被阻塞。 static final int CANCELLED = 1; // 后续线程需要唤醒,当前节点的后继节点已经阻塞(或即将阻塞),当前节点在释放或取消之前必须唤醒后续节点 static final int SIGNAL = -1; // 线程在等待条件 static final int CONDITION = -2; // 标示无条件传播。releaseShared应该被传播给其他节点,这个只允许设置头结点 static final int PROPAGATE = -3; // 节点状态;可取值:SIGNAL, CANCELLED, CONDITION, PROPAGATE, 0;非负值标示不需要signal,初始化为0.
// 描述节点的状态,AQS队列中,在有并发时,肯定会保存一定数量的节点,每个节点代表了一个线程的状态,有的线程可能等不急获取锁
// 需要放弃竞争,退出队列,有些线程在等待一些条件满足后恢复执行。 volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; // 线程 Node nextWaiter;// 类型 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } }
private Node addWaiter(Node mode) {
// 创建节点 Node node = new Node(Thread.currentThread(), mode); // 如果队列不空,则CAS直接入队列,如果失败了,则自旋入队列直到成功。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);// 入队列 return node; }
将节点放入队列之后,还需要将线程挂起,这个由acquireQueued来完成。
AQS的队列结构如下,队列由Node类型的节点组成,其中至少有两个变量:一个封装线程,一个封装节点类型。第一次插入节点时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点。
// 自旋入队列
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize,如果队列为空,则创建一个空节点 if (compareAndSetHead(new Node())) tail = head; } else { // 入队列,不成功,则在循环,成功则退出 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();
// 如果当前节点的pre节点是头节点,则说明它是第一个等待锁的节点(head是当前持有锁的节点)因此尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); // 如果获取锁成功,则移除head节点,当前节点变成头节点 p.next = null; // help GC failed = false; return interrupted; }
// 检测前一个节点的状态,看当前线程是否需要挂起;只有当前节点的前一个节点为SIGNAL时,当前节点才能挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 挂起 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
至此,一个线程对于锁的一次竞争结束:(1)成功(2)进入队列被挂起。
如果线程被挂起,则等待下次被唤醒后继续循环尝试获取锁,AQS队列为FIFO队列。
下面看看释放锁流程:
调用AQS的release:
对于ReentrantLock调用的tryRelease方法都是在Sync里面实现的:
protected final boolean tryRelease(int releases) { int c = getState() - releases;
// 如果释放锁的线程和占用锁的线程不是同一个,抛出非负监视器异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false;
// 因为可重入锁的原因,不是每次释放锁c都等于零,直到最后一次释放锁时,才通知AQS不需要记录占用锁的线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
如果队列不空,则需要唤醒后续的等待节点:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 设置waitStatus为0,以后该状态就不会改变了 /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 如果头结点的后继节点被取消或为null,则反向遍历,寻找未取消的节点作为即将唤醒线程 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
非公平锁
与公平锁唯一的区别就是获取锁的方式不同,非公平锁是抢占式的获取锁。
直接CAS修改一次state(线程抢占获取锁)如果成功返回,否则排队。
多线程-AbstractQueuedSynchronizer(AQS)
标签:lin declared nal 子类 nsa failed public bst this
原文地址:http://www.cnblogs.com/lujiango/p/7592871.html