标签:变量 核心 needed 等于 单位 catch RKE 实现原理 block
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。 例如,线程数一般取 cpu 数量 +2 比较合适,线程数过多会导致额外的线程切换开销。
Java 中的线程池是用 ThreadPoolExecutor 类来实现的. 本文就对该类的源码来分析一下这个类内部对于线程的创建, 管理以及后台任务的调度等方面的执行原理。
先看一下线程池的类图:
上图的目的主要是为了让大家知道线程池相关类之间的关系,至少赚个眼熟,以后看到不会有害怕的感觉。
Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。
下面是 ThreadPoolExeCutor 类图。Executors 其实是一个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程实例。
从上图也可以看出来,ThreadPoolExeCutor 是线程池的核心。
J.U.C 中有三个 Executor 接口:
Executor:一个运行新任务的简单接口;
ExecutorService:扩展了 Executor 接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;
ScheduledExecutorService:扩展了 ExecutorService。支持 Future 和定期执行任务。
其实通过这些接口就可以看到一些设计思想,每个接口的名字和其任务是完全匹配的。不会因为 Executor 中只有一个方法,就将其放到其他接口中。这也是很重要的单一原则。
在去具体分析 ThreadPoolExeCutor 运行逻辑前,先看下面的流程图:
该图是 ThreadPoolExeCutor 整个运行过程的一个概括,整个源码的核心逻辑总结起来就是:
创建线程:要知道如何去创建线程,控制线程数量,线程的存活与销毁;
添加任务:任务添加后如何处理,是立刻执行,还是先保存;
下面将进入源码分析,来深入理解 ThreadPoolExeCutor 的设计思想。
先来看构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException();
// 注意 workQueue, threadFactory, handler 是不可以为null 的,为空会直接抛出错误 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize 核心线程数:表示核心线程池的大小。当提交一个任务时,如果当前核心线程池的线程个数没有达到 corePoolSize,则会创建新的线程来执行所提交的任务,即使当前核心线程池有空闲的线程。如果当前核心线程池的线程个数已经达到了corePoolSize,则不再重新创建线程。如果调用了 prestartCoreThread()
或者 prestartAllCoreThreads()
,线程池创建的时候所有的核心线程都会被创建并且启动。若 corePoolSize == 0,则任务执行完之后,没有任何请求进入时,销毁线程池的线程。若 corePoolSize > 0,即使本地任务执行完毕,核心线程也不会被销毁。corePoolSize 其实可以理解为可保留的空闲线程数。
maximumPoolSize: 表示线程池能够容纳同时执行的最大线程数。如果当阻塞队列已满时,并且当前线程池线程个数没有超过 maximumPoolSize 的话,就会创建新的线程来执行任务。注意 maximumPoolSize >= 1 必须大于等于 1。maximumPoolSize == corePoolSize ,即是固定大小线程池。实际上最大容量是由 CAPACITY 控制。
keepAliveTime: 线程空闲时间。当空闲时间达到 keepAliveTime值时,线程会被销毁,直到只剩下 corePoolSize 个线程为止,避免浪费内存和句柄资源。默认情况,当线程池的线程数 > corePoolSize 时,keepAliveTime 才会起作用。但当 ThreadPoolExecutor 的 allowCoreThreadTimeOut 变量设置为 true 时,核心线程超时后会被回收。
unit:时间单位。为 keepAliveTime 指定时间单位。
workQueue 缓存队列。当请求的线程数 > maximumPoolSize时,线程进入 BlockingQueue 阻塞队列。可以使用 ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue。
threadFactory 创建线程的工程类。可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字,如果出现并发问题,也方便查找问题原因。
AbortPolicy: 直接拒绝所提交的任务,并抛出 RejectedExecutionException 异常;
CallerRunsPolicy:只用调用者所在的线程来执行任务;
DiscardPolicy:不处理直接丢弃掉任务;
DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务
看完构造函数之后,再来看下该类里面的变量,有助于进一步理解整个代码运行逻辑,下面是一些比较重要的变量:
// 用来标记线程池状态(高3位),线程个数(低29位) // 默认是 RUNNING 状态,线程个数为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 线程个数掩码位数,整型最大位数-3,可以适用于不同平台 private static final int COUNT_BITS = Integer.SIZE - 3; //线程最大个数(低29位)00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //(高3位):11100000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //(高3位):00000000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //(高3位):00100000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //(高3位):01000000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //(高3位):01100000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // 获取高三位 运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取低29位 线程个数 private static int workerCountOf(int c) { return c & CAPACITY; } //计算ctl新值,线程状态 与 线程个数 private static int ctlOf(int rs, int wc) { return rs | wc; }
这里需要对一些操作做些解释。
Integer.SIZE:对于不同平台,其位数不一样,目前常见的是 32 位;
(1 << COUNT_BITS) - 1:首先是将 1 左移 COUNT_BITS 位,也就是第 COUNT_BITS + 1 位是1,其余都是 0;-1 操作则是将后面前面的 COUNT_BITS 位都变成 1。
-1 << COUNT_BITS:-1 的原码是 10000000 00000000 00000000 00000001 ,反码是 111111111 11111111 11111111 11111110 ,补码 +1,然后左移 29 位是 11100000 00000000 00000000 00000000;这里转为十进制是负数。
~CAPACITY :取反,最高三位是1;
总结:这里巧妙利用 bit 操作来将线程数量和运行状态联系在一起,减少了变量的存在和内存的占用。其中五种状态的十进制排序:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
线程池状态含义:
RUNNING:接受新任务并且处理阻塞队列里的任务;
SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务;
STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务;
TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为 0,将要调用 terminated 方法
TERMINATED:终止状态。terminated 方法调用完成以后的状态;
线程池状态转换:
RUNNING -> SHUTDOWN:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了shutdown()方法。
RUNNING or SHUTDOWN)-> STOP:显式 shutdownNow() 方法;
SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候;
STOP -> TIDYING:当线程池为空的时候;
TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候;
1. 原码:原码就是符号位加上真值的绝对值, 即用第一位表示符号,其余位表示值. 比如如果是 8 位二进制:
[+1]原 = 0000 0001
[-1]原 = 1000 0001
负数原码第一位是符号位.
2. 反码:反码的表示方法是,正数的反码是其本身,负数的反码是在其原码的基础上, 符号位不变,其余各个位取反.
[+1] = [0000 0001]原 = [0000 0001]反
[-1] = [1000 0001]原 = [1111 1110]反
3. 补码:补码的表示方法是,正数的补码就是其本身,负数的补码是在其原码的基础上, 符号位不变, 其余各位取反, 最后 +1. (即在反码的基础上 +1)
[+1] = [0000 0001]原 = [0000 0001]反 = [0000 0001]补
[-1] = [1000 0001]原 = [1111 1110]反 = [1111 1111]补
4. 总结
在知道一个数原码的情况下:
正数:反码,补码 就是本身自己
负数:反码是高位符号位不变,其余位取反。补码:反码+1
5. 左移:当数值左、右移时,先将数值转化为其补码形式,移完后,再转换成对应的原码
左移:高位丢弃,低位补零
[+1] = [00000001]补
[0000 0001]补 << 1 = [0000 0010]补 = [0000 0010]原 = [+2]
[-1] = [1000 0001]原 = [1111 1111]补
[1111 1111]补 << 1 = [1111 1110]补 = [1000 0010]原 = [-2]
其中,再次提醒,负数的补码是反码+1;负数的反码是补码-1;
6. 右移:高位保持不变,低位丢弃
[+127] = [0111 1111]原 = [0111 1111]补
[0111 1111]补 >> 1 = [0011 1111]补 = [0011 1111]原 = [+63]
[-127] = [1111 1111]原 = [1000 0001]补
[1000 0001]补 >> 1 = [1100 0000]补 = [1100 0000]原 = [-64]
通过 ThreadPoolExecutor 创建线程池后,提交任务后执行过程是怎样的,下面来通过源码来看一看。execute 方法源码如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 返回包含线程数及线程池状态(头3位) int c = ctl.get(); // 如果工作线程数小于核心线程数,则创建线程任务执行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 如果创建失败,防止外部已经在线程池中加入新任务,重新获取 c = ctl.get(); } // 只有线程池处于 RUNNING 状态,且 入队列成功 if (isRunning(c) && workQueue.offer(command)) { // 后面的操作属于double-check int recheck = ctl.get(); // 如果线程池不是 RUNNING 状态,则将刚加入队列的任务移除 if (! isRunning(recheck) && remove(command)) reject(command); // 如果之前的线程已被消费完,新建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 核心池和队列都满了,尝试创建一个新线程 else if (!addWorker(command, false)) // 如果 addWorker 返回是 false,即创建失败,则唤醒拒绝策略 reject(command); }
如果当前运行的线程少于 corePoolSize,则会创建新的线程来执行新的任务;
如果运行的线程个数等于或者大于 corePoolSize,则会将提交的任务存放到阻塞队列 workQueue 中;
如果当前 workQueue 队列已满的话,则会创建新的线程来执行任务;
如果线程个数已经超过了 maximumPoolSize,则会使用饱和策略 RejectedExecutionHandler 来进行处理。
这里要注意一下 addWorker(null, false)
也就是创建一个线程,但并没有传入任务,因为任务已经被添加到 workQueue 中了,所以 worker 在执行的时候,会直接从 workQueue 中获取任务。所以,在 workerCountOf(recheck) == 0
时执行 addWorker(null, false)
也是为了保证线程池在 RUNNING 状态下必须要有一个线程来执行任务。
需要注意的是,线程池的设计思想就是使用了核心线程池 corePoolSize,阻塞队列 workQueue 和线程池 maximumPoolSize,这样的缓存策略来处理任务,实际上这样的设计思想在需要框架中都会使用。
需要注意线程和任务之间的区别,任务是保存在 workQueue 中的,线程是从线程池里面取的,由 CAPACITY 控制容量。
addWorker 方法的主要工作是在线程池中创建一个新的线程并执行,firstTask 参数用于指定新增的线程执行的第一个任务,core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize,代码如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取运行状态 int rs = runStateOf(c); /* * 这个if判断 * 如果rs >= SHUTDOWN,则表示此时不再接收新任务; * 接着判断以下3个条件,只要有1个不满足,则返回false: * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务 * 2. firsTask为空 * 3. 阻塞队列不为空 * * 首先考虑rs == SHUTDOWN的情况 * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false; * 然后,如果firstTask为空,并且workQueue也为空,则返回false, * 因为队列中已经没有任务了,不需要再添加线程了 */ // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取线程数 int wc = workerCountOf(c); // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较, // 如果为false则根据maximumPoolSize来比较。 // if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试增加workerCount,如果成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; // 如果增加workerCount失败,则重新获取ctl的值 c = ctl.get(); // Re-read ctl // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根据firstTask来创建Worker对象 w = new Worker(firstTask); // 每一个Worker对象都会创建一个线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING状态; // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一个HashSet workers.add(w); int s = workers.size(); // largestPoolSize记录着线程池中出现过的最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
这里需要注意有以下几点:
在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。
t.start()
会调用 Worker 类中的 run 方法,Worker 本身实现了 Runnable 接口。原因在创建线程得时候,将 Worker 实例传入了 t 当中,可参见 Worker 类的构造函数。
wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) 每次调用 addWorker 来添加线程会先判断当前线程数是否超过了CAPACITY,然后再去判断是否超 corePoolSize 或 maximumPoolSize,说明线程数实际上是由 CAPACITY 来控制的。
上面分析过程中,提到了一个 Worker 类,对于某些对源码不是很熟悉得同学可能有点不清楚,下面就来看看 Worker 的源码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask;
// 注意此处传入的是this this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */
// 这里其实会调用外部的 runWorker 方法来执行自己。 public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) {
// 如果已经设置过1了,这时候在设置1就会返回false,也就是不可重入 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } // 提供安全中断线程得方法 void interruptIfStarted() { Thread t;
// 一开始 setstate(-1) 避免了还没开始运行就被中断可能 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
首先看到的是 Worker 继承了(AbstractQueuedSynchronizer) AQS,并实现了 Runnable 接口,说明 Worker 本身也是线程。然后看其构造函数可以发现,内部有两个属性变量分别是 Runnable 和 Thread 实例,该类其实就是对传进来得属性做了一个封装,并加入了获取锁的逻辑(继承了 AQS )。具体可参考文章:透过 ReentrantLock 分析 AQS 的实现原理
Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?可以看到 tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的:
lock 方法一旦获取了独占锁,表示当前线程正在执行任务中;
如果正在执行任务,则不应该中断线程;
如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;
之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁。如果使用 ReentrantLock,它是可重入的,这样如果在任务中调用了如 setCorePoolSize 这类线程池控制的方法,会中断正在运行的线程,因为 size 小了,需要中断一些线程 。
所以,Worker 继承自 AQS,用于判断线程是否空闲以及是否可以被中断。
此外,在构造方法中执行了 setState(-1);
,把 state 变量设置为 -1,为什么这么做呢?是因为 AQS 中默认的 state 是 0,如果刚创建了一个 Worker 对象,还没有执行任务时,这时就不应该被中断,看一下 tryAquire 方法:
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
正因为如此,在 runWorker 方法中会先调用 Worker 对象的 unlock 方法将 state 设置为 0。tryAcquire 方法是根据 state 是否是 0 来判断的,所以,setState(-1);
将 state 设置为 -1 是为了禁止在执行任务前对线程进行中断。
前面提到了内部类 Worker 的 run 方法调用了外部类 runWorker,下面来看下 runWork 的具体逻辑。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // status 设置为0,允许中断,也可以避免再次加锁失败 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 要派发task的时候,需要上锁 w.lock(); // 如果线程池当前状态至少是stop,则设置中断标志; // 如果线程池当前状态是RUNNININ,则重置中断标志,重置后需要重新 //检查下线程池状态,因为当重置中断标志时候,可能调用了线程池的shutdown方法 //改变了线程池状态。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //任务执行前干一些事情 beforeExecute(wt, task); Throwable thrown = null; try { task.run();//执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任务执行完毕后干一些事情 afterExecute(task, thrown); } } finally { task = null; //统计当前worker完成了多少个任务 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //执行清了工作 processWorkerExit(w, completedAbruptly); } }
总结一下 runWorker 方法的执行过程:
while 循环不断地通过 getTask() 方法从阻塞队列中取任务;
如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
调用 task.run()
执行任务;
如果 task 为 null 则跳出循环,执行 processWorkerExit 方法;
runWorker 方法执行完毕,也代表着 Worker 中的 run 方法执行完毕,销毁线程。
这里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 类中是空的,留给子类来实现。
completedAbruptly 变量来表示在执行任务过程中是否出现了异常,在 processWorkerExit 方法中会对该变量的值进行判断。
getTask 方法是从阻塞队列里面获取任务,具体代码逻辑如下:
private Runnable getTask() { // timeOut变量的值表示上次从阻塞队列中取任务时是否超时 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /* * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断: * 1. rs >= STOP,线程池是否正在stop; * 2. 阻塞队列是否为空。 * 如果以上条件满足,则将workerCount减1并返回null。 * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // timed变量用于判断是否需要进行超时控制。 // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时; // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; // 对于超过核心线程数量的这些线程,需要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法; * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时 * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1; * 如果减1失败,则返回重试。 * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null; * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。 * */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 如果 r == null,说明已经超时,timedOut设置为true timedOut = true; } catch (InterruptedException retry) { // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 timedOut = false; } } }
其实到这里后,你会发现在 ThreadPoolExcute 内部有几个重要的检验:
判断当前的运行状态,根据运行状态来做处理,如果当前都停止运行了,那很多操作也就没必要了;
判断当前线程池的数量,然后将该数据和 corePoolSize 以及 maximumPoolSize 进行比较,然后再去决定下一步该做啥;
首先是第一个 if 判断,当运行状态处于非 RUNNING 状态,此外 rs >= STOP(线程池是否正在 stop)或阻塞队列是否为空。则将 workerCount 减 1 并返回 null。为什么要减 1 呢,因为此处其实是去获取一个 task,但是发现处于停止状态了,也就是没必要再去获取运行任务了,那这个线程就没有存在的意义了。后续也会在 processWorkerExit 将该线程移除。
第二个 if 条件目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于 maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 即可。
什么时候会销毁?当然是 runWorker 方法执行完之后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。
getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法。
下面在看 processWorkerExit 方法的具体逻辑:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1; // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。 if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; // 从workers中移除,也就表示着从线程池中移除了一个工作线程 workers.remove(w); } finally { mainLock.unlock(); } // 根据线程池状态进行判断是否结束线程池 tryTerminate(); int c = ctl.get(); /* * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker; * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker; * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
至此,processWorkerExit 执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期。但是这有两点需要注意:
大家想想什么时候才会调用这个方法,任务干完了才会调用。那么没事做了,就需要看下是否有必要结束线程池,这时候就会调用 tryTerminate。
如果此时线程处于 STOP 状态以下,那么就会判断核心线程数是否达到了规定的数量,没有的话,就会继续创建一个线程。
tryTerminate 方法根据线程池状态进行判断是否结束线程池,代码如下:
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 当前线程池的状态为以下几种情况时,直接返回: * 1. RUNNING,因为还在运行中,不能停止; * 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了; * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果线程数量不为0,则中断一个空闲的工作线程,并返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // terminated方法默认什么都不做,留给子类实现 terminated(); } finally { // 设置状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
interruptIdleWorkers(
boolean onlyOne) 如果 ONLY_ONE = true 那么就的最多
让一个空闲线程发生中断,ONLY_ONE = false 时是所有空闲线程都会发生中断。那线程什么时候会处于空闲状态呢?
一是线程数量很多,任务都完成了;二是线程在 getTask 方法中执行 workQueue.take()
时,如果不执行中断会一直阻塞。
所以每次在工作线程结束时调用 tryTerminate 方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。
shutdown 方法要将线程池切换到 SHUTDOWN 状态,并调用 interruptIdleWorkers 方法请求中断所有空闲的 worker,最后调用 tryTerminate 尝试结束线程池。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判断 checkShutdownAccess(); // 切换状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试结束线程池 tryTerminate(); }
这里思考一个问题:在 runWorker 方法中,执行任务时对 Worker 对象 w 进行了 lock 操作,为什么要在执行任务的时候对每个工作线程都加锁呢?
下面仔细分析一下:
在 getTask 方法中,如果这时线程池的状态是 SHUTDOWN 并且 workQueue 为空,那么就应该返回 null 来结束这个工作线程,而使线程池进入 SHUTDOWN 状态需要调用shutdown 方法;
shutdown 方法会调用 interruptIdleWorkers 来中断空闲的线程,interruptIdleWorkers 持有 mainLock,会遍历 workers 来逐个判断工作线程是否空闲。但 getTask 方法中没有mainLock;
在 getTask 中,如果判断当前线程池状态是 RUNNING,并且阻塞队列为空,那么会调用 workQueue.take()
进行阻塞;
如果在判断当前线程池状态是 RUNNING 后,这时调用了 shutdown 方法把状态改为了 SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了 workQueue.take()
后会一直阻塞而不会被销毁,因为在 SHUTDOWN 状态下不允许再有新的任务添加到 workQueue 中,这样一来线程池永远都关闭不了了;
由上可知,shutdown 方法与 getTask 方法(从队列中获取任务时)存在竞态条件;
解决这一问题就需要用到线程的中断,也就是为什么要用 interruptIdleWorkers 方法。在调用 workQueue.take()
时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出 InterruptedException,解除阻塞的状态;
但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断;
所以 Worker 继承自 AQS,在工作线程处理任务时会进行 lock,interruptIdleWorkers 在进行中断时会使用 tryLock 来判断该工作线程是否正在处理任务,如果 tryLock 返回 true,说明该工作线程当前未执行任务,这时才可以被中断。
下面就来分析一下 interruptIdleWorkers 方法。
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers 遍历 workers 中所有的工作线程,若线程没有被中断 tryLock 成功,就中断该线程。
为什么需要持有 mainLock ?因为 workers 是 HashSet 类型的,不能保证线程安全。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中断所有工作线程,无论是否空闲 interruptWorkers(); // 取出队列中没有被执行的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownNow 方法与 shutdown 方法类似,不同的地方在于:
设置状态为 STOP;
中断所有工作线程,无论是否是空闲的;
取出阻塞队列中没有被执行的任务并返回。
shutdownNow 方法执行完之后调用 tryTerminate 方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为 TERMINATED。
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
getTaskCount:线程池已经执行的和未执行的任务总数;
getCompletedTaskCount:线程池已完成的任务数量,该值小于等于 taskCount;
getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
getPoolSize:线程池当前的线程数量;
getActiveCount:当前线程池中正在执行任务的线程数量。
通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。
到此,关于 ThreadPoolExecutor 的内容就讲完了。
标签:变量 核心 needed 等于 单位 catch RKE 实现原理 block
原文地址:https://www.cnblogs.com/huansky/p/12467720.html