标签:mamicode 最大数 row epo 线程中断 有序 wait EDA boolean
BlockingQueue<Runnable> workQueue; // 任务队列。使用workQueue.isEmpty()判断队列是否为空,而非workQueue.poll()==null判断,这样的判空方式容纳特殊队列,如DelayQueue
ReentrantLock mainLock;
HashSet<Worker> workers; // 线程池中的所有工作线程,仅能在mainLock下访问
Condition termination; // 用于支持awaitTermination
// 所有的控制参数都被定义为volatile
ThreadFactory threadFactory;
RejectedExecutionHandler handler;
long keepAliveTime;
boolean allowCoreThreadTimeOut;
int corePoolSize;
int maximumPoolSize;
int largestPoolSize; // 线程池中工作线程的历史最大数量:largestPoolSize = workers.size() > largestPoolSize ? workers.size() : largestPoolSize;
long completedTaskCount; // 完成任务的计数器,仅在工作线程终止时更新,仅在mainLock下访问
/* 核心字段,打包了2种含义:worker线程数、线程池状态。
workerCount(低29位):已经被允许start并且不被允许stop的worker的数量。该值可能与活动线程的实际数量会出现短暂性不同
runState(高3位):状态的数值顺序是重要的,以允许有序的比较,状态流转如下
> RUNNING -> SHUTDOWN,调用shutdown()方法,可能隐含在finalize()方法中
> RUNNING or SHUTDOWN -> STOP,调用shutdownNow()
> STOP -> TIDYING,当线程池是empty时
> TIDYING -> TERMINATED,当terminate()钩子方法执行完成
当状态为TERMINAED时,线程在awaitTermination()方法上的等待将会返回。
运行状态描述 已知:RUNNING=111, SHUTDOWN=0, STOP=001, TIDYING=010, TERMINATED=011
RUNNING // 接收的新任务,并且处理排队中的任务
SHUTDOWN // 不接受新任务,但处理排队中的任务
STOP // 不接受新任务,不处理排队中的任务,并且中断正在处理的任务
TIDYING // 所有任务已经终止,workerCount=0,将运行terminate()钩子方法
TERMINATED // terminate()钩子方法执行完毕
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ReentrantLock mainLock;
HashSet<Worker> workers; // 线程池中的所有工作线程,仅能在mainLock下访问
mainLock
字段主要是为了保证工作线程池字段workers(非安全集合HashMap)
在多线程并发情况下的访问。至于workers
为何使用HashMap
而非使用安全的ConcurrentHashMap
,原因如下所示:
使用
mainLock
加锁操作会让操作按照顺序一个一个执行。这样保证了interruptIdleWorkers()
方法在执行期间避免中断风暴,尤其是在shutdown期间。注:
interruptIdleWorkers()
方法只会被shutdown()
方法调用
从类图结构来看:Worker
类既是锁,又是线程。
核心代码如下所示,共分为3部分:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 抑制中断操作,直到runWorker()方法开始运行,runWorker()方法运行时会先执行unlock()方法,将state设置为0
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 多线程核心部分:实现run()方法
public void run() {
runWorker(this);
}
// AQS核心实现部分
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 中断worker线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
worker线程自己提供了中断worker的方法interruptIfStarted()
。即创建Worker对象之后,只有在worker线程运行后,才可以执行worker线程的中断操作。
抑制中断的手段:
interruptIfStarted()
tryAcquire()
方法中,CAS只有在state=0的时候才能操作成功,即获取到锁在中断操作的方法中:
interruptWorkers()
:该方法会调用interruptIfStarted()
方法中断worker线程,而该方法只有在state>=0的情况下才会执行中断操作interruptIdleWorkers()
:该方法只有在worker.tryLock()
获取到锁时,才能执行中断操作而上述情况下,state=-1,均不能执行成功。只有在runWorker()
方法执行后,才会将state=0
操作执行
该方法作为整体方法入口,会根据线程池的线程数量、线程池状态,来决定针对添加的task执行以下哪3种操作:
任务队列
添加新task成功,且线程池仍在运行状态,但此时线程池worker线程数量为0任务队列
添加新task成功,且线程池状态已经不在运行态,但从任务队列中移除新任务失败,但此时线程池worker线程数量为0任务队列
(前提条件:线程池为运行态)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false))
reject(command);
}
该方法一共干了3件事:
workers
属性中源码省略
该方法有个非常重要的关键点,即:worker线程是否会正常执行结束。即:getTask()
方法一旦返回null,表明该worker线程无task可以处理,此时正常情况下该worker线程将会执行结束,线程退出。而getTask()
方法中,会会根据timed来决定在BlockingQueue
获取task时,是否阻塞等待。
总结:该方法一旦返回NULL,即表明该worker线程
将执行结束;否则,worker
线程将在BlockingQueue
队列中阻塞等待
private Runnable getTask() {
boolean timedOut = false; // 在workQueue.poll()是否超时
for (;;) {
/* 如下操作判断线程池的状态,是否结束该线程:
1. 线程池状态SHUTDOWN
2. 线程池状态STOP,且任务队列为空(即没有任何可执行任务)*/
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/* timed:用于判断worker线程调用BlockingQueue获取task时,阻塞是否为有限期的阻塞。
timed=true,表示是有超时时间的阻塞
timed=false,表示无限期阻塞
allowCoreThreadTimeOut=true,则表示允许核心线程数超时 */
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 线程池数量大于maximumPoolSize,该线程获取任务直接返回null。
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
顾名思义,该方法时worker线程
退出前的处理工作:
将当前线程完成的任务数量,累加到completedTaskCount中
尝试进行线程池终止
如果线程池仍在RUNNING状态,且worker线程没有异常退出,由于getTask()==null,即任务等待队列已经为空,此时判断coreThread是否允许超时,来限制空闲的workers的线程数量
注:是否将线程池的worker线程
数量维护在一个稳定范围,仍然需要考虑。该方法就是做如此处理:worker线程在结束前,会判断线程池是否需要新增一个新的worker线程,新增worker线程的情况如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 试着看看线程池是否可以终止
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
/* 新增worker线程的情况如下:
1. worker线程为异常终止
2. 线程池中的worker线程数量,小于其应该有的最小值(若允许核心线程运行结束,则最小值为1,否则最小值为corePoolSize)*/
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) // 如果min == 0,且任务队列中又增加新任务,则将min=1,workers中保留min个worker线程。
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 相当于创建一个新的worker线程,但没有马上为该worker线程分配task。该worker将会从队列中getTask()获取任务。
addWorker(null, false);
}
}
核心操作如下:
SHUTDOWN
interruptIdleWorkers()
onShutdown()
tryTerminate()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
中断风暴的产生,主要来自于如下方法:
空闲的worker线程
假如多线程并发调用shutdown()
方法,此时若没有mainLock
锁,让线程有序的进行调用,则可能引发大规模的空闲worker
线程中断
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
两个方法如下所示,其中主要区别为:
shutdown()
的状态是SHUTDOWN
;而shutdownNow()
的状态则是STOP
shutdown()
中断空闲worker线程;而shutdownNow()
则是中断所有worker线程shutdown()
有钩子方法中断worker
的线程总共有两类:
中断空闲worker:若未中断,且不可重入锁
加锁成功,再中断。非重入锁加锁的时机有两个:
runWorker()
方法中,getTask()获取任务时,会加锁。即,运行可执行任务和中断线程的操作,是不可同时发生!!
中断所有worker:遍历直接中断
1、runWorker()中有前置、后置方法
2、shutdown()中有onShutdown()方法
标签:mamicode 最大数 row epo 线程中断 有序 wait EDA boolean
原文地址:https://www.cnblogs.com/wolf-w/p/12498161.html