码迷,mamicode.com
首页 > 其他好文 > 详细

(五)新类库的构件

时间:2016-10-23 23:01:26      阅读:210      评论:0      收藏:0      [点我收藏+]

标签:his   list   title   put   开始   不能   version   except   next   

CountDownLatch

public class CountDownLatch
extends Object

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

A useful property of a CountDownLatch is that it doesn‘t require that threads calling countDown wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past an await until all threads could pass.

用来同步一个或多个任务,强制它们等待其他任务执行的一组操作完成。可以这么理解,有两组任务A,B。A的多个任务等待B组的所有任务结束才能执行。

CountDownLatch对象设置一个初始的值。A中的任务执行前先调用CountDownLatch.await()方法将当前任务阻塞,当CountDownLatch的值为0时才能进行下去。B中的每个任务执行完都调用CountDownLatch.countDown()来减小计数值。这样就可以保证B中的任务可以同时进行,当B的任务全部结束,A的任务才可以开始。

CountDownLatch对象的计数值不能被再次重置,只能使用一次。想要重置,使用CyclicBarrier

public class CountDownLatchDemo {
    static final int SIZE = 10;
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(SIZE);
        
        for(int i=0;i<10;i++){
            exec.execute(new A(latch));
        }
        
        for(int i=0;i<10;i++){
            exec.execute(new B(latch));
        }
        
        System.out.println("Launched all tasks");
        exec.shutdown();
    }
    
}

class B implements Runnable{
    private static int counter =0;
    private final int id = counter++;
    private static Random random = new Random(47);
    private final CountDownLatch latch;
    public B(CountDownLatch latch){
        this.latch=latch;    
    }
    
    public void run(){
        try{
            doWork();
            latch.countDown();//B中完成一次任务,计数值减1
        }catch(InterruptedException e){
            e.printStackTrace();
        }
    }

    private void doWork() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(random.nextInt(10));
        System.out.println(this+" completed");
    }
    public String toString(){
        return String.format("B %1$-3d", id);
    }
}

class A implements Runnable{
    private static int counter =0;
    private final int id = counter++;
    private static Random random = new Random(57);
    private final CountDownLatch latch;
    public A(CountDownLatch latch){
        this.latch=latch;
    }
    @Override
    public void run() {
        try{
            latch.await();//等待计数值为0,在这之前都处于阻塞状态
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1));
            System.out.println("Latch barrier passedd for "+this);
        }catch(InterruptedException e){
            e.printStackTrace();
        }
    }    
    public String toString(){
        return String.format("A %1$-3d", id);
    }
}

输出:
Launched all tasks
B 3   completed
B 2   completed
B 6   completed
B 5   completed
B 0   completed
B 7   completed
B 1   completed
B 4   completed
B 9   completed
B 8   completed
Latch barrier passedd for A 0  
Latch barrier passedd for A 3  
Latch barrier passedd for A 2  
Latch barrier passedd for A 1  
Latch barrier passedd for A 4  
Latch barrier passedd for A 5  
Latch barrier passedd for A 6  
Latch barrier passedd for A 7  
Latch barrier passedd for A 8  
Latch barrier passedd for A 9 

CyclicBarrier

试想有一个任务,把它分割成多个子任务交给不同的线程去做,等它们都完成后,再执行下一个步骤。这时可以用CyclicBarrier对象。

初始化CyclicBarrier对象时需要指定任务的数目n,当有n个线程对同一个CyclicBarrier对象的await方法调用并进入阻塞的话,才算达到栅栏点。

