标签:
(接上文:《线程基础:线程池(5)——基本使用(上)》)
我们先来总结一下上文中讨论过的内容,首先就是JAVA中ThreadPoolExecutor类的继承结构。如下图所示:
ThreadPoolExecutor:这个线程池就是我们这两篇文章中介绍的重点线程池实现。程序员可以通过这个线程池中的submit()方法或者execute()方法,执行所有实现了Runnable接口或者Callable接口的任务;ThreadPoolExecutor对于这些任务的执行是立即的、一次性的。
ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor线程池和ThreadPoolExecutor线程池的执行特点是不一样的,它是一个用来执行延迟任务、定时任务或者周期性任务的线程池。一般情况下,我们用它可以处理定时计算、周期性统计一类的任务。
ForkJoinPool:ScheduledThreadPoolExecutor和ThreadPoolExecutor都是在JDK1.5版本中提供的。在JDK1.7中,JAVA为我们提供了一种新的线程池ForkJoinPool以及配套的任务定义ForkJoinTask。除了可以执行实现了Runnable接口或者Callable接口的任务以外,ForkJoinPool还可以执行集成了ForkJoinTask定义的任务。ForkJoinPool的执行原理和ThreadPoolExecutor的执行原理是不一样的,我们将在专栏后续的文章中,专门讨论ForkJoinPool线程池。
我们继续讨论ThreadPoolExecutor线程池。上文我们给出的最简单的ThreadPoolExecutor线程池的使用方式中,我们只采用了ThreadPoolExecutor最简单的一个构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
实际上ThreadPoolExecutor线程池有很多种构造函数,其中最复杂的一种构造函数是:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
在上文中我们还没有介绍的workQueue、threadFactory和handler参数,将是本文讲解的重点。
线程池最主要的一项工作,就是在满足某些条件的情况下创建线程。而在ThreadPoolExecutor线程池中,创建线程的工作交给ThreadFactory来完成。要使用线程池,就必须要指定ThreadFactory。
类似于上文中,如果我们使用的构造函数时并没有指定使用的ThreadFactory,这个时候ThreadPoolExecutor会使用一个默认的ThreadFactory:DefaultThreadFactory。(这个类在Executors工具类中)
根据我个人观察,Executors工具类和ThreadPoolExecutor类存在循环依赖:ThreadPoolExecutor中使用了Executors工具类中定义的DefaultThreadFactory;而在Executors工具类中却又在创建ThreadPoolExecutor的对象实例。不清楚Doug Lea是故意未知呢,还是一个设计缺陷。
当然,在某些特殊业务场景下,您还可以使用一个自定义的ThreadFactory线程工厂,如下代码片段:
package test.thread.pool;
import java.util.concurrent.ThreadFactory;
/**
* 测试自定义的一个线程工厂
* @author yinwenjie
*/
public class TestThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
// do something before new thread created;
// create new thread , and return
return new Thread(r);
}
}
在使用ThreadPoolExecutor线程池的时候,需要指定一个实现了BlockingQueue接口的任务等待队列。在ThreadPoolExecutor线程池的API文档中,一共推荐了三种等待队列,它们是:SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue;但通过观察BlockingQueue接口的实现情况,您可以发现,能够直接使用的等待队列还有:PriorityBlockingQueue、LinkedBlockingDeque和LinkedTransferQueue。
但是在实际应用中,队列中的元素有可能不是以“进入的顺序”为排序依据的。例如我们将要讲到的PriorityBlockingQueue队列。
“是这样 一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。”尼玛,各位读者看懂了吗?好吧,我抄网上的。下面我用白话翻译一下:这是一个内部没有任何容量的阻塞队列,任何一次插入操作的元素都要等待相对的删除/读取操作,否则进行插入操作的线程就要一直等待,反之亦然。
SynchronousQueue<Object> queue = new SynchronousQueue<Object>();
// 不要使用add,因为这个队列内部没有任何容量,所以会抛出异常“IllegalStateException”
// queue.add(new Object());
// 操作线程会在这里被阻塞,直到有其他操作线程取走这个对象
queue.put(new Object());
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
// 我们创建了一个ArrayBlockingQueue,并且设置队列空间为2
ArrayBlockingQueue<Object> arrayQueue = new ArrayBlockingQueue<Object>(2);
// 插入第一个对象
arrayQueue.put(new Object());
// 插入第二个对象
arrayQueue.put(new Object());
// 插入第三个对象时,这个操作线程就会被阻塞。
arrayQueue.put(new Object());
// 请不要使用add操作,和SynchronousQueue的add操作一样,它们都使用了AbstractQueue中的add实现
LinkedBlockingQueue是我们在ThreadPoolExecutor线程池中常应用的等待队列。它可以指定容量也可以不指定容量。由于它具有“无限容量”的特性,所以我还是将它归入了无限队列的范畴(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE)。
LinkedBlockingQueue的实现是基于链表结构,而不是类似ArrayBlockingQueue那样的数组。但实际使用过程中,您不需要关心它的内部实现,如果您指定了LinkedBlockingQueue的容量大小,那么它反映出来的使用特性就和ArrayBlockingQueue类似了。
LinkedBlockingQueue<Object> linkedQueue = new LinkedBlockingQueue<Object>(2);
linkedQueue.put(new Object());
// 插入第二个对象
linkedQueue.put(new Object());
// 插入第三个对象时,这个操作线程就会被阻塞。
linkedQueue.put(new Object());
=======================================
// 或者如下使用:
LinkedBlockingQueue<Object> linkedQueue = new LinkedBlockingQueue<Object>();
linkedQueue.put(new Object());
// 插入第二个对象
linkedQueue.put(new Object());
// 插入第N个对象时,都不会阻塞
linkedQueue.put(new Object());
LinkedBlockingDeque是一个基于链表的双端队列。LinkedBlockingQueue的内部结构决定了它只能从队列尾部插入,从队列头部取出元素;但是LinkedBlockingDeque既可以从尾部插入/取出元素,还可以从头部插入元素/取出元素。
LinkedBlockingDeque<TempObject> linkedDeque = new LinkedBlockingDeque<TempObject>();
// push ,可以从队列的头部插入元素
linkedDeque.push(new TempObject(1));
linkedDeque.push(new TempObject(2));
linkedDeque.push(new TempObject(3));
// poll , 可以从队列的头部取出元素
TempObject tempObject = linkedDeque.poll();
// 这里会打印 tempObject.index = 3
System.out.println("tempObject.index = " + tempObject.getIndex());
// put , 可以从队列的尾部插入元素
linkedDeque.put(new TempObject(4));
linkedDeque.put(new TempObject(5));
// pollLast , 可以从队列尾部取出元素
tempObject = linkedDeque.pollLast();
// 这里会打印 tempObject.index = 5
System.out.println("tempObject.index = " + tempObject.getIndex());
PriorityBlockingQueue是一个按照优先级进行内部元素排序的无限队列。存放在PriorityBlockingQueue中的元素必须实现Comparable接口,这样才能通过实现compareTo()方法进行排序。优先级最高的元素将始终排在队列的头部,优先级最低的元素将始终排在队列的尾部;PriorityBlockingQueue不会保证优先级一样的元素的排序。
PriorityBlockingQueue<TempObject> priorityQueue = new PriorityBlockingQueue<TempObject>();
priorityQueue.put(new TempObject(-5));
priorityQueue.put(new TempObject(5));
priorityQueue.put(new TempObject(-1));
priorityQueue.put(new TempObject(1));
// 第一个元素是5
TempObject targetTempObject = priorityQueue.poll();
System.out.println("tempObject.index = " + targetTempObject.getIndex());
// 第二个元素是1
targetTempObject = priorityQueue.poll();
System.out.println("tempObject.index = " + targetTempObject.getIndex());
// 第三个元素是-1
targetTempObject = priorityQueue.poll();
System.out.println("tempObject.index = " + targetTempObject.getIndex());
// 第四个元素是-5
targetTempObject = priorityQueue.poll();
System.out.println("tempObject.index = " + targetTempObject.getIndex());
============================================================================
// 这个元素类,必须实现Comparable接口
private static class TempObject implements Comparable<TempObject> {
private int index;
public TempObject(int index) {
this.index = index;
}
/**
* @return the index
*/
public int getIndex() {
return index;
}
/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
@Override
public int compareTo(TempObject o) {
return o.getIndex() - this.index;
}
}
LinkedTransferQueue也是一个无限队列,它除了具有一般队列的操作特性外(先进先出),还具有一个阻塞特性:LinkedTransferQueue可以由一对生产者/消费者线程进行操作,当消费者将一个新的元素插入队列后,消费者线程将会一直等待,直到某一个消费者线程将这个元素取走,反之亦然。
LinkedTransferQueue的操作特性可以由下面这段代码提现。在下面的代码片段中,有两中类型的线程:生产者和消费者,这两类线程互相等待对方的操作:
/**
* 生产者线程
* @author yinwenjie
*/
private static class ProducerRunnable implements Runnable {
private LinkedTransferQueue<TempObject> linkedQueue;
public ProducerRunnable(LinkedTransferQueue<TempObject> linkedQueue) {
this.linkedQueue = linkedQueue;
}
@Override
public void run() {
for(int index = 1 ; ; index++) {
try {
// 向LinkedTransferQueue队列插入一个新的元素
// 然后生产者线程就会等待,直到有一个消费者将这个元素从队列中取走
this.linkedQueue.transfer(new TempObject(index));
} catch (InterruptedException e) {
e.printStackTrace(System.out);
}
}
}
}
/**
* 消费者线程
* @author yinwenjie
*/
private static class ConsumerRunnable implements Runnable {
private LinkedTransferQueue<TempObject> linkedQueue;
public ConsumerRunnable(LinkedTransferQueue<TempObject> linkedQueue) {
this.linkedQueue = linkedQueue;
}
@Override
public void run() {
Thread currentThread = Thread.currentThread();
while(!currentThread.isInterrupted()) {
try {
// 等待,直到从LinkedTransferQueue队列中得到一个元素
TempObject targetObject = this.linkedQueue.take();
System.out.println("线程(" + currentThread.getId() + ")取得targetObject.index = " + targetObject.getIndex());
} catch (InterruptedException e) {
e.printStackTrace(System.out);
}
}
}
}
......
===============================以下是启动代码:
LinkedTransferQueue<TempObject> linkedQueue = new LinkedTransferQueue<TempObject>();
// 这是一个生产者线程
Thread producerThread = new Thread(new ProducerRunnable(linkedQueue));
// 这里有两个消费者线程
Thread consumerRunnable1 = new Thread(new ConsumerRunnable(linkedQueue));
Thread consumerRunnable2 = new Thread(new ConsumerRunnable(linkedQueue));
// 开始运行
producerThread.start();
consumerRunnable1.start();
consumerRunnable2.start();
// 这里只是为了main不退出,没有任何演示含义
Thread currentThread = Thread.currentThread();
synchronized (currentThread) {
currentThread.wait();
}
......
在ThreadPoolExecutor线程池中还有一个重要的接口:RejectedExecutionHandler。当提交给线程池的某一个新任务无法直接被线程池中“核心线程”直接处理,又无法加入等待队列,也无法创建新的线程执行;又或者线程池已经调用shutdown()方法停止了工作;又或者线程池不是处于正常的工作状态;这时候ThreadPoolExecutor线程池会拒绝处理这个任务,触发您创建ThreadPoolExecutor线程池时定义的RejectedExecutionHandler接口的实现:
New tasks submitted in method execute will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution method of its RejectedExecutionHandler. Four predefined handler policies are provided
您在创建ThreadPoolExecutor线程池时,一定会指定RejectedExecutionHandler接口的实现。如果您调用的是不需要指定RejectedExecutionHandler接口的构造函数,如:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
那么ThreadPoolExecutor线程池在创建时,会使用一个默认的RejectedExecutionHandler接口实现,源代码片段如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
......
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
......
// 可以看到,ThreadPoolExecutor中的两个没有指定RejectedExecutionHandler
// 接口的构造函数,都是使用了一个RejectedExecutionHandler接口的默认实现:AbortPolicy
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
......
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
......
}
实际上,在ThreadPoolExecutor中已经提供了四种可以直接使用的RejectedExecutionHandler接口的实现:
这个拒绝处理器,将直接运行这个任务的run方法。但是,请注意并不是在ThreadPoolExecutor线程池中的线程中运行,而是直接调用这个任务实现的run方法。源代码如下:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller‘s thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
这个处理器,在任务被拒绝后会创建一个RejectedExecutionException异常并抛出。这个处理过程也是ThreadPoolExecutor线程池默认的RejectedExecutionHandler实现:
A handler for rejected tasks that throws a RejectedExecutionException.
DiscardPolicy处理器,将会默默丢弃这个被拒绝的任务,不会抛出异常,也不会通过其他方式执行这个任务的任何一个方法,更不会出现任何的日志提示。
A handler for rejected tasks that silently discards the rejected task.
这个处理器很有意思。它会检查当前ThreadPoolExecutor线程池的等待队列。并调用队列的poll()方法,将当前处于等待队列列头的等待任务强行取出,然后再试图将当前被拒绝的任务提交到线程池执行:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
......
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
......
}
实际上查阅这四种ThreadPoolExecutor线程池自带的拒绝处理器实现,您可以发现CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy处理器针对被拒绝的任务并不是一个很好的处理方式。
CallerRunsPolicy在非线程池以外直接调用任务的run方法,可能会造成线程安全上的问题;DiscardPolicy默默的忽略掉被拒绝任务,也没有输出日志或者提示,开发人员不会知道线程池的处理过程出现了错误;DiscardOldestPolicy中e.getQueue().poll()的方式好像是科学的,但是如果等待队列出现了容量问题,大多数情况下就是这个线程池的代码出现了BUG。最科学的的还是AbortPolicy提供的处理方式:抛出异常,由开发人员进行处理。
(接下文,好吧我承认篇幅又没有控制好)
标签:
原文地址:http://blog.csdn.net/yinwenjie/article/details/50577325