码迷,mamicode.com
首页 > 其他好文 > 详细

ThreadPoolExecutor任务提交过程源码浅析

时间:2019-05-31 23:14:12      阅读:129      评论:0      收藏:0      [点我收藏+]

标签:shutdown   worker   包装   lex   启动   海量   固定   https   博文   

线程池是一种重复利用既有线程的池化技术 ,它大量减少了线程的创建初始化过程,也可以防止海量线程创建占尽资源的风险。

任务提交过程

学习使用线程池的使用,我们都大概知道这样一个过程,如图:

技术图片

这个是一个Runnable实例提交到线程池的过程,大体分为4个步骤:

1)判断当前线程数量是否小于核心线程数量,如果小于则创建一个新的线程去执行该任务;

2)如果线程数已经超过了核心线程数,那么就提交到等待工作队列(等待队列的任务将会被既有的线程获取并处理)。

3)如果等待队列已经满了,无法再提交任务,那么将会判断当前线程数是否超过最大线程数。如果没有超过,那么将创建一个新的线程去执行该任务。

4)如果线程数已经超过了最大线程数,那么将调用拒绝策略的处理器(默认处理器实现是抛出一个拒绝的异常)。

DEMO入口

下面,我们以一个DEMO作为入口,了解一下ThreadPoolExecutor的任务提交过程源码实现,如下:

技术图片

DEMO中,第一行代码创建了一个ThreadPoolExecutor实例对象,构造过程传入了几个参数:

1)corePoolSize:核心线程数,一般线程池中存活的最大线程,即使闲置也不会被回收;

2)maximumPoolSize:最大线程数,线程池中允许最大存活的线程数量;

3)keepAliveTime:闲置线程的存活时间,如果没有工作任务,超过存活时间后闲置线程将会被回收;

4)TimeUnit:存活时间的单位;

5)LinkedBlockQueue:一个阻塞队列,存放待处理任务;

ThreadPoolExecutor构造方法源码

我们点开ThreadPoolExecutor实例的构造方法,代码如下:

技术图片

我们看到,该构造方法调用了另外一个构造方法,同时参数增加了两个,一个是ThreadFactory的实例(用于创建Thread对象),另一个是RejectExecutionHandler的实例(拒绝任务提交时调用的处理器)。点击进入另一个构造方法,如:

技术图片

这里一开始进行了参数值校验,后面的部分就是给成员变量赋值(keepAliveTime转成成了纳秒),构造方法即执行结束了。

 submit提交方法的源码

 接下来我们看看submit提交任务的源码,如:

技术图片

submit方法内,显示将Runnable实例包装成了一个RunnableFuture(该接口继承了Runnable和Future)的实现类FutureTask的实例,执行FutureTask也就会调用Runnable。

下面,我们看看execute这个核心方法,点击进入该方法

技术图片

注意:ThreadPoolExecutor通过一个AtomicInteger类型的ctl变量存储了runState(线程池状态)和workerCount(线程数)两个变量。具体实现可以看另外一个博文说明:https://www.cnblogs.com/lay2017/p/10946928.html

execute代码的核心步骤如下:

1、获得workerCount,判断workerCount是否小于corePoolSize,如果小于那么调用addWorker方法添加一个worker(这里的worker可以暂时理解为线程,后面我们会打开addWorker方法看看)执行该任务。

2、如果workerCount大于等于corePoolSize,或者addWorker方法添加worker失败(失败会重新获取ctl值),那么进入第二重判断。

3、第二重判断先是判断了线程池的状态,如果不是运行中则进行第三重判断,如果是运行中,那么尝试着吧任务提交到workQueue(工作队列中),如果提交任务失败也一样进入第三重判断。如果提交成功了,那么会再获取一次ctl并进行二次校验线程池状态(因为在offer到队列以后有可能线程池shutdown了),如果不是running状态,那么将会从队列中移除并调用拒绝策略。如果还是运行状态,但是当前线程数为0,那么为了把已经提交的任务处理掉会调用addWorker启动一个新的线程确保该任务有线程会处理。

4、在第二重判断中,如果线程池不是running状态或者提交到workQueue失败都会进入第三重判断。第三重判断中,如果不是running状态addWorker方法不会调用成功则直接调用reject方法,该方法将调用RejectExecutionHandler处理策略。如果是因为workQueue提交失败,如果线程数不大于MaximumPoolSize的话,addWorker方法将尝试创建一个新的线程去处理,否则调用reject拒绝任务提交。

 我们看到,execute方法的执行逻辑基本和我们一开始所说的任务提交过程图是一致的。

addWorker方法添加一个线程

 我们通过execute方法的逻辑大体了解了任务提交的源码过程,下面我们打开addWorker方法看看如何添加一个线程的。

技术图片

技术图片

技术图片

addWorker方法主要分为两步:

1、在for循环中通过CAS乐观锁把workerCount数量+1

2、线程数增加成功以后,再实际地去创建一个worker实例对象,worker对象中包含任务对象和一个Thread线程对象。worker对象构建完成以后添加到WorkerSet集合容器里面,并调用Thread的start方法,当Thread执行以后会调用worker中的runWorker方法去处理任务(如果当前任务处理完了,会持续不断地从workQueue中获取任务并处理)。

总结

线程池的设计非常地面向对象,ThreadPoolExecutor就像是一个工厂的车间,这个车间里面有固定的几个工作人员(worker)和一个工作台(workQueue)。worker们会从workQueue上获取任务(RunnableFuture)并处理该任务昼夜不歇。如果workQueue上的任务满了,那么会招来更多地worker(创建新的Thread)来帮助处理任务。如果workQueue上的任务处理完了,大家闲在那里无所事事的话多余的worker就会离开(GC回收)留下几个worker驻守在那里(corePoolSize限定个数)等待新的任务来临。

 

ThreadPoolExecutor任务提交过程源码浅析

标签:shutdown   worker   包装   lex   启动   海量   固定   https   博文   

原文地址:https://www.cnblogs.com/lay2017/p/10957943.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!