以下程序演示对矩阵的每个元素求平方,我们用5个线程分别处理5行数据。等到5个线程都完成工作后,将处理好的矩阵打印输出。

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int[][] matrix={{1,1,1,1,1},{2,2,2,2,2},{3,3,3,3,3},{4,4,4,4,4},{5,5,5,5,5}};
        List<int[]> list = new ArrayList<int[]>();//把矩阵每一行放在list里
        for(int i=0;i<matrix.length;i++){list.add(matrix[i]);}
        ExecutorService exec = Executors.newCachedThreadPool();
        CyclicBarrier barrier = new CyclicBarrier(5,new Runnable(){
            public void run() {
                System.out.println("Solver are all completed");
                //打印处理后的矩阵
                for(int i=0;i<matrix.length;i++){
                    System.out.println(Arrays.toString(list.get(i)));
                }
            }
        });
        for(int i=0;i<5;i++){
            exec.execute(new Solver(barrier,list.get(i)));
        }
    }
}
class Solver implements Runnable{
    private int[] row;
    private static int count=0;
    public Random random = new Random(47);
    private final int id = count++;
    private CyclicBarrier barrier;
    public Solver(CyclicBarrier barrier,int[] row){
        this.barrier=barrier;
        this.row=row;
    }
    public void run(){
        try {
            //任务开始
            int length = row.length;
            for(int i=0;i<length;i++){
                row[i]=row[i]*row[i];
            }
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
            System.out.println("Solver "+id+" completed");
            //任务结束,到达栅栏点
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
输出:
Solver 1 completed
Solver 2 completed
Solver 0 completed
Solver 3 completed
Solver 4 completed
Solver are all completed
[1, 1, 1, 1, 1]
[4, 4, 4, 4, 4]
[9, 9, 9, 9, 9]
[16, 16, 16, 16, 16]
[25, 25, 25, 25, 25]

DelayQueue

这是一个无界的BlockingQueue,里面放置Delayed对象,其中的对象只能在其到期后才能被取走。如果队列中无到期的对象,则等待。该队列是有序队列,队首是最先过期的那个对象。因此该队列内部的对象需要比较各自的过期时间,故对象必须实现Delayed接口,即实现两个方法compareTo()和getDelay()。一个是用于比较对象之间的时间先后,一个用于获取对象的过期时间。

public class DelayQueueDemo {
    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
        ExecutorService exec = Executors.newCachedThreadPool();
        Random random = new Random(48);
        for(int i=0;i<20;i++)
            queue.put(new DelayedTask(random.nextInt(1000)));//把所有具有延迟到期功能的对象放在DelayQueue对列里
        exec.execute(new DelayedTaskConsumer(queue));
        queue.put(new DelayedTask(500));
    }
}
//具有延迟到期功能的任务
class DelayedTask implements Runnable,Delayed{
    private static int count=0;
    private final int id = count++;
    private final int delta;
    private final long trigger;
    public DelayedTask(int delayMilliseconds){
        delta = delayMilliseconds;
        trigger = System.nanoTime()+TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        DelayedTask that = (DelayedTask) o;
        if(trigger<that.trigger) return -1;
        if(trigger>that.trigger) return 1;
        return 0;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        //返回剩余时间
        return TimeUnit.NANOSECONDS.convert(System.nanoTime()-trigger,TimeUnit.NANOSECONDS);
    }
    @Override
    public void run() {
        System.out.println("DelayedTask delayTime ["+delta+"] "+"is running");
    }    
}
class DelayedTaskConsumer implements Runnable{
    private DelayQueue<DelayedTask> queue ;
    public DelayedTaskConsumer(DelayQueue<DelayedTask> queue){
        this.queue = queue;
    }
    @Override
    public void run() {
        try{
            while(!Thread.interrupted()){queue.take().run();}//取出最先过期的对象,并操作该对象,这里执行了对象的run方法
        }catch(InterruptedException e){e.printStackTrace();}
    }    
}
输出:
DelayedTask delayTime [100] is running
DelayedTask delayTime [140] is running
DelayedTask delayTime [183] is running
DelayedTask delayTime [244] is running
DelayedTask delayTime [316] is running
DelayedTask delayTime [368] is running
DelayedTask delayTime [522] is running
DelayedTask delayTime [562] is running
DelayedTask delayTime [569] is running
DelayedTask delayTime [703] is running
DelayedTask delayTime [794] is running
DelayedTask delayTime [804] is running
DelayedTask delayTime [831] is running
DelayedTask delayTime [877] is running
DelayedTask delayTime [911] is running
DelayedTask delayTime [926] is running
DelayedTask delayTime [972] is running
DelayedTask delayTime [982] is running
DelayedTask delayTime [984] is running
DelayedTask delayTime [987] is running

PriorityBlockingQueue

优先级队列,它具有可阻塞的读取操作。该队列总是取出优先级最高的对象。优先级的比较由队列内对象的compareTo方法比较,故对象应实现Comparable接口。

public class PriorityBlockingQueueDemo {

    public static void main(String[] args) {
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
        ExecutorService exec = Executors.newCachedThreadPool();
        Random random = new Random(48);
        for(int i=0;i<20;i++)
            queue.put(new PriorityTask(random.nextInt(1000)));
        exec.execute(new PriorityTaskConsumer(queue));
    }

}
class PriorityTask implements Runnable,Comparable{
    private static int count=0;
    private final int id = count++;
    private final int priority;
    public PriorityTask(int priority){
        this.priority=priority;
    }
    @Override
    public int compareTo(Object o) {
        PriorityTask that = (PriorityTask) o;
        if(this.priority<that.priority) return -1;
        if(this.priority>that.priority) return 1;
        return 0;
    }
    @Override
    public void run() {
        System.out.println("PriorityTask priority ["+priority+"] is runnig");
    }
    
}
class PriorityTaskConsumer implements Runnable{
    private PriorityBlockingQueue<Runnable> queue;
    Random random = new Random(28);
    public PriorityTaskConsumer(PriorityBlockingQueue<Runnable> queue){
        this.queue = queue;
    }
    @Override
    public void run() {
        try{
            while(!Thread.interrupted()){
                queue.take().run();
                TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
            }
        }catch(InterruptedException e){e.printStackTrace();}
    }    
}
输出:
PriorityTask priority [100] is runnig
PriorityTask priority [140] is runnig
PriorityTask priority [183] is runnig
PriorityTask priority [244] is runnig
PriorityTask priority [316] is runnig
PriorityTask priority [368] is runnig
PriorityTask priority [522] is runnig
PriorityTask priority [562] is runnig
PriorityTask priority [569] is runnig
PriorityTask priority [703] is runnig
PriorityTask priority [794] is runnig
PriorityTask priority [804] is runnig
PriorityTask priority [831] is runnig
PriorityTask priority [877] is runnig
PriorityTask priority [911] is runnig
PriorityTask priority [926] is runnig
PriorityTask priority [972] is runnig
PriorityTask priority [982] is runnig
PriorityTask priority [984] is runnig
PriorityTask priority [987] is runnig

 

(五)新类库的构件

标签:his   list   title   put   开始   不能   version   except   next   

原文地址:http://www.cnblogs.com/wuchaodzxx/p/5991125.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!