标签:tor eth and blog ace 个数 概述 hang ima
本文承接上一篇文章:AQS-共享模式分析
CountDownLatch是一个同步计数器,他允许一个或者多个线程在另外一组线程执行完成之前一直等待,基于AQS共享模式实现的,下面就先举一个简单例子,从例子入手分析CountDownLatch的原理。
public class CountDownLatchTest { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(2); System.out.println("主线程开始执行…… ……"); //第一个子线程执行 ExecutorService es1 = Executors.newSingleThreadExecutor(); es1.execute(new Runnable() { @Override public void run() { try { Thread.sleep(3000); System.out.println("子线程:"+Thread.currentThread().getName()+"执行"); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); } }); es1.shutdown(); //第二个子线程执行 ExecutorService es2 = Executors.newSingleThreadExecutor(); es2.execute(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程:"+Thread.currentThread().getName()+"执行"); latch.countDown(); } }); es2.shutdown(); System.out.println("等待两个线程执行完毕…… ……"); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("两个子线程都执行完毕,继续执行主线程"); } }
上面例子中,首先new CountDownLatch,给了同步计数器一个计数值,然后新建了两个线程,每当一个线程执行完之前调用一下countDown()方法,将计数减1,在主线程中执行了await方法,该方法在计数器值大于0之前一直等待,直到计数器为0,结束等待,下面就分析一下CountDownLatch原理。
从图中可以看出CountDownLatch是基于Sync抽象类实现的,而Sync继承AQS,使用的是AQS共享模式。
构造方法如下:
//需要传入计数器的大小 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //初始化,设置资源个数 Sync(int count) { setState(count); } //获取共享资源个数 int getCount() { return getState(); } //尝试获取共享锁,只有当共享资源个数为0的时候,才会返回1,否则为-1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //释放共享资源,通过CAS每次对state减1 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
进入AbstractQueuedSynchronizer #acquireSharedInterruptibly()方法.
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //等待过程不可中断 if (Thread.interrupted()) throw new InterruptedException(); //这里的tryAcquireShared在AbstractQueuedSynchronizer中没有实现,在上面介绍的Sync中实现的 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
在上面介绍Sync类的时候#tryAcquireShared(),当AQS的state = 0的时候才会返回1,否则一直返回-1,如果返回-1,要执行#doAcquireSharedInterruptibly(),进入该方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //这里就把主线程加入队列,队列中有两个节点,第一个是虚拟节点,第二个就是主线程节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //总共只有两个节点,主线程前一个就是首节点 final Node p = node.predecessor(); if (p == head) { //这里又执行到CountDownLatch中Sync类中实现的方法,判断state是否为0 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //如果state不为0,这里会把主线程挂起阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
这里使用AQS很神奇,在阻塞队列中就只加入了一个主线程,但是呢,只要其他线程没有执行完,那state就不为0,那主线程就在这里阻塞着,那问题了,谁来唤醒这个主线程呢?就是下面要介绍的方法。
public void countDown() { sync.releaseShared(1); }
进入AbstractQueuedSynchronizer #releaseShared方法
public final boolean releaseShared(int arg) { //该方法同样在AbstractQueuedSynchronizer中没有实现,在CountDownLatch中实现 if (tryReleaseShared(arg)) { //唤醒主线程 doReleaseShared(); return true; } return false; }
在分析Sync类的时候,介绍了tryReleaseShared(),该方法会把AQS的state减1,如果减1操作成功,执行唤醒主线程操作,进入AbstractQueuedSynchronizer#tryReleaseShared()方法
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //首节点状态为SIGNAL = -1 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒主线程,也就是队列中的第二个节点,如果线程没有执行完成,主线程被唤醒之后,发现state依然不为零,会再次阻塞 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; }
该方法就是指定等待时间,如果在规定的等待时间中没有完成,就直接返回false,在主线程中可以根据这个状态进行后续的处理。
CountDownLatch这个类对AQS的使用很神奇,像之前介绍的ReentrantLock和Semaphore都会在阻塞队列中放入很多的线程,而CountDownLatch就只在队列中放入一个主线程,然后不停的唤醒,唤醒之后发现不满住条件,主线程又阻塞,很厉害。
参考文章:
标签:tor eth and blog ace 个数 概述 hang ima
原文地址:https://www.cnblogs.com/gunduzi/p/13616806.html