标签:red tor owa semaphore wait 个数 退出 线程 shutdown
计数器
new CountDownLatch(2)
countDownLatch.countDown(); //-1 countDownLatch.await();//当计数器为0时返回
不用等到子进程全部执行完毕之后再返回
是基于AQS实现的
AQS中的state用来计数了
阻塞调用线程,当计数器值为0时,返回咯
当计数器为0时返回;或者timeoout之后
state减1 tryReleaseShared()
获取当前的state值
CountDownLatch的计数器是一次性的
CyclicBarrier:回环屏障,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CycleBarrierTest { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() { @Override public void run() { System.out.println(Thread.currentThread() + "task1 merge result"); } }); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Runnable() { @Override public void run() { System.out.println("one in"); try { cyclicBarrier.await(); System.out.println("one out"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); executorService.submit(new Runnable() { @Override public void run() { System.out.println("two in"); try { cyclicBarrier.await(); System.out.println("two out"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); executorService.shutdown(); } /** * one in * two in * Thread[pool-1-thread-2,5,main]task1 merge result * two out * one out */ }
创建了一个CyclicBarrier对象,其中第一个参数为计数器初始值,第二个参数Runable是当计数器值为0时需要执行的任务。 当第一个线程调用await方法时,计数器减1,第二个线程调用await方法时,减1。如果当前cyclicBarrier中的计数值不等于0时,就线程12都锁住
当cyclicBarrier的值等于0时,才会去执行线程12的任务。然后cyclicBarrier被重置。
等待,阻塞;或者异常退出
等待一定的时间后,如果没有突破屏障,也会返回
如果count == 0了,先执行CyclicBarrier自己的方法,再那唤醒所有的trip条件变量的阻塞进程,
如果count != 0 也没有设置等待,那就直接把线程放进阻塞队列,当前会被挂起并释放lock锁。如果当前线程设置了超时时间,被放进条件变量trip的阻塞队列,不过过段时间后会自动返回
CycleBarrier可以复用
内部计数器是递增,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要直到需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { private static Semaphore semaphore = new Semaphore(0); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread() + "start"); semaphore.release(); System.out.println(Thread.currentThread() + "over"); } }); executorService.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread() + "start"); semaphore.release(); System.out.println(Thread.currentThread() + "over"); } }); try { semaphore.acquire(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("all child thread over"); executorService.shutdown(); } /** * Thread[pool-1-thread-1,5,main]start * Thread[pool-1-thread-1,5,main]over * Thread[pool-1-thread-2,5,main]start * Thread[pool-1-thread-2,5,main]over * all child thread over */ }
阻塞当前main线程,当信号量到达release和初始化值之和时,才能解封当前线程。
当前线程调用该方法的目的是希望获取一个信号量资源。如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。否则如果当前信号量个数等于0,则当前线程会被放入AQS的阻塞队列。或者中断线程。
当然有公平的实现和非公平的实现;
获取多个信号量的值,满足就唤醒执行,不满足就不执行呗。
不响应中断
获取指定个数
不响应中断
增加一个信号量,就是state + 1
释放多个信号量
Semaphore的计数器是不可以自动重置的。通过变相改变acquire的参数实现CycleBarrier的功能。AQS实现。
标签:red tor owa semaphore wait 个数 退出 线程 shutdown
原文地址:https://www.cnblogs.com/sicheng-li/p/13205515.html