码迷,mamicode.com
首页 > 编程语言 > 详细

java多线程基本概述(二十二)——CountDownLatch(2017-04-20 18:54)

时间:2017-04-21 00:24:04      阅读:254      评论:0      收藏:0      [点我收藏+]

标签:one   ace   oca   ast   pos   --   阻塞   let   throw   

它被用来同步一个或者多个任务,轻质它们等待由其他任务执行的一组操作完成。

你可以向 CountDownLatch 对象设置一个初始计数值,任何在这个对象上调用  await()  的方法都将阻塞,直到这个计数值为0。其他任务在结束其工作时,可以在该对象上调用 countDown() 来减小这个数值,这个方法不会阻塞线程。 CountDownLatch 被设计为只触发一次,计数值不能被重置。如果你需要能够重置计数值的版本,则可以使用 CyclicBarrier .

 CountDownLatch  的典型用法时将一个程序分成n个互相独立的可解决任务,并创建值为n的CountDownLatch。每当任务完成时,都会在这个锁存储器上调用countDown()方法。等待问题被解决的任务在这个锁存储器上调用await()。将它们自己拦住,直到存储器计数结束。

public class CountDownLatch
extends Object
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.
A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown. A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
A useful property of a CountDownLatch is that it doesn‘t require that threads calling countDown wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past an await until all threads could pass.
Sample usage: Here is a pair of classes in which a group of worker threads use two countdown latches:
The first is a start signal that prevents any worker from proceeding until the driver is ready for them to proceed;
The second is a completion signal that allows the driver to wait until all workers have completed.
   class Driver { // ...
    void main() throws InterruptedException {
      CountDownLatch startSignal = new CountDownLatch(1);
      CountDownLatch doneSignal = new CountDownLatch(N);
 
      for (int i = 0; i < N; ++i) // create and start threads
        new Thread(new Worker(startSignal, doneSignal)).start();
 
      doSomethingElse();            // don‘t let run yet
      startSignal.countDown();      // let all threads proceed
      doSomethingElse();
      doneSignal.await();           // wait for all to finish
    }
  }
 
  class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
    }
    public void run() {
      try {
        startSignal.await();
        doWork();
        doneSignal.countDown();
      } catch (InterruptedException ex) {} // return;
    }
 
    void doWork() { ... }
  }
Another typical usage would be to divide a problem into N parts, describe each part with a Runnable that executes that portion and counts down on the latch, and queue all the Runnables to an Executor. When all sub-parts are complete, the coordinating thread will be able to pass through await. (When threads must repeatedly count down in this way, instead use a CyclicBarrier.)
   class Driver2 { // ...
    void main() throws InterruptedException {
      CountDownLatch doneSignal = new CountDownLatch(N);
      Executor e = ...
 
      for (int i = 0; i < N; ++i) // create and start threads
        e.execute(new WorkerRunnable(doneSignal, i));
 
      doneSignal.await();           // wait for all to finish
    }
  }
 
  class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;
    WorkerRunnable(CountDownLatch doneSignal, int i) {
      this.doneSignal = doneSignal;
      this.i = i;
    }
    public void run() {
      try {
        doWork(i);
        doneSignal.countDown();
      } catch (InterruptedException ex) {} // return;
    }
 
    void doWork() { ... }
  }
public class CountDownLatch
extends Object
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。

示例用法: 下面给出了两个类,其中一组 worker 线程使用了两个倒计数锁存器:

第一个类是一个启动信号,在 driver 为继续执行 worker 做好准备之前,它会阻止所有的 worker 继续执行。
第二个类是一个完成信号,它允许 driver 在完成所有 worker 之前一直等待。
 class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don‘t let run yet
     startSignal.countDown();      // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
 }

 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
   }
   public void run() {
      try {
        startSignal.await();
        doWork();
        doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

 
另一种典型用法是,将一个问题分成 N 个部分,用执行每个部分并让锁存器倒计数的 Runnable 来描述每个部分,然后将所有 Runnable 加入到 Executor 队列。当所有的子部分完成后,协调线程就能够通过 await。(当线程必须用这种方法反复倒计数时,可改为使用 CyclicBarrier。)

 class Driver2 { // ...
   void main() throws InterruptedException {
     CountDownLatch doneSignal = new CountDownLatch(N);
     Executor e = ...

     for (int i = 0; i < N; ++i) // create and start threads
       e.execute(new WorkerRunnable(doneSignal, i));

     doneSignal.await();           // wait for all to finish
   }
 }

 class WorkerRunnable implements Runnable {
   private final CountDownLatch doneSignal;
   private final int i;
   WorkerRunnable(CountDownLatch doneSignal, int i) {
      this.doneSignal = doneSignal;
      this.i = i;
   }
   public void run() {
      try {
        doWork(i);
        doneSignal.countDown();
      } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

 
package tij;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by HuAoxiang on 2017/4/1 .
 *
 */


class TaskPortion implements Runnable{

    private static int counter = 0;
    private final int id = counter++;
    private static Random random = new Random(47);
    private final  CountDownLatch countDownLatch;

    TaskPortion(CountDownLatch latch) {
        this.countDownLatch = latch;
    }

    @Override
    public String toString() {
        return "working "+id;
    }

    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
            System.out.println(this+" completed!");
            this.countDownLatch.countDown();
        } catch (InterruptedException e) {
            System.out.println("working  interrupted!");
            e.printStackTrace();
        }
    }
}

class WaitTask implements Runnable{
    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch countDownLatch;
    WaitTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            this.countDownLatch.await();
            System.out.println("task waiting passed......"+this);
        } catch (InterruptedException e) {
            System.out.println("task waiting interrupted!"+this);
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return ""+id;
    }
}
public class Test2 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for(int i = 0;i<10;i++)
            service.execute(new WaitTask(countDownLatch));
        for(int i = 0;i<100;i++)
            service.execute(new TaskPortion(countDownLatch));
        service.shutdown();
    }
}

输出结果:

package tij;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by HuAoxiang on 2017/4/1 .
 *
 */


class TaskPortion implements Runnable{

    private static int counter = 0;
    private final int id = counter++;
    private static Random random = new Random(47);
    private final  CountDownLatch countDownLatch;

    TaskPortion(CountDownLatch latch) {
        this.countDownLatch = latch;
    }

    @Override
    public String toString() {
        return "working "+id;
    }

    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
            System.out.println(this+" completed!");
            this.countDownLatch.countDown();
        } catch (InterruptedException e) {
            System.out.println("working  interrupted!");
            e.printStackTrace();
        }
    }
}

class WaitTask implements Runnable{
    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch countDownLatch;
    WaitTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            this.countDownLatch.await();
            System.out.println("task waiting passed......"+this);
        } catch (InterruptedException e) {
            System.out.println("task waiting interrupted!"+this);
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return ""+id;
    }
}
public class Test2 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for(int i = 0;i<10;i++)
            service.execute(new WaitTask(countDownLatch));
        for(int i = 0;i<100;i++)
            service.execute(new TaskPortion(countDownLatch));
        service.shutdown();
    }
}

 

java多线程基本概述(二十二)——CountDownLatch(2017-04-20 18:54)

标签:one   ace   oca   ast   pos   --   阻塞   let   throw   

原文地址:http://www.cnblogs.com/soar-hu/p/6741291.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!