标签:full gc 监控 提交 security off super 而在 idle bug
在一次聚会中,我和一个腾讯大佬聊起了池化技术,提及到java的线程池实现问题,我说这个我懂啊,然后巴拉巴拉说了一大堆,然后腾讯大佬问我说,那你知道线程池有什么缺陷吗?我顿时哑口无言,甘拜下风,所以这次我再回来思考一下线程池的实现原理
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//校验几个参数不能小于零,否则抛出异常
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
默认的几个属性:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 这里 COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
//最大线程数是 2^29-1=536870911
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
//将CAPACITY取费后和c进行取与运算,可以得到高3位的值,即线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//将c和CAPACITY取与运算,可以得到低29位的值,即线程池的个数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
采用一个 32 位的整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数(CAPACITY)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果当前的线程数小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
//调用addWorker新建一个线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
//校验当前线程状态是RUNNING,并将command入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果不是运行状态,那么移除队列,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
//防止任务提交到队列中了,但是线程都关闭了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//到这里说明队列已经满了,所以新建一个线程,如果新建的线程数已经超过了maximumPoolSize,那么执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
用一张图来概括一下上面的内容:
我们下面看一下addWorker是如何创建线程的:
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取当前线程池状态
int rs = runStateOf(c);
//1
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//2. 校验传入的线程数是否超过了容量大小, 或者是否超过了corePoolSize或maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//到了这里说明线程数没有超,那么就用CAS将线程池的个数加1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//3 说明有其他的线程抢先更新了状态,继续下一轮的循环,跳到外层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//4 如果线程是没有问题的话,那么将worker加入到队列中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// largestPoolSize 用于记录 workers 中的个数的最大值
// 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果worker入队成功,那么启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果worker启动失败,那么就回滚woker线程创建的状态
if (! workerStarted)
addWorkerFailed(w);
}
// 返回线程是否启动成功
return workerStarted;
}
addWorkerFailed
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
addWorkerFailed的处理就是将workers集合里面的worker移除,然后count减1,
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
....
}
Worker是继承AQS对象的,在创建Worker对象的时候会传入一个Runnable对象,并设置AQS的state状态为-1,并从线程工厂中新建一个线程
调用thread.start方法会调用到Worker的run方法中
public void run() {
runWorker(this);
}
Worker的run方法会调用到ThreadPoolExecutor的runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task为空,那么就从workQueue里面获取task
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池状态大于等于 STOP,那么意味着该线程也要中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 这是一个钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 这是一个钩子方法
afterExecute(task, thrown);
}
} finally {
// 置空 task,准备 getTask 获取下一个任务
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//异常情况或getTask获取不到任务时会执行关闭
processWorkerExit(w, completedAbruptly);
}
}
传入一个Worker首先去校验firstTask是不是null,如果是那么就调用getTask方法从workQueue队列里面获取,然后判断一下当前的线程是否需要中断,如需要的话执行钩子方法,然后调用task的run方法执行task;
如果while循环里面getTask获取不到任务的话,就结束循环调用processWorkerExit方法执行关闭;
如果是异常原因导致的while循环退出,那么会调用processWorkerExit并传入为true
getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//要么状态大于STOP,要么状态等于SHUTDOWN并且队列是空的,那么线程数减一后返回null
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//校验线程数是否超了,或者是否超时
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 到 workQueue 中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
这个方法返回null有如下几种情况:
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果是异常原因中断,那么需要将运行线程数减一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//设置完成任务数
completedTaskCount += w.completedTasks;
//将worker从集合里移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//判断当前的线程池是否处于SHUTDOWN状态,判断是否要终止线程
tryTerminate();
int c = ctl.get();
//如果是RUNNING或SHUTDOWN则会进入这个方法
if (runStateLessThan(c, STOP)) {
//如不是以外中断则会往下走
if (!completedAbruptly) {
//判断是否保留最少核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//如果当前运行的Worker数比当前所需要的Worker数少的话,那么就会调用addWorker,添加新的Worker
addWorker(null, false);
}
}
这个任务处理流程看似简单,实际上有很多坑,你在使用的时候一定要注意。
但是,我们平时开发的 Web 系统通常都有大量的 IO 操作,比方说查询数据库、查询缓存等等。任务在执行 IO 操作的时候 CPU 就空闲了下来,这时如果增加执行任务的线程数而不是把任务暂存在队列中,就可以在单位时间内执行更多的任务,大大提高了任务执行的吞吐量。所以你看 Tomcat 使用的线程池就不是 JDK 原生的线程池,而是做了一些改造,当线程数超过 coreThreadCount 之后会优先创建线程,直到线程数到达 maxThreadCount,这样就比较适合于 Web 系统大量 IO 操作的场景了,你在实际运用过程中也可以参考借鉴。
我在实际项目中就曾经遇到过任务被丢给线程池之后,长时间都没有被执行的诡异问题。最初,我认为这是代码的 Bug 导致的,后来经过排查发现,是因为线程池的 coreThreadCount 和 maxThreadCount 设置的比较小,导致任务在线程池里面大量的堆积,在调大了这两个参数之后问题就解决了。跳出这个坑之后,我就把重要线程池的队列任务堆积量,作为一个重要的监控指标放到了系统监控大屏上。
我们这里直接学习Tomcat是如何优化线程池的,在我们平时的使用中如果使用LinkedBlockingQueue的话,默认是使用Integer.MAX_VALUE,即无界队列(这种情况下如果没有配置队列的capacity的话,队列始终不会满,那么始终无法进入开启新线程到达maxThreads个数的地步,则此时配置maxThreads其实是没有意义的)。
而在Tomcat中使用的是TaskQueue,TaskQueue的队列capacity为maxQueueSize,默认也是Integer.MAX_VALUE。但是,其重写offer方法,当其线程池大小小于maximumPoolSize的时候,返回false,即在一定程度改写了队列满的逻辑,修复了使用LinkedBlockingQueue默认的capacity为Integer.MAX_VALUE的时候,maxThreads失效的"bug"。从而可以继续增长线程到maxThreads,超过之后,继续放入队列。
所以综上,Tomcat的线程池使用了自己扩展的taskQueue,修改了offer的逻辑,以做到最小的改动实现了线程池的改造。
我们再来回顾一下ThreadPoolExecutor的execute方法是怎么写的:
ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//这里,如果使用workQueue的offer成功的话,那么就不会创建新的线程,如果失败的话,就会走到else if方法进行创建新的线程
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
TaskQueue
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private ThreadPoolExecutor parent = null;
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//当其线程池大小小于maximumPoolSize的时候,返回false
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
}
我们从这里可以看到
源码分析—ThreadPoolExecutor线程池三大问题及改进方案
标签:full gc 监控 提交 security off super 而在 idle bug
原文地址:https://www.cnblogs.com/luozhiyun/p/12037536.html