码迷,mamicode.com
首页 > 编程语言 > 详细

Java并发包中线程并发器

时间:2020-02-06 14:54:31      阅读:89      评论:0      收藏:0      [点我收藏+]

标签:catch   不同   map   个数   index   inf   wait   get   extend   

一、CountDownLatch

场景:主线程需要等待所有子线程执行完毕后再进行汇总

CountDownLatch实现比较简单,继承AQS实现了一个不可重入共享锁Sync

技术图片

1.不可重入共享锁Sync

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

//尝试获取锁 仅state==0时才能获取成功
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
//尝试释放锁
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }

2.方法

1)void await()

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);//尝试获取锁,不忽略中断引起的返回
    }

2)boolean await(long timeout, TimeUnit unit)

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));//尝试一定时间内获取锁
    }

3)void countDown()

    public void countDown() {
        sync.releaseShared(1);
    }

3.实例

public class CountDownLatchTest {

    //定义CountDownLatch  实际创建共享锁  且锁已被两个线程持有 state == 2
    private static CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("childThreadOne over");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //线程1释放共享锁,state--
                    countDownLatch.countDown();
                }
            }
        });
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("childThreadTwo over");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //线程2释放共享锁,state--
                    countDownLatch.countDown();
                }
            }
        });
        System.out.println("wait all child thread over");
        //主线程阻塞, 实际尝试获取共享锁 ,仅state == 0时获取成功或被中断打断引起异常
        countDownLatch.await();
        System.out.println("all child thread over");
        pool.shutdown();
    }
}

二、CyclicBarrier回环屏障

场景:和CountDownLatch场景一样,但是CountDownLatch是一次性的,CyclicBarrier可重复使用;实现方式不同,所以使用方式也不同,见后面实例

CyclicBarrier采用独占锁ReentranLock及条件变量trip(阻塞到达屏障的线程)实现

设置一道屏障,①当线程数小于屏障规定的线程数时,线程入trip条件阻塞队列,线程阻塞;②当线程数等于屏障规定的线程数时,唤醒trip中所有的线程,并重置计数器状态(越过屏障)

另外CyclicBarrier也不忽略中断引起的返回,会抛出异常,屏障会失效,抛错genetation.barrier = true

1)变量与构造方法

    /** 独占锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 条件变量 */
    private final Condition trip = lock.newCondition();
    /** 屏障阻塞的线程个数 */
    private final int parties;
    /* 突破屏障后执行的任务  默认为空 */
    private final Runnable barrierCommand;
    /** 默认false,当前屏障被中断打破后,设置为true,继续使用屏障会抛出异常BrokenBarrierException */
    private Generation generation = new Generation();

    /**
     * 实际计数器  count == 0时突破屏障
     */
    private int count;

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

2.方法

1)int dowait(boolean timed, long nanos)

    /**
     * 主要代码
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                //中断引起的跨过屏障,后续await屏障都会抛错
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                //当前线程被中断,唤醒trip的所有阻塞线程,设置g.broken == true,抛出异常
                breakBarrier();
                throw new InterruptedException();
            }
            //调用一次数据器-1
            int index = --count;
            //当计数器 == 0时,达到屏蔽的线程数,越过屏障
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        //先执行屏障任务
                        command.run();
                    ranAction = true;
                    //唤醒条件变量中所有线程trip.signalAll();
                    //重置计数器count = parties;
                    //重置版本generation = new Generation();
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        //执行屏障任务抛错时,
                        //依然唤醒所有阻塞线程,
                        //但设置g.barrier == true,后续屏障都会抛错
                        breakBarrier();
                }
            }

            // 当计数器 != 0 时,当前线程入条件阻塞队列
            for (;;) {
                try {
                    if (!timed)
                        //无限阻塞
                        trip.await();
                    else if (nanos > 0L)
                        //超时阻塞
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

2)int await()

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

3) int await(long timeout, TimeUnit unit)

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

3.实例

public class CyclicBarrierTest {

    //设置屏障线程数为2  state = 2 
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(2);

        pool.submit(new Runnable(){
            @Override
            public void run() {
                try {
                    System.out.println("thread1 step1");
                    //线程1入trip阻塞队列,state--
                    cyclicBarrier.await();
                    System.out.println("thread1 step2");
                    cyclicBarrier.await();
                    System.out.println("thread1 step3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });

        pool.submit(new Runnable(){
            @Override
            public void run() {
                try {
                    System.out.println("thread2 step1");
                    //线程2入trip阻塞队列,state--
                    //与线程1的step1一起导致state == 0,越过屏障唤醒两个线程,state重新设置为2后续逻辑一致
                    cyclicBarrier.await();
                    System.out.println("thread2 step2");
                    cyclicBarrier.await();
                    System.out.println("thread2 step3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });
        pool.shutdown();
    }
}

三、Semaphore

场景:

信号量同步器设计类似于CountDownLatch,不同的是计数器是递增的

Semaphore不仅实现了一个

技术图片

 

Java并发包中线程并发器

标签:catch   不同   map   个数   index   inf   wait   get   extend   

原文地址:https://www.cnblogs.com/wqff-biubiu/p/12268512.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!