ThreadPoolExecutor介绍:
//构造方法
public ThreadPoolExecutor(int corePoolSize,//核心池的大小
int maximumPoolSize,//线程池最大线程数
long keepAliveTime,//保持时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler) //异常的捕捉器
构造相关参数解释
*
corePoolSize:`核心池的大小`,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
*
maximumPoolSize:`线程池最大线程数`,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
*
keepAliveTime:`表示线程没有任务执行时最多保持多久时间会终止`。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
*
unit:参数keepAliveTime的`时间单位`,有7种取值
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
*
workQueue : `任务队列`,是一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响
如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒继续操作。
1. 基础API介绍
* __往队列中加元素的方法__
* add(E) : 非阻塞方法, 把元素加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常。
* offer(E) : 非阻塞, 表示如果可能的话,将元素加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
* __put(E)__:阻塞方法, 把元素加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里有空间再继续。
* __从队列中取元素的方法__
* __poll(time)__: 阻塞方法,取走BlockingQueue里排在首位的元素,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
* __take()__:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。
2. 子类介绍
* `ArrayBlockingQueue(有界队列)`: FIFO 队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小
* `LinkedBlockingQueue(无界队列)`:FIFO 队列,大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。
* `PriorityBlockingQueue`:优先级队列, 类似于LinkedBlockingQueue,但队列中元素非 FIFO, 依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序
* `SynchronousQueue(直接提交策略)`: 交替队列,队列中操作时必须是先放进去,接着取出来,交替着去处理元素的添加和移除
*
threadFactory : `线程工厂`,如何去创建线程的
*
handler : 任务队列添加`异常的捕捉器`,参考 RejectedExecutionHandler
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
基础API的介绍
* isShutdown() : 判断线程池是否关闭
* isTerminated() : 判断线程池中任务是否执行完成
* shutdown() : 调用后不再接收新任务,如果里面有任务,就执行完
* shutdownNow() : 调用后不再接受新任务,如果有等待任务,移出队列;有正在执行的,尝试停止之
* submit() : 提交执行任务
* execute() : 执行任务
任务提交给线程池之后的处理策略
1. 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建执行这个任务;
2. 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中
1. 若添加成功,则该任务会等待空闲线程将其取出去执行;![](img/task2.png)
2. 若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
3. 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
4. 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
任务提交给线程池之后的处理策略_比喻
>假如有一个工厂,工厂里面有10(`corePoolSize`)个工人,每个工人同时只能做一件任务。
>因此只要当10个工人中有工人是空闲的,`来了任务就分配`给空闲的工人做;
>当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待(`任务队列`);
>如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人(`创建新线程`)进来;然后就将任务也分配给这4个临时工人做;
>如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了(`拒绝执行`)。
>当这14个工人当中有人空闲时,而且空闲超过一定时间(`空闲时间`),新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的
----------------------------------------------------------------------------------------------------------------------------------------------------
最近发现几起对ThreadPoolExecutor的误用,发现都是因为没有仔细看注释和内部运转机制,想当然的揣测参数导致,先看一下新建一个ThreadPoolExecutor的构建参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
看这个参数很容易让人以为是线程池里保持corePoolSize个线程,如果不够用,就加线程入池直至maximumPoolSize大小,如果还不够就往workQueue里加,如果workQueue也不够就用RejectedExecutionHandler来做拒绝处理。
但实际情况不是这样,具体流程如下:
1)当池子大小小于corePoolSize就新建线程,并处理请求
2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理
3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理
4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁
内部结构如下所示:
从中可以发现ThreadPoolExecutor就是依靠BlockingQueue的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过workQueue.take()阻塞住。
其实可以通过Executes来学学几种特殊的ThreadPoolExecutor是如何构建的。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool就是一个固定大小的ThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool比较适合没有固定大小并且比较快速就能完成的小任务,没必要维持一个Pool,这比直接new Thread来处理的好处是能在60秒内重用已创建的线程。
其他类型的ThreadPool看看构建参数再结合上面所说的特性就大致知道它的特性
线程池工具类:
ThreadPoolFactory 线程池工厂类
ThreadPoolProxy 线程池代理类
ThreadPoolUtils 线程池工具类
ThreadPoolFactory :
public class ThreadPoolFactory {
public static ThreadPoolProxy normalThreadPool;
public static final int NORMAL_COREPOOLSIZE = 5;
public static final int NORMAL_MAXIMUMPOOLSIZE = 5;
public static final long NORMAL_KEEPALIVETIME = 60;
public static ThreadPoolProxy getNormalThreadPool() {
//双重检测机制
if (normalThreadPool == null) {
synchronized (ThreadPoolFactory.class) {
if (normalThreadPool == null) {
normalThreadPool = new ThreadPoolProxy(NORMAL_COREPOOLSIZE,
<span style="white-space:pre"> </span>NORMAL_MAXIMUMPOOLSIZE,
<span style="white-space:pre"> </span>NORMAL_KEEPALIVETIME);
}
}
}
return normalThreadPool;
}
}
ThreadPoolProxy:
public class ThreadPoolProxy {
ThreadPoolExecutor executor;
int corePoolSize;
int maximumPoolSize;
long keepAliveTime;
public ThreadPoolProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAliveTime;
}
public void initThreadPoolProxy() {
//双重检测机制
if (executor == null) {
synchronized (ThreadPoolExecutor.class) {
if (executor == null) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, threadFactory, handler);
}
}
}
}
public void exector(Runnable runnable) {
initThreadPoolProxy();
executor.execute(runnable);
}
public void remove(Runnable runnable) {
initThreadPoolProxy();
executor.remove(runnable);
}
}
ThreadPoolUtils:
public class ThreadPoolUtils {
//在UI中执行
static Handler handler = new Handler();
public static void runTaskOnMainThread(Runnable runnable) {
handler.post(runnable);
}
//非UI执行
public static void runTaskOnThread(Runnable runnable) {
ThreadPoolFactory.getNormalThreadPool().exector(runnable);
}
}
今天就到这里了。
谢谢大家。