标签:
注:本文的分析和源码基于jdk1.7;
一、ThreadPoolExecutor创建
ThreadPoolExecutor作为java.util.concurrent包中核心的类,先看下类型的结构:
最顶级的接口都是Executor,而ThreadPoolExecutor继承于抽象类AbstractExecutorService,提供一下4个构造函数用于创建:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,ong keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
前面的3个方法都是使用通过this调用最后一个方法,没有指定的构造参数使用默认参数,参数解析:
1、
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
线程池核心线程数大小,初始化是核心线程数也是0,除非先调用prestartCoreThread或者prestartAllCoreThreads先创建核心线程;
在没有设置allowCoreThreadTimeOut为true情况下,核心线程不会销毁;
2、
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
线程池线程数最大值,达到最大值后线程池不会再增加线程执行任务,任务会进入等待队列或者由拒绝策略处理;
该值实际的可设置最大值不是Integer.MAX_VALUE,而是常量CAPACITY(后面再解析常量)
3、
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);1、判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,如果能完成新线程创建exexute方法结束,成功提交任务;private boolean addWorker(Runnable firstTask, boolean core) {//<span style="font-family: 微软雅黑; widows: auto;">firstTask:新增一个线程并执行这个任务,可空,增加的线程从队列获取任务;core:是否使用corePoolSize作为上限,否则使用maxmunPoolSize</span>
retry: //很少见的关键字,自行度娘
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))//线程状态非运行并且当非shutdown状态下任务为空且队列非空;
return false; //判断条件有点难理解,其实是非运行状态下(>=SHUTDOWN)或者SHUTDOWN状态下任务非空(新提交任务)、任务队列为空,就不可以再新增线程了(return false),即SHUTDOWN状态是可以新增线程去执行队列中的任务;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) //实际最大线程数是CAPACITY;
return false;
if (compareAndIncrementWorkerCount(c)) //AtomicInteger的CAS操作;
break retry; //<span style="font-family: 微软雅黑; widows: auto;">新增线程数成功,结束retry(retry下的for循环)</span>
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) //状态发生改变,重试retry;
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask); // Worker为内部类,封装了线程和任务,通过ThreadFactory创建线程,可能失败抛异常或者返回null
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // SHUTDOWN以后的状态和SHUTDOWN状态下firstTask为null,不可新增线程
throw new IllegalThreadStateException();
workers.add(w); //保存到一个HashSet中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //记录最大线程数
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//失败回退,从wokers移除w,线程数减一,尝试结束线程池(调用tryTerminate方法,后续解析)
}
return workerStarted;
}public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); //这个方法校验线程访问许可,不是很理解,后面有时间再单独解析;
advanceRunState(SHUTDOWN); //转换线程池状态为SHUTDOWN
interruptIdleWorkers(); //中断所有空闲的线程
onShutdown(); // 空实现方法,是做shutdown清理操作的
} finally {
mainLock.unlock();
}
tryTerminate(); //尝试结束线程池(设置状态为TERMINATED)
}public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//同上
advanceRunState(STOP);//转换线程池状态到STOP
interruptWorkers();//中断所有线程
tasks = drainQueue();//获取到任务队列所有任务,并清空队列
} finally {
mainLock.unlock();
}
tryTerminate();//同上
return tasks;
}final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //保证只有SHUTDOWN状态下任务队列为空和STOP状态下才以尝试终止
return;
if (workerCountOf(c) != 0) { //线程数还不是0情况下不可结束线程池
interruptIdleWorkers(ONLY_ONE); //只为了中断一个线程?还不是非常理解设计者的意思
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS操作设置TIDYING状态,注意这里处于循环中,失败会重设的
try {
terminated(); //空实现方法
} finally {
ctl.set(ctlOf(TERMINATED, 0));//最终状态TERNINATED
termination.signalAll();//可重入锁的condition,通知所有wait,后面会有看到
}
return;
}
} finally {
mainLock.unlock();
}
}
}public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}这个方法两个入参,设置等待超时时间;
标签:
原文地址:http://blog.csdn.net/wenhuayuzhihui/article/details/51377174