首页 > 其他好文 > 详细

CyclicBarrier 栅栏 原理,应用场景

时间:2019-10-05 01:10:51      阅读:100      评论:0      收藏:0      [点我收藏+]

标签:only   syn   ++i   --   event   cbe   rup   b2c   invoke   


栅栏与闭锁的关键区别 CyclicBarrier和CountDownLatch的区别



1. CyclicBarrier 方法多,可以用reset()方法来重置CyclicBarrier,让栅栏可以反复用。而CountDownLatch如果count变为0了,那么只能保持在这个0的最终状态,不能重新再用。

2. CyclicBarrier 是让一组线程等待某个事件发生,如果发生了,这组线程可以继续执行;CountDownLatch是一个线程或多个线程等待一组线程执行完毕。不同的点就在于当count变为0之后,CyclicBarrier是让这组线程不再阻塞,而继续执行;而CountDownLatch是让等待的线程不阻塞,继续执行。



MeetingLeaderTask:  领导线程。

OpenMeetingTask, 组员线程。

TestOpenMeeting 测试线程

package com.citi.test.mutiplethread.demo5;

public class MeetingLeaderTask implements Runnable {
    public void run() {
        try {
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
View Code
package com.citi.test.mutiplethread.demo5;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class OpenMeetingTask implements Runnable {
    private final CyclicBarrier barrier;
    private final String name;
    private final int arriveTime;

    public OpenMeetingTask(CyclicBarrier barrier,String name,int arriveTime) {

    public void run() {
        try {
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
        } catch (BrokenBarrierException e) {
            // TODO Auto-generated catch block
View Code
package com.citi.test.mutiplethread.demo5;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestOpenMeeting {
    public static void main(String[] args) {
        CyclicBarrier barrier=new CyclicBarrier(3,new MeetingLeaderTask());
//        Executor executor=Executors.newFixedThreadPool(3);
        ExecutorService executor=Executors.newFixedThreadPool(3);
        executor.execute(new OpenMeetingTask(barrier,"C罗", 5));
        executor.execute(new OpenMeetingTask(barrier,"小罗", 3));
        executor.execute(new OpenMeetingTask(barrier,"卡卡", 1));
View Code

下面是代码原理 。主要讲解几个重要方法,还有成员变量的意义。

 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared-state before any of the parties continue.
 * <p><b>Sample usage:</b> Here is an example of
 *  using a barrier in a parallel decomposition design:
 * <pre>
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     barrier = new CyclicBarrier(N,
 *                                 new Runnable() {
 *                                   public void run() {
 *                                     mergeRows(...);
 *                                   }
 *                                 });
 *     for (int i = 0; i < N; ++i)
 *       new Thread(new Worker(i)).start();
 *     waitUntilDone();
 *   }
 * }
 * </pre>
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all rows are processed
 * the supplied {@link Runnable} barrier action is executed and merges the
 * rows. If the merger
 * determines that a solution has been found then <tt>done()</tt> will return
 * <tt>true</tt> and each worker will terminate.
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 * <pre>  if (barrier.await() == 0) {
 *     // log the completion of this iteration
 *   }</pre>
 * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 * @since 1.5
 * @see CountDownLatch
 * @author Doug Lea
 public class CyclicBarrier {
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which <tt>count</tt> applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
    private static class Generation {
        boolean broken = false;

    /** The lock for guarding barrier entry 
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
    private int count;
 * Waits until all {@linkplain #getParties parties} have invoked
 * <tt>await</tt> on this barrier.
 * 等待直到所有的线程都调用了这个屏障的await方法。
 * <p>If the current thread is not the last to arrive then it is
 * disabled for thread scheduling purposes and lies dormant until
 * one of the following things happens:
 * <ul>
 * <li>The last thread arrives; or
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread; or
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * one of the other waiting threads; or
 * <li>Some other thread times out while waiting for barrier; or
 * <li>Some other thread invokes {@link #reset} on this barrier.
 * </ul>
 * <p>If the current thread:
 * <ul>
 * <li>has its interrupted status set on entry to this method; or
 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 * </ul>
 * then {@link InterruptedException} is thrown and the current thread‘s
 * interrupted status is cleared.
 * <p>If the barrier is {@link #reset} while any thread is waiting,
 * or if the barrier {@linkplain #isBroken is broken} when
 * <tt>await</tt> is invoked, or while any thread is waiting, then
 * {@link BrokenBarrierException} is thrown.
 或者当任何线程等待时 会抛出中断异常。
 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
 * then all other waiting threads will throw
 * {@link BrokenBarrierException} and the barrier is placed in the broken
 * state.
 * <p>If the current thread is the last thread to arrive, and a
 * non-null barrier action was supplied in the constructor, then the
 * current thread runs the action before allowing the other threads to
 * continue.
 * If an exception occurs during the barrier action then that exception
 * will be propagated in the current thread and the barrier is placed in
 * the broken state.
 * @return the arrival index of the current thread, where index
 *         <tt>{@link #getParties()} - 1</tt> indicates the first
 *         to arrive and zero indicates the last to arrive
 * @throws InterruptedException if the current thread was interrupted
 *         while waiting
 * @throws BrokenBarrierException if <em>another</em> thread was
 *         interrupted or timed out while the current thread was
 *         waiting, or the barrier was reset, or the barrier was
 *         broken when {@code await} was called, or the barrier
 *         action (if present) failed due an exception.
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
这个方法是比较重要的方法。 会涵盖各种的策略。
 * Main barrier code, covering the various policies.
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            throw new InterruptedException();

       int index = --count;
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               final Runnable command = barrierCommand;
               if (command != null)
               ranAction = true;
               return 0;
           } finally {
               if (!ranAction)

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    throw ie;
                } else {
                    // We‘re about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                throw new TimeoutException();
    } finally {
View Code

参考资料: https://blog.csdn.net/qq_38293564/article/details/80558157





CyclicBarrier 栅栏 原理,应用场景

标签:only   syn   ++i   --   event   cbe   rup   b2c   invoke   


评论 一句话评论(0
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com