标签:signal 比较 java并发编程 amp control 一句话 技术分享 pat tst
文章部分图片和代码来自参考文章。
阅读源码首先看一下注解 ,知道了大概的意思后,再进行分析。注释一开始就进行了概括。AQS的实现是基于FIFO等待队列的。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
可以从上图看到,AQS里面先是定义了这两个类,AQS的实现也都会围绕着两个类进行。为了了解这两个类我们先了解一下以下知识。
AQS中有个Node,可以看到它的注释 :
The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks.
Node类是一个变形的 CLH . 那么CLH是什么呢?它是自旋锁的一种。下面的文章讲解了什么是CLH : 克雷格.兰丁&hagersten (CLH Lock) , 简单来说就是当前节点要运行就必须等待当前节点的前一个节点释放锁。运行以下代码 :
示例代码来自参考资料
1 public class ClhSpinLock implements Lock{ 2 private final ThreadLocal<Node> prev; 3 private final ThreadLocal<Node> node; 4 private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node()); 5 6 public ClhSpinLock() { 7 this.node = new ThreadLocal<Node>() { 8 protected Node initialValue() { 9 return new Node(); 10 } 11 }; 12 13 this.prev = new ThreadLocal<Node>() { 14 protected Node initialValue() { 15 return null; 16 } 17 }; 18 } 19 20 /** 21 * 1.初始状态 tail指向一个node(head)节点 22 * +------+ 23 * | head | <---- tail 24 * +------+ 25 * 26 * 2.lock-thread加入等待队列: tail指向新的Node,同时Prev指向tail之前指向的节点 27 * +----------+ 28 * | Thread-A | 29 * | := Node | <---- tail 30 * | := Prev | -----> +------+ 31 * +----------+ | head | 32 * +------+ 33 * 34 * +----------+ +----------+ 35 * | Thread-B | | Thread-A | 36 * tail ----> | := Node | --> | := Node | 37 * | := Prev | ----| | := Prev | -----> +------+ 38 * +----------+ +----------+ | head | 39 * +------+ 40 * 3.寻找当前node的prev-node然后开始自旋 41 * 42 */ 43 public void lock() { 44 final Node node = this.node.get(); 45 node.locked = true; 46 Node pred = this.tail.getAndSet(node); 47 this.prev.set(pred); 48 // 自旋 49 while (pred.locked); 50 } 51 52 public void unlock() { 53 final Node node = this.node.get(); 54 node.locked = false; 55 this.node.set(this.prev.get()); 56 } 57 58 @Override 59 public void lockInterruptibly() throws InterruptedException { 60 // TODO Auto-generated method stub 61 62 } 63 64 @Override 65 public boolean tryLock() { 66 // TODO Auto-generated method stub 67 return false; 68 } 69 70 @Override 71 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { 72 // TODO Auto-generated method stub 73 return false; 74 } 75 76 @Override 77 public Condition newCondition() { 78 // TODO Auto-generated method stub 79 return null; 80 } 81 82 private static class Node { 83 private volatile boolean locked; 84 } 85 }
1 public class LockTest { 2 static int count = 0; 3 4 public static void testLock(Lock lock) { 5 try { 6 lock.lock(); 7 for (int i = 0; i < 10000000; i++) ++count; 8 } finally { 9 lock.unlock(); 10 } 11 } 12 13 public static void main(String[] args) throws InterruptedException, BrokenBarrierException { 14 final ClhSpinLock clh = new ClhSpinLock(); 15 final CyclicBarrier cb = new CyclicBarrier(10, new Runnable() { 16 @Override 17 public void run() { 18 System.out.println(count); 19 } 20 }); 21 22 for (int i = 0; i < 10; i++) { 23 new Thread(new Runnable() { 24 @Override 25 public void run() { 26 testLock(clh); 27 // 这段代码是非lock比较使用 28 // for (int i = 0; i < 10000000; i++) 29 // count++; 30 try { 31 cb.await(); 32 } catch (InterruptedException | BrokenBarrierException e) { 33 e.printStackTrace(); 34 } 35 } 36 }).start(); 37 } 38 } 39 }
上面的代码出现了ThreadLocal ,可以先看一下这篇文章,了解一下 ThreadLocal : Java并发编程:深入剖析ThreadLocal
LockSupport,构建同步组件的基础工具,帮AQS完成相应线程的阻塞或者唤醒的工作。先阅读 : java并发编程之LockSupport 和 Java并发包源码学习之AQS框架(三)LockSupport和interrupt 可以知道LockSupport 重要的方法有 :unpark 和 park 。LockSupport所有的方法都是调用native的park和unpark实现的。具体源码查看上面的链接。
为什么在java6要在入参引入blocker呢?blocker的作用到底是什么?
有blocker的可以传递给开发人员更多的现场信息,可以查看到当前线程的阻塞对象,方便定位问题。所以java6新增加带blocker入参的系列park方法,替代原有的park方法。
ConditonObject 继承自 Condition 接口。我们看一下这个接口是干嘛的?
Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.
最后一句话可以知道,Condition 这个接口是代替了 Object 这个类的有关monitor的方法。
再看一下ConditionObject 在 AQS中到底是什么作用。
Condition implementation for a AbstractQueuedSynchronizer serving as the basis of a Lock implementation.
ConditionObject 作为 Condition的一个实现,在AQS中实现了Lock的基本功能。
下面有几点需要注意 :
LockSupport 与 对象的 wait/notify的不同在于:
以下部分总结来自 Java并发(五):aqs框架
先说说几个概念
statue
来维护状态 (int state这个局部变量)
private transient volatile Node tail;enq
方法在向队列添加新线程时,才会修改该值 。
下面是offset相关的变量,主要是为了提供CAS操作而设立的变量:
已经实现的方法如下
需要被子类实现的方法
protected boolean tryAcquire(int arg)
尝试获取独占锁,在获取前会检查同步状态是否允许获取独占锁。获取成功则返回true,返回false则把线程加入到等待队列。
protected boolean tryRelease(int arg)
尝试通过设置同步状态,来释放独占锁。
protected int tryAcquireShared(int arg)
尝试获取共享锁,在获取前会检查同步状态是否允许获取共享锁。获取成功则返回true,返回false则把线程加入到等待队列。
protected boolean tryReleaseShared(int arg)
尝试通过设置同步状态,来释放共享锁。
下面内容部分来自 一行一行源码分析清楚AbstractQueuedSynchronizer
AQS里面有一个Node类,它的结构如下:
1 static final class Node { 2 /** Marker to indicate a node is waiting in shared mode */ 3 // 标识节点当前在共享模式下 4 static final Node SHARED = new Node(); 5 /** Marker to indicate a node is waiting in exclusive mode */ 6 // 标识节点当前在独占模式下 7 static final Node EXCLUSIVE = null; 8 9 // ======== 下面的几个int常量是给waitStatus用的 =========== 10 /** waitStatus value to indicate thread has cancelled */ 11 // 代码此线程取消了争抢这个锁 12 static final int CANCELLED = 1; 13 /** waitStatus value to indicate successor‘s thread needs unparking */ 14 // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒 15 static final int SIGNAL = -1; 16 /** waitStatus value to indicate thread is waiting on condition */ 17 // 本文不分析condition,所以略过吧,下一篇文章会介绍这个 18 static final int CONDITION = -2; 19 /** 20 * waitStatus value to indicate the next acquireShared should 21 * unconditionally propagate 22 */ 23 // 同样的不分析,略过吧 24 static final int PROPAGATE = -3; 25 // ===================================================== 26 27 // 取值为上面的1、-1、-2、-3,或者0(以后会讲到) 28 // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待, 29 // 也许就是说半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。 30 volatile int waitStatus; 31 // 前驱节点的引用 32 volatile Node prev; 33 // 后继节点的引用 34 volatile Node next; 35 // 这个就是线程本尊 36 volatile Thread thread; 37 38 }
下面重点说一下这块代码,这是独占模式下的加锁获取
1 // 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。 2 // 否则,acquireQueued方法会将线程压到队列中 3 public final void acquire(int arg) { // 此时 arg == 1 4 // 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试 5 // 因为有可能直接就成功了呢,也就不需要进队列排队了, 6 // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的) 7 if (!tryAcquire(arg) && 8 // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。 9 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { 10 selfInterrupt(); 11 } 12 } 13 14 15 final boolean acquireQueued(final Node node, int arg) { 16 boolean failed = true; 17 try { 18 boolean interrupted = false; 19 for (;;) { 20 final Node p = node.predecessor(); 21 if (p == head && tryAcquire(arg)) { 22 setHead(node); 23 p.next = null; // help GC 24 failed = false; 25 return interrupted; 26 } 27 if (shouldParkAfterFailedAcquire(p, node) && 28 parkAndCheckInterrupt())
// 说明当前线程是被中断唤醒的。
// 注意:线程被中断之后会继续走到if处去判断,也就是会忽视中断。
// 除非碰巧线程中断后acquire成功了,那么根据Java的最佳实践,
// 需要重新设置线程的中断状态(acquire.selfInterrupt)。
29 interrupted = true; 30 } 31 } finally { 32 if (failed) 33 cancelAcquire(node); 34 } 35 } 36
1 /** 2 * Creates and enqueues node for current thread and given mode. 3 * 4 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 5 * @return the new node 6 */ 7 // 此方法的作用是把线程包装成node,同时进入到队列中 8 // 参数mode此时是Node.EXCLUSIVE,代表独占模式 9 private Node addWaiter(Node mode) { 10 Node node = new Node(Thread.currentThread(), mode); 11 // Try the fast path of enq; backup to full enq on failure 12 // 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后 13 Node pred = tail; 14 15 // tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧) 16 if (pred != null) { 17 // 设置自己的前驱 为当前的队尾节点 18 node.prev = pred; 19 // 用CAS把自己设置为队尾, 如果成功后,tail == node了 20 if (compareAndSetTail(pred, node)) { 21 // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连, 22 // 上面已经有 node.prev = pred 23 // 加上下面这句,也就实现了和之前的尾节点双向连接了 24 pred.next = node; 25 // 线程入队了,可以返回了 26 return node; 27 } 28 } 29 // 仔细看看上面的代码,如果会到这里, 30 // 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队) 31 // 读者一定要跟上思路,如果没有跟上,建议先不要往下读了,往回仔细看,否则会浪费时间的 32 enq(node); 33 return node; 34 }
1 /** 2 * Inserts node into queue, initializing if necessary. See picture above. 3 * @param node the node to insert 4 * @return node‘s predecessor 5 */ 6 // 采用自旋的方式入队 7 // 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队, 8 // 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的 9 private Node enq(final Node node) { 10 for (;;) { 11 Node t = tail; 12 // 之前说过,队列为空也会进来这里 13 if (t == null) { // Must initialize 14 // 初始化head节点 15 // 细心的读者会知道原来head和tail初始化的时候都是null,反正我不细心 16 // 还是一步CAS,你懂的,现在可能是很多线程同时进来呢 17 if (compareAndSetHead(new Node())) 18 // 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了 19 20 // 这个时候有了head,但是tail还是null,设置一下, 21 // 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了 22 // 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return 23 // 所以,设置完了以后,继续for循环,下次就到下面的else分支了 24 tail = head; 25 } else { 26 // 下面几行,和上一个方法 addWaiter 是一样的, 27 // 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排 28 node.prev = t; 29 if (compareAndSetTail(t, node)) { 30 t.next = node; 31 return t; 32 } 33 } 34 } 35 } 36
走完了end方法,那么该节点肯定是插入到了等待队列。继续前进
1 /** 2 * Checks and updates status for a node that failed to acquire. 3 * Returns true if thread should block. This is the main signal 4 * control in all acquire loops. Requires that pred == node.prev 5 * 6 * @param pred node‘s predecessor holding status 7 * @param node the node 8 * @return {@code true} if thread should block 9 */ 10 // 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?" 11 // 第一个参数是前驱节点,第二个参数才是代表当前线程的节点 12 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 13 int ws = pred.waitStatus; 14 // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true 15 if (ws == Node.SIGNAL) 16 /* 17 * This node has already set status asking a release 18 * to signal it, so it can safely park. 19 */ 20 return true; 21 22 // 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。这里需要知道这点: 23 // 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。 24 // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点, 25 // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队, 26 // 找前驱节点的前驱节点做爹,往前循环总能找到一个好爹的 27 if (ws > 0) { 28 /* 29 * Predecessor was cancelled. Skip over predecessors and 30 * indicate retry. 31 */ 32 do { 33 node.prev = pred = pred.prev; 34 } while (pred.waitStatus > 0); 35 pred.next = node; 36 } else { 37 /* 38 * waitStatus must be 0 or PROPAGATE. Indicate that we 39 * need a signal, but don‘t park yet. Caller will need to 40 * retry to make sure it cannot acquire before parking. 41 */ 42 // 仔细想想,如果进入到这个分支意味着什么 43 // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3 44 // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0 45 // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1) 46 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 47 } 48 return false; 49 } 50 51 // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 52 // 这个方法结束根据返回值我们简单分析下: 53 // 如果返回true, 说明前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒 54 // 我们也说过,以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了 55 // 如果返回false, 说明当前不需要被挂起,为什么呢?往后看 56 57 // 跳回到前面是这个方法 58 // if (shouldParkAfterFailedAcquire(p, node) && 59 // parkAndCheckInterrupt()) 60 // interrupted = true; 61 62 // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true, 63 // 那么需要执行parkAndCheckInterrupt(): 64 65 // 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的 66 // 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒======= 67 private final boolean parkAndCheckInterrupt() { 68 LockSupport.park(this); 69 return Thread.interrupted(); 70 } 71 72 // 2. 接下来说说如果shouldParkAfterFailedAcquire(p, node)返回false的情况 73 74 // 仔细看shouldParkAfterFailedAcquire(p, node),我们可以发现,其实第一次进来的时候,一般都不会返回true的,原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。 75 76 // 解释下为什么shouldParkAfterFailedAcquire(p, node)返回false的时候不直接挂起线程: 77 // => 是为了应对在经过这个方法后,node已经是head的直接后继节点了。剩下的读者自己想想吧。 78 }
我们结合上面的代码知道 acquireQueued 方法有个for(;;)循环,shouldParkAfterFailedAcquire 方法的作用是判断当前节点是否应该阻塞等待唤醒。内部过滤掉了Node节点waitStatus >0 的节点(即现在这段时间可能会取消掉抢锁),同时把状态都变为了 SIGNAL。
shouldParkAfterFailedAcquire方法的作用是:
parkAndCheckInterrupt() 这个方法一旦进入就会进入阻塞。注意哦,parkAndCheckInterrupt 这个方法一直等待阻塞,等待唤醒,但是唤醒之并且没有退出循环哦,所以会继续执行哦,等到什么时候才退出呢?等待第一个节点获取到锁了才退出。这样就形成了后一个节点等待上一个节点释放锁等待唤醒的等待队列。 唤醒之后,acquireQueued 退出之后,就会进入 selfInterrupt 方法 。
一旦tryAcquire成功则立即返回,否则线程会加入队列 线程可能会反复的被阻塞和唤醒直到tryAcquire成功,这是因为线程可能被中断, 而acquireQueued方法中会保证忽视中断(for循环),只有tryAcquire成功了才返回。中断版本的独占获取是
acquireInterruptibly
这个方法,doAcquireInterruptibly
这个方法中如果线程被中断则acquireInterruptibly
会抛出InterruptedException
异常。
线程被唤醒只可能是:被unpark
,被中断或伪唤醒。被中断会设置interrupted
,acquire方法返回前会 selfInterrupt
重置下线程的中断状态,如果是伪唤醒的话会for循环re-check。这里涉及到 LockSupport 可以看这篇文章: Java并发包源码学习之AQS框架(三)LockSupport和interrupt 。
比较简单只要直接唤醒后续结点就可以了,后续结点会从parkAndCheckInterrupt
方法中返回。
1 public final boolean release(int arg) { 2 // tryReease由子类实现,通过设置state值来达到同步的效果。 3 if (tryRelease(arg)) { 4 Node h = head; 5 // waitStatus为0说明是初始化的空队列 6 if (h != null && h.waitStatus != 0) 7 // 唤醒后续的结点 8 unparkSuccessor(h); 9 return true; 10 } 11 return false; 12 }
acquireShared
方法是用来共享模式获取。
1 public final void acquireShared(int arg) { 2 //如果没有许可了则入队等待 3 if (tryAcquireShared(arg) < 0) 4 doAcquireShared(arg); 5 } 6 7 private void doAcquireShared(int arg) { 8 // 添加队列 9 final Node node = addWaiter(Node.SHARED); 10 boolean failed = true; 11 try { 12 boolean interrupted = false; 13 // 等待前继释放并传递 14 for (;;) { 15 final Node p = node.predecessor(); 16 if (p == head) { 17 int r = tryAcquireShared(arg);// 尝试获取 18 if (r >= 0) { 19 // 获取成功则前继出队,跟独占不同的是 20 // 会往后面结点传播唤醒的操作,保证剩下等待的线程能够尽快 获取到剩下的许可。 21 setHeadAndPropagate(node, r); 22 p.next = null; // help GC 23 if (interrupted) 24 selfInterrupt(); 25 failed = false; 26 return; 27 } 28 } 29 30 // p != head || r < 0 31 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 32 interrupted = true; 33 } 34 } 35 finally { 36 if (failed) 37 cancelAcquire(node); 38 } 39 }
核心是这个doAcquireShared
方法,跟独占模式的acquireQueued
很像,主要区别在setHeadAndPropagate
方法中, 这个方法会将node设置为head。如果当前结点acquire到了之后发现还有许可可以被获取,则继续释放自己的后继, 后继会将这个操作传递下去。这就是PROPAGATE
状态的含义。
1 private void setHeadAndPropagate(Node node, int propagate) { 2 Node h = head; // Record old head for check below 3 setHead(node); 4 /* 5 * 尝试唤醒后继的结点:<br /> 6 * propagate > 0说明许可还有能够继续被线程acquire;<br /> 7 * 或者 之前的head被设置为PROPAGATE(PROPAGATE可以被转换为SIGNAL)说明需要往后传递;<br /> 8 * 或者为null,我们还不确定什么情况。 <br /> 9 * 并且 后继结点是共享模式或者为如上为null。 10 * <p> 11 * 上面的检查有点保守,在有多个线程竞争获取/释放的时候可能会导致不必要的唤醒。<br /> 12 * 13 */ 14 if (propagate > 0 || h == null || h.waitStatus < 0) { 15 Node s = node.next; 16 // 后继结是共享模式或者s == null(不知道什么情况) 17 // 如果后继是独占模式,那么即使剩下的许可大于0也不会继续往后传递唤醒操作 18 // 即使后面有结点是共享模式。 19 if (s == null || s.isShared()) 20 // 唤醒后继结点 21 doReleaseShared(); 22 } 23 } 24 25 private void doReleaseShared() { 26 for (;;) { 27 Node h = head; 28 // 队列不为空且有后继结点 29 if (h != null && h != tail) { 30 int ws = h.waitStatus; 31 // 不管是共享还是独占只有结点状态为SIGNAL才尝试唤醒后继结点 32 if (ws == Node.SIGNAL) { 33 // 将waitStatus设置为0 34 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 35 continue; // loop to recheck cases 36 unparkSuccessor(h);// 唤醒后继结点 37 // 如果状态为0则更新状态为PROPAGATE,更新失败则重试 38 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 39 continue; // loop on failed CAS 40 } 41 // 如果过程中head被修改了则重试。 42 if (h == head) // loop if head changed 43 break; 44 } 45 }
主要逻辑也就是doReleaseShared
。
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) { 3 doReleaseShared(); 4 return true; 5 } 6 return false; 7 }
java 并发(五)---AbstractQueuedSynchronizer
标签:signal 比较 java并发编程 amp control 一句话 技术分享 pat tst
原文地址:https://www.cnblogs.com/Benjious/p/10101189.html