标签:ext command 一个 参考资料 java并发 重排序 for 源码 jdk8
Java并发编程系列23 | 循环屏障CyclicBarrier收录于话题
#进阶架构师 | 并发编程专题
12个
本篇介绍第二个并发工具类CyclicBarrier,CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier),分以下部分介绍:
CyclicBarrier的使用
CyclicBarrier与CountDownLatch比较
CyclicBarrier源码解析
CyclicBarrier要做的事情是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程一起执行。
1.1 API
CyclicBarrier(int parties):构造方法,parties表示拦截线程的数量。
CyclicBarrier(int parties, Runnable barrierAction) :barrierAction用于在线程到达屏障时优先执行b,用于处理更加复杂的业务场景。
await():将当前线程阻塞,等到所有的线程都到达指定的临界点后一起执行。
getNumberWaiting():获取当前有多少个线程阻塞等待在临界点上。
reset():将屏障重置为初始状态。
1.2 使用举例
举个例子说明CyclicBarrier的使用:8个运动员参加比赛,运动员可能到达赛场的时间不一样,要等8个运动员到齐了才开始比赛,代码如下:
public class CyclicBarrierTest {
private static CyclicBarrier barrier = new CyclicBarrier(8, () -> {
System.out.println("所有运动员入场,裁判员一声令下!!!");
});
public static void main(String[] args) {
System.out.println("运动员准备进场,全场欢呼......");
for (int i = 0; i < 8; i++) {
new Thread() {
public void run() {
System.out.println(Thread.currentThread().getName() + " 运动员到达起点,准备好了!!!");
try {
barrier.await();// 运动员等待,等所有运动员全部到齐后一起开始比赛
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 运动员出发!!!");
};
}.start();
}
}
}
输出结果:
运动员准备进场,全场欢呼......
Thread-0 运动员到达起点,准备好了!!!
Thread-2 运动员到达起点,准备好了!!!
Thread-4 运动员到达起点,准备好了!!!
Thread-1 运动员到达起点,准备好了!!!
Thread-3 运动员到达起点,准备好了!!!
Thread-5 运动员到达起点,准备好了!!!
Thread-6 运动员到达起点,准备好了!!!
Thread-7 运动员到达起点,准备好了!!!
所有运动员入场,裁判员一声令下!!!
Thread-7 运动员出发!!!
Thread-0 运动员出发!!!
Thread-1 运动员出发!!!
Thread-4 运动员出发!!!
Thread-2 运动员出发!!!
Thread-6 运动员出发!!!
Thread-5 运动员出发!!!
Thread-3 运动员出发!!!
CountDownLatch用于一个线程等待若干个其他线程执行完任务之后才执行,强调一个线程等待,这个线程会阻塞。而CyclicBarrier用于一组线程互相等待至某个状态,然后这一组线程再同时执行,强调的是多个线程互等,这多个线程阻塞,等大家都完成,再携手共进。
CountDownLatch是不能复用的,而CyclicLatch是可以复用的。使用reset()方法将屏障重置为初始状态之后就可以复用。
CyclicBarrier提供了更多的方法,能够通过getNumberWaiting()获取阻塞线程的数量,通过isBroken()方法可以知道阻塞的线程是否被中断。
CyclicBarrier是通过Lock的Condition实现的,每个CyclicBarrier对应个Lock锁和该锁的condition条件。
创建CyclicBarrier时设置一个count计数,当调用await()时做两件事:①将count-1 ②将线程阻塞并构造成结点加入condition条件队列。
当count变为0时,达到等待线程数量要求,condition将条件队列中的线程全部唤醒。
3.1 类结构
public class CyclicBarrier {
private static class Generation { // 内部类,当有parties个线程到达barrier就会更新换代
boolean broken = false; // 是否损坏
}
private final ReentrantLock lock = new ReentrantLock(); // 重入锁
private final Condition trip = lock.newCondition();
private final int parties; // 等待线程总数量
private final Runnable barrierCommand; // 达到等待线程数量后执行的线程
private Generation generation = new Generation(); // 当有parties个线程到达barrier,就会更新换代
private int count; // 记录当前线程数量
}
3.2 构造方法
将parties设置为count值,设置达到等待线程数量后优先执行的线程
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties; // 保存parties可循环使用
this.count = parties; // 将parties设置为count值
this.barrierCommand = barrierAction;// 设置达到等待线程数量后优先执行的线程
}
3.3 await()
await()方法:
①将count-1
②将线程阻塞并构造成结点加入condition条件队列。
③当count变为0时,condition将条件队列中的线程全部唤醒。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier(); // 代失效,唤醒所有线程
throw new InterruptedException();
}
int index = --count; // 计数
if (index == 0) { // 达到要求数量
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 达到等待线程数量后执行barrierCommand
ranAction = true;
nextGeneration(); // 唤醒本代所有线程,生成新一代,重置count
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 线程数量未达到要求数量,将线程挂起等待
for (;;) {
try {
if (!timed)
trip.await(); // 将线程加入condition队列挂起
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && !g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 特殊情况处理
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
// 当前代失效,唤醒所有线程
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
// 唤醒本代所有线程,生成新一代,重置count
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
CyclicBarrier要做的事情是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
注意CyclicBarrier与CountDownLatch的区别:CountDownLatch用于一个线程等待若干个其他线程执行完任务之后才执行,而CyclicBarrier强调的是多个线程互等,等大家都完成,再携手共进。此外,CyclicBarrier功能更加强大,可以循环使用。
CyclicBarrier是通过Lock的Condition实现的,每个CyclicBarrier对应个Lock锁和该锁的condition条件。创建CyclicBarrier时设置一个count计数,当调用await()时做两件事:①将count-1 ②将线程阻塞并构造成结点加入condition条件队列。当count变为0时,达到等待线程数量要求,condition将条件队列中的线程全部唤醒。
参考资料
《Java并发编程之美》
《Java并发编程实战》
《Java并发编程的艺术》
【原创】01|开篇获奖感言
【原创】02|并发编程三大核心问题
【原创】03|重排序-可见性和有序性问题根源
【原创】04|Java 内存模型详解
【原创】05|深入理解 volatile
【原创】06|你不知道的 final
【原创】07|synchronized 原理
【原创】08|synchronized 锁优化
【原创】09|基础干货
【原创】10|线程状态
【原创】11|线程调度
【原创】12|揭秘 CAS
【原创】13|LockSupport
【原创】14|AQS 源码分析
【原创】15|重入锁 ReentrantLock
【原创】16|公平锁与非公平锁
【原创】17|读写锁八讲(上)
【原创】18|读写锁八讲(下)
【原创】19|JDK8新增锁StampedLock
【原创】20|StampedLock源码解析
【原创】21|Condition-Lock的等待通知
之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。
《java面试宝典5.0》(初中级)
《350道Java面试题:整理自100+公司》(中高级)
《资深java面试宝典-视频版》(资深)
《Java[BAT]面试必备》(资深)
分别适用于初中级,中高级,资深级工程师的面试复习。
内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。
获取方式:点“在看”,V信关注上述单号并回复 【面试】即可领取,更多精彩陆续奉上。
看到这里,证明有所收获
必须点个在看支持呀,喵
Java并发编程系列23 | 循环屏障CyclicBarrie
标签:ext command 一个 参考资料 java并发 重排序 for 源码 jdk8
原文地址:https://blog.51cto.com/15009303/2552593