标签:
线程的创建
```
/* What will be run. */
private Runnable target;
@Override
public void run() {
if (target != null) {
target.run();
}
}
```
(1) 实现Runnable接口:new Thread(Runnable).start();
(2) 继承Thread类:重写run方法,new Thread(){run()}.start();
编写简单,可直接操纵线程,无需使用Thread.currentThread()。
线程池(ThreadPool)
(1) 使用线程池的好处
(2) 线程池的创建:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
注意:线程池的创建在底层使用的都是ThreadPoolExecutor的构造方法:
handler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
(3)线程池提交任务的方法
我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务是否被线程池执行成功。通过以下代码可知execute方法输入的任务是一个Runnable类的实例。
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
```
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}
```
(4) 关闭线程池
(5)线程池分析
当提交一个新任务到线程池时,线程池的处理流程如下:
1)首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
2) 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
3) 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
源码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果线程数小于基本线程数,则创建线程并执行当前任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,
则创建一个线程执行任务。
else if (!addIfUnderMaximumPoolSize(command))
//抛出RejectedExecutionException异常
reject(command); // is shutdown or saturated
}
}
工作线程。线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从Worker的run方法里看到这点:
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
从任务中产生返回值
Runnable是执行工作的独立任务,但是它不产生返回值。实现Callable接口可产生返回值,是一种具有类型参数的泛型,它的参数表示的是从放大call()中返回的值,并且必须使用ExecutorService.submit()方法调用它。例子如下:
package com.xqq.Callable;
import java.util.concurrent.Callable;
public class TaskWithResult implements Callable<String>{
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() throws Exception {
return "result of TaskWithResult " + id;
}
}
package com.xqq.Callable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
List<Future<String>> list = new ArrayList<Future<String>>();
for(int i = 0; i < 10; i++){
list.add(exec.submit(new TaskWithResult(i)));
}
for(Future<String> fs: list){
try {
System.out.println(fs.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
exec.shutdown();
}
}
}
}
运行结果:
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
submit()方法会产生Future对象,它用Callable但会结果的特定类型进行参数化。可以通过isDone()来查询Future是否已经完成。当任务完成时,它具有一个结果,你可以调用get()方法来获取该结果。也可以不用isDone()进行检查就直接get(),在这种情况下,get()将阻塞,直到结果准备就绪。
后台线程
join()
一个线程可以调用其他线程的join()方法,其效果是等待其他线程结束才继续执行。如果某个线程调用t.join(),此线程将被挂起,直到目标线程t结束才恢复(即t.isAlive()为假)。
对join()方法的调用可以被中断,做法就是在调用线程上调用interrupt()方法,即t.interrupt()。例子如下:
package com.xqq.join;
public class Joiner extends Thread{
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper){
super(name);
this.sleeper = sleeper;
start();
}
public void run() {
try {
sleeper.join();
} catch (Exception e) {
System.out.println("Interupted");
}
System.out.println(getName() + " join completed");
}
}
package com.xqq.join;
public class Sleeper extends Thread{
private int duration;
public Sleeper(String name, int sleeperTime){
super(name);
duration = sleeperTime;
start();
}
public void run() {
try {
sleep(duration);
} catch (Exception e) {
System.out.println(getName() + " was interrupted. " + "isInterupted: " + isInterrupted());
return;
}
System.out.println(getName() + " has awakened");
}
}
package com.xqq.join;
/**
* 如果某一个线程在另一个线程t上调用t.join(),此线程将被挂起,直到线程t运行结束
* 对join方法的调用可以被中断,在调用线程上调用interrupt()方法。
* @author xqq
*/
public class Joining {
public static void main(String[] args) {
Sleeper sleepy = new Sleeper("Sleepy", 1500);
Sleeper grumpy = new Sleeper("Grumpy", 1500);
new Joiner("Dopey", sleepy);
new Joiner("Doc", grumpy);
grumpy.interrupt();
}
}
运行结果:
Grumpy was interrupted. isInterupted: false
Doc join completed
Sleepy has awakened
Dopey join completed
在catch子句中,将根据isInterrupted()的返回值报告这个中断,当另一个线程在该线程上调用interrupt()时,将给该线程设定一个标志,表明该线程已经被中断。然而,异常被捕获时将清理这个标志,所以在catch子句中,在异常被捕获的时候这个标志总是为假。
捕获异常
由于线程的本质特性,使得不能捕获从线程中逃逸的异常。一旦异常逃出任务的run()方法,就会向外传向控制台,在JavaSE5之前,可以使用线程组来捕获这些异常。现在可以修改Executor产生线程的方式。Thread.UncaughtExceptionHandler是Java SE5中的新接口,它允许在每个Thread对象上都附着一个异常处理器。Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用。可以创建一个新类型的ThreadFactory,为每个新创建的Thread对象上附着一个异常处理器。例子如下:
package com.xqq.捕获异常;
public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
package com.xqq.捕获异常;
import java.util.concurrent.ThreadFactory;
public class HandlerThreadFactory implements ThreadFactory{
public Thread newThread(Runnable r) {
System.out.println(this + " creating new thread");
Thread t = new Thread(r);
System.out.println("created " + t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("eh = " + t.getUncaughtExceptionHandler());
return t;
}
}
package com.xqq.捕获异常;
public class ExceptionThread implements Runnable{
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by " + t);
System.out.println("eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
package com.xqq.捕获异常;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 捕获异常:修改Executor产生线程的方式
* 为每一个线程绑定一个异常处理器,Thread.UncaughExceptionHandler
* @author xqq
*/
public class CaptureUncaughtExcception {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
exec.execute(new ExceptionThread());
}
}
运行结果:
com.xqq.捕获异常.HandlerThreadFactory@7852e922 creating new thread
created Thread[Thread-0,5,main]
eh = com.xqq.捕获异常.MyUncaughtExceptionHandler@4e25154f
run() by Thread[Thread-0,5,main]
eh = com.xqq.捕获异常.MyUncaughtExceptionHandler@4e25154f
com.xqq.捕获异常.HandlerThreadFactory@7852e922 creating new thread
created Thread[Thread-1,5,main]
eh = com.xqq.捕获异常.MyUncaughtExceptionHandler@34637630
caught java.lang.RuntimeException
运用Brian同步规则:如果你正在写一个变量,它可能接下来将被另一个线程读取,或者正在读取一个上一次已经被另一个写过的变量,那么你必须使用同步,并且读写线程必须使用相同的监视器锁同步。
同步分为两种,一种是多个线程共享同一个资源,该资源同一时刻只能让一个线程访问,对共享资源的互斥访问,利用synchronized关键字或者Lock等实现。另一种是多个线程协作解决某一个问题,某个线程必须在另一个线程之前或者之后完成,线程之间的顺序问题,利用wait()和notify()实现。
使用线程来同时运行多个任务时,可以通过使用锁来同步两个任务的行为,从而使得一个任务不会干涉另一个任务的资源。也就是两项任务在交替着步入某项资源,可以使用互斥来使得任何时刻只有一个任务可以访问这项资源。
关键字synchronized,为防止资源冲突提供内置支持。
synchronized的使用方法:不可以修饰域,只能锁定对象或者方法
synchronized void f(){}
synchronized void g(){}
synchronized (Object){}
如果 某个任务对对象调用了f(), 对于同一个对象而言,就只能等到f()调用结束并释放锁之后,其他任务才能调用f(),g()。
使用显示的Lock锁:Lock对象必须被显示的创建、锁定和释放。因此与内建锁相比,代码缺乏优雅性。
原子类
Java SE5 引入了诸如AtomicInteger、AtomicLong、AtomicReference等特殊的原子性变量类。
线程本地存储(ThreadLocal)
防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程本地存储是一种自动化机制,可以为使用相同变量的不同线程都创建不同的存储。如果有5个线程都要使用变量x所表示的对象,那线程本地存储就会生成5个用于x的不同的存储块。ThreadLocal对象通常当做静态域存储。
锁的等级:必须给定一个在其上的进行同步的对象。
yield()让步
当调用yield()时,是在建议具有相同优先级的其它线程可以运行
如何使得多个任务彼此之间可以协作,以使得多个任务可以一起工作去解决某问题,现在的问题是线程之间的彼此协调,因为在这类问题中,某些部分必须在其他部分被解决之前解决。任务间的握手可以通过Object的方法wait()和notify()来安全的实现,Java SE5的并发类库提供了具有await()和signal方法的Condition对象。
wait()、notify()、notifyAll()
wait()与sleep()的区别
为什么使用一个检查感兴趣的条件的while循环包围wait()?
notify()与notifyAll()
使用notify而不是notifyAll是一种优化。当notify()因某个特定的锁而被调用时,只有等待这个锁的任务才会被唤醒。
一个任务进入阻塞状态的原因:
所谓中断就是在Runnable.run()方法的中间打断它。
退出阻塞任务的方法:
Thread类包含了interrupt()方法,因此你可以终止被阻塞的任务,这个方法将设置线程的中断状态。
退出无阻塞任务的方法:
package com.xqq.生产者与消费者之wait和notify实现;
public class Item {
private static int counter;
private final int id = counter++;
@Override
public String toString() {
return "Item " + id;
}
}
package com.xqq.生产者与消费者之wait和notify实现;
import java.util.concurrent.TimeUnit;
public class Producer implements Runnable{
private int delay;
private FlowQueue<Item> output;
public Producer(int delay, FlowQueue<Item> output) {
this.delay = delay;
this.output = output;
}
public void run() {
for(;;){
try {
Item product = new Item();
System.out.println("Product " + product);
output.put(product);
TimeUnit.MILLISECONDS.sleep(delay);
} catch (Exception e) {
return;
}
}
}
}
package com.xqq.生产者与消费者之wait和notify实现;
import java.util.concurrent.TimeUnit;
public class Consumer implements Runnable{
private int delay;
private FlowQueue<Item> input;
public Consumer(int delay, FlowQueue<Item> input) {
this.delay = delay;
this.input = input;
}
public void run() {
for(;;){
try {
System.out.println("Consumer " + input.get());
TimeUnit.MILLISECONDS.sleep(delay);
} catch (Exception e) {
return;
}
}
}
}
package com.xqq.生产者与消费者之wait和notify实现;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ProducerConsumer {
public static void main(String[] args) throws InterruptedException {
int producerSleep = 200;
int consumerSleep = 200;
FlowQueue<Item> fq = new FlowQueue<Item>(10);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Producer(producerSleep, fq));
exec.execute(new Consumer(consumerSleep, fq));
exec.execute(new Producer(producerSleep, fq));
exec.execute(new Consumer(consumerSleep, fq));
TimeUnit.SECONDS.sleep(2);
exec.shutdownNow();
}
}
运行结果:
Product Item 0
Product Item 1
Consumer Item 0
Consumer Item 1
Product Item 3
Product Item 2
Consumer Item 3
Consumer Item 2
Product Item 4
Consumer Item 4
Product Item 5
Consumer Item 5
Product Item 6
Consumer Item 6
Product Item 7
Consumer Item 7
Product Item 8
Consumer Item 8
Product Item 9
Consumer Item 9
package com.xqq.生产者与消费者之显示Lock和Condition对象;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BufferData<T> {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Object [] items;
private int count = 0, putPtr = 0, takePtr = 0;
public BufferData(int size) {
items = new Object[size];
}
public void put(T item) throws InterruptedException{
lock.lock();
try{
while(count == items.length){
notFull.await();
}
items[putPtr++] = item;
if(putPtr == items.length)
putPtr = 0;
count++;
notEmpty.signal();
}finally{
lock.unlock();
}
}
public T take() throws InterruptedException{
lock.lock();
try{
while(count == 0){
notEmpty.await();
}
@SuppressWarnings("unchecked")
T item = (T)items[takePtr ++];
if(takePtr == items.length)
takePtr = 0;
count--;
notFull.signal();
return item;
}finally{
lock.unlock();
}
}
}
package com.xqq.生产者与消费者BlockingQueue实现;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.xqq.生产者与消费者之wait和notify实现.Item;
public class Producer implements Runnable{
private BlockingQueue<Item> sharedQueue;
public Producer(BlockingQueue<Item> sharedQueue) {
this.sharedQueue = sharedQueue;
}
public void run() {
for(; ;){
try {
Item item = new Item();
System.out.println("Produce " + item);
sharedQueue.put(item);
TimeUnit.MILLISECONDS.sleep(200);
} catch (Exception e) {
return ;
}
}
}
}
package com.xqq.生产者与消费者BlockingQueue实现;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.xqq.生产者与消费者之wait和notify实现.Item;
public class Consumer implements Runnable {
private BlockingQueue<Item> sharedQueue;
public Consumer(BlockingQueue<Item> sharedQueue) {
this.sharedQueue = sharedQueue;
}
public void run() {
for (;;) {
try {
System.out.println("Consumer " + sharedQueue.take());
TimeUnit.MILLISECONDS.sleep(200);
} catch (Exception e) {
return;
}
}
}
}
package com.xqq.生产者与消费者BlockingQueue实现;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.xqq.生产者与消费者之wait和notify实现.Item;
public class ProducerConsumerPattern {
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
BlockingQueue<Item> sharedQueue = new LinkedBlockingQueue<Item>();
exec.execute(new Producer(sharedQueue));
exec.execute(new Consumer(sharedQueue));
exec.execute(new Producer(sharedQueue));
exec.execute(new Consumer(sharedQueue));
TimeUnit.SECONDS.sleep(3);
exec.shutdownNow();
}
}
运行结果:
Produce Item 0
Produce Item 1
Consumer Item 0
Consumer Item 1
Produce Item 3
Produce Item 2
Consumer Item 3
Consumer Item 2
Produce Item 4
Produce Item 5
Consumer Item 5
Consumer Item 4
Produce Item 7
Consumer Item 7
Produce Item 6
Consumer Item 6
Produce Item 8
Produce Item 9
Consumer Item 8
Consumer Item 9
某个任务在等待另一个任务,而后者有等待别的任务,这样一直下去,直到这个链条上的任务又在等待第一个任务释放锁。这得到了一个任务之间相互等待的连续循环,没有那一个线程能继续。这称之为死锁。
死锁的四个必要条件
哲学家就餐问题
解决哲学家就餐死锁的办法就是破破坏循环等待条件:前面的哲学家先拿右边的再拿左边的,而最后一个哲学家先拿左边的在拿右边的。
1. CountDownLatch
用来同步一个或多个任务,强制他们等待由其他任务执行的一组操作完成。
可以向CountDownLatch对象设置一个初始计数值,任何在这个对象上调用await()方法都将阻塞,直至这个计数值到达0。其他任务在结束其工作时,可以调用countDown()来减小这个计数值。CountDownLatch被设计为只触发一次,计数值不能重置,如果需要能够重置计数值的版本,则可以使用CyclicBarrier。示例代码:
package com.xqq.新类库中的构件之CountDownLatch;
import java.util.concurrent.CountDownLatch;
public class WaitingTask implements Runnable{
private CountDownLatch latch;
private static int counter = 0;
private final int id = counter++;
public WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();
System.out.println("Latch barrier passed for " + this);
} catch (Exception e) {
return ;
}
}
@Override
public String toString() {
return String.format("%1$-3d", id);
}
}
package com.xqq.新类库中的构件之CountDownLatch;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class TaskPortion implements Runnable{
private CountDownLatch latch;
private static int counter = 0;
private final int id = counter++;
private Random rand = new Random(47);
public TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
doWork();
latch.countDown();
} catch (Exception e) {
return ;
}
}
private void doWork() throws InterruptedException {
TimeUnit.MICROSECONDS.sleep(rand.nextInt(2000));
System.out.println(this + " completed");
}
@Override
public String toString() {
return String.format("%1$-3d", id);
}
}
package com.xqq.新类库中的构件之CountDownLatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* CountDownLatch:同步一个或者多个任务,将一堆任务分成两类,一类必须在另一类之前完成
* @author xqq
*/
public class CountDownLatchDemo {
private static final int SIZE = 10;
public static void main(String[] args) throws InterruptedException {
//只触发一次,计数值不能被重置
CountDownLatch latch = new CountDownLatch(SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0 ; i < 10; i++){
exec.execute(new WaitingTask(latch));
}
for(int i = 0 ; i < SIZE; i++){
exec.execute(new TaskPortion(latch));
}
TimeUnit.SECONDS.sleep(1);
System.out.println("All Tasks Lauched!");
exec.shutdown();
}
}
运行结果:
0 completed
8 completed
7 completed
5 completed
4 completed
9 completed
6 completed
1 completed
2 completed
3 completed
Latch barrier passed for 0
Latch barrier passed for 1
Latch barrier passed for 2
Latch barrier passed for 3
Latch barrier passed for 4
Latch barrier passed for 5
Latch barrier passed for 6
Latch barrier passed for 7
Latch barrier passed for 8
Latch barrier passed for 9
All Tasks Lauched!
2. CyclicBarrier
适用于这样的情况:你希望创建一组任务,他们并行的工作,然后在进行下一个步骤之前等待,直至所有的任务都完成。它使得所有的并行任务都将在栅栏处列队,因此可以一致的向前移动。赛马游戏代码:
package com.xqq.新类库中的构件之CyclicBarrier;
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
public class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private CyclicBarrier barrier;
private int strides = 0;
public Horse(CyclicBarrier barrier) {
this.barrier = barrier;
}
public int getStrides() {
return strides;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
strides += rand.nextInt(3);
}
barrier.await();
}
} catch (Exception e) {
return;
}
}
@Override
public String toString() {
return "Horse " + id + " ";
}
public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++) {
s.append("*");
}
s.append(id);
return s.toString();
}
}
package com.xqq.新类库中的构件之CyclicBarrier;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* CyclicBarrier适用于这样的情况:你希望创建一组任务,他们并行工作,然后再进行下一个步骤之前等待,直至所有任务都完成
* 它使得所有任务都在栅栏处列队,因此可以一致的向前移动。
* @author xqq
*/
public class HorseRace {
private static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorse, final int pause){
//当barrier.await()的总数量达到nHorses时, 就会自动触发该任务,并且计数值又重置为mHorse
barrier = new CyclicBarrier(nHorse, new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < FINISH_LINE; i++)
s.append("=");
System.out.println(s);
for(Horse horse: horses)
System.out.println(horse.tracks());
for(Horse horse: horses){
if(horse.getStrides() >= FINISH_LINE){
System.out.println(horse + "won !");
exec.shutdownNow();
return ;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (Exception e) {
}
}
});
for(int i = 0; i < nHorse; i++){
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int mHorse = 7;
int pause = 200;
new HorseRace(mHorse, pause);
}
}
3. DelayQueue
这是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,
4. PriorityBlockingQueue
5. ScheduledExecutor
每个期望的时间都是一个在预定时间运行的任务。ShceduledThreadPoolExecutor提供了解决该问题的服务。通过使用schedule()(运行一次任务),或者scheduleAtFixedRate()(每隔规则的时间重复执行任务),可以将Runnable对象设置为在将来的某个时刻执行。
6. Semaphore
正常的锁在任何时刻都只允许一个人物访问一项资源,而计数信号量允许n个任务同时访问这个资源。需要的使用semaphore.acquire()方法申请,用完之后使用semaphore.release()释放。
7. Exchanger
是两个任务之间交换对象的栅栏。每个人在完成一定的事情之后想与对方交换数据,第一个拿出数据的人将一直等待第二个拿着数据到来时,才能彼此交换数据。两个任务通过同一个Exchange对象,分别调用它的exchange(data)方法,将需要交换的数据置于方法中。
CopyOnWiteArrayList
ConcurrenHashMap和ConcurrentLinkedQueue
使用了类似的技术,允许并发的读取和写入,但是容器中只有部分内容而不是整个容器可以被复制和修改。然而,在修改完成之前,读取者仍旧不能看到他们。
ReadWriteLock
对向数据结构相对不频繁的写入,但是有多个任务要经常读取这个数据结构的这类情况进行了优化。ReadWriteLock使得你可以同时有多个读者,只要他们都不试图写入即可。如果写锁已经被其他任务持有,那么任何读者都不能访问,直至这个写锁被释放为止。
标签:
原文地址:http://blog.csdn.net/chun0801/article/details/51835719