标签:style blog class code java tar
CyclicBarrier一个线程同步辅助类,它允许一组线程互相等待,直到线程数达到了CyclicBarrier初始时规定的数目时,才继续运行await方法后的业务;CyclicBarrier和CountDownLacth不同,CyclicBarrier是当所有await的线程数量到达了设定的数量后,才继续往下执行。而CountDownLacth是当所有线程都执行完成时,去执行。
CyclicBarrier和CountDownLacth的区别说起来比较绕, 下图是在网上找的一个解释:
实例:(同事们一起聚餐时, 等所有人全部到达饭店后,才能开吃)
final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
public void run() {
System.out.println("========全部人员到齐,开吃========");
}
});
for (int i = 1; i <= 5; i++) {
new Thread("同事" + i){
public void run() {
try {
Thread.sleep((long)(Math.random()*1000));
System.out.println("第" + (cyclicBarrier.getNumberWaiting() + 1) + "位到达:" + this.getName());
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("========" + this.getName() + "==============");
};
}.start();
}结果输出:
第1位到达:同事2
第2位到达:同事1
第3位到达:同事5
第4位到达:同事4
第5位到达:同事3
========全部人员到齐,开吃========
========同事2==============
========同事1==============
========同事5==============
========同事4==============
========同事3==============
CyclicBarrier实现:
构造函数:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}barrierAction允许传入一个实现了Runnable的对象,当调用await方法使count的数量递减到0时,首先会执行此Runnable的对象。
int await()
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}调用dowait方法时,首先使用ReentrantLock进行加锁,然后对count成员属性的值执行减1操作,如果减后的值为0,则执行传入的barrierAction对象。执行完成后将ranAction设置为true, 调用nextGeneration方法并返回0,nextGeneration方法主要调用trip的signalAll方法唤醒所有等待的线程(trip 为Condition类的对象);如果count递去1后的值不等于0,则调用trip的await方法或await 设定时间 挂起当前线程,直到被唤醒、线程被interrupt或超过设定时间,才会从等待状态恢复;如果设定了等待时间,则检查是否超时,如果超过了,则将generation#broken的值设置为true,调用trip的signalAll方法,并抛出TimeoutException异常,执行finally中的释放锁。
private int dowait(boolean timed, long nanos) throws InterruptedException,
BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && !g.broken) {
breakBarrier();
throw ie;
} else {
// We‘re about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
} private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}int getNumberWaiting()
返回在屏障处等待的线程数目。
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}parties、count为初始化的数目,parties不会改变,而count为每次调用await时递减;
int getParties()
返回参与者数量
public int getParties() {
return parties;
}将屏障重置为其初始状态(即count=parties),如果屏障中还有线程等待,则会抛出BrokenBarrierException异常.
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
} private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}Java多线程:CyclicBarrier,布布扣,bubuko.com
标签:style blog class code java tar
原文地址:http://blog.csdn.net/java2000_wl/article/details/25009795