标签:状态 人事 tin vat tran keepalive move 退出 tun
线程池的实现原理无非复用二字,类似数据库连接池,都是将一些重复创建的东西拿来重复使用。其中最关键的问题就两个:一个是怎么复用;一个是怎么回收。在数据库连接池中,一个连接的生命周期是我们可以手动控制的,相对来说容易一些。我们通过使用一个链表来持有连接并复用,超过最大连接数就回收。线程池不同,线程的生命周期不可控,当run方法运行结束了,线程就自然消亡了,因此麻烦一些,我们需要通过一个循环来让run方法不停歇的运行着,跑完一个又一个的任务。线程和任务由worker持有,当一个工人的所有任务(包括队列中的)都跑完了就会被回收。
JDK8的线程池ThreadPoolExecutor类2000多行,当然其中注释占了大头。其中5个内部类,除去4个饱和策略,分量最重的是一个私有的内部类Worker,我叫它工具人类,它是关键。看下它的说明:
/** * Class Worker mainly maintains interrupt control state for * threads running tasks, along with other minor bookkeeping. * This class opportunistically extends AbstractQueuedSynchronizer * to simplify acquiring and releasing a lock surrounding each * task execution. This protects against interrupts that are * intended to wake up a worker thread waiting for a task from * instead interrupting a task being run. We implement a simple * non-reentrant mutual exclusion lock rather than use * ReentrantLock because we do not want worker tasks to be able to * reacquire the lock when they invoke pool control methods like * setCorePoolSize. Additionally, to suppress interrupts until * the thread actually starts running tasks, we initialize lock * state to a negative value, and clear it upon start (in * runWorker). */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable
工具人类继承AQS是为了实现不可重入的互斥锁(见标黄外文),因为我们不想要工具人在执行任务时被人打断,比如setCorePoolSize方法里的interruptIdleWorkers方法。
工具人类实现了Runnable接口是为了让它自己先顶替真正的任务,在runWorker方法中实现线程的复用。工具人投入池中,用自己的线程出力,执行自己的任务,直到超过规定正式员工人数,所有任务进入缓存队列中排队。此时队列是动态的,只要工具人手头任务处理完了,就会从队列中拿到新任务进行处理。一旦任务爆仓,就得外包工具人上场。把ThreadPoolExecutor精简一下,去掉线程池的生命周期,去掉饱和策略,从2000多行变成了300多行。直接看代码:
import java.util.HashSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ThreadPool implements Executor { private AtomicInteger workerCounts = new AtomicInteger(1); // 正在忙着的工具人数量 private final BlockingQueue<Runnable> workQueue; // 缓存队列,用于缓存超过核心线程数的任务 private final HashSet<ThreadPool.Worker> workers = new HashSet<>(); // 工具人池 private int completedTaskCount = 1; // 已完成任务总数,初始值为1 private int corePoolSize; // 核心线程数 private int maxPoolSize; // 最大线程数 private volatile long keepAliveTime; // 多出核心线程数的线程保命的时间 private Lock mainLock = new ReentrantLock(); // 线程池的全局锁 ThreadPool(int corePoolSize, int maxPoolSize, int queueSize, long keepAliveTime, TimeUnit unit) { this.corePoolSize = corePoolSize; this.maxPoolSize = maxPoolSize; this.workQueue = new ArrayBlockingQueue<>(queueSize); this.keepAliveTime = unit.toNanos(keepAliveTime); } /** * 线程池的主方法,也是入口 * * @param command */ @Override public void execute(Runnable command) { int currentWork = workerCounts.get(); // 先处理核心线程 if (currentWork < corePoolSize) { if (addWork(command, true)) { System.out.println("execute >>> 核心线程数内执行完毕。"); return; } } // 正式员工忙不过来,就把任务加入缓存队列中 if (workQueue.offer(command)) { System.out.println("execute >>> 已加入缓存队列中,队列中有 " + workQueue.size() + " 个任务。"); if (workerCounts.get() == 0) { // 当前已经没有活着的工具人了,因为当前任务都跑完了,需要再创建工具人 addWork(null, false); // 此时无需给工具人分配任务了,因为任务入队缓存队列中,只需从缓存队列中取出即可 } } else if (!addWork(command, false)) { // 缓存队列满了,让外包工具人处理,如果外包工具人名额也满了,那就真搞不定了 System.err.println("execute >>> 外包工具人名额也超额,线程池搞不定了。"); } } /** * 任务入池 * * @param firstTask 初始任务 * @param isCorePoolSize 是否核心线程数 * @return 任务是否已执行 */ public boolean addWork(Runnable firstTask, boolean isCorePoolSize) { // 双循环,通过设置标记来直接从内层循环跳出外层循环 retry: for (; ; ) { // 校验任务是否为空.若缓存队列不为空时,任务可以为空,见execute方法 if (firstTask == null && workQueue.isEmpty()) return false; for (; ; ) { int c = workerCounts.get(); // 获取在跑的工具人数量 System.out.println("addWork >>> 工具人数 : " + c + " 个;是否核心线程数 : " + isCorePoolSize); // 若在核心线程数内,判断工具人数是否已超过;同理,已经超过核心线程数,则看是否超过最大任务数 if (c > (isCorePoolSize ? corePoolSize : maxPoolSize)) return false; // 先将在忙的工具人数+1,跳出到最外层,执行循环下面的逻辑 if (compareAndIncrementWorkerCount(c)) break retry; } } boolean workerStarted = false; // 任务是否已开始执行 boolean workerAdded; // 任务是否已添加到池子里 Worker w = null; final Thread t; try { w = new ThreadPool.Worker(firstTask); // 实例化工具人,将初始任务分配给该工具人 t = w.thread; // 从工具人那里领取线程,等会儿把任务跑起来 if (t != null) { final Lock mainLock = this.mainLock; // 使用全局锁来添加工具人到池子里 mainLock.lock(); try { workers.add(w); // 工具人入池 System.out.println("addWork >>> 工具人池里有 " + workers.size() + " 个工具人。"); workerAdded = true; } finally { mainLock.unlock(); } if (workerAdded) { // 工具人入池成功,可以让他的线程跑起来了 System.out.println("addWork >>> 开始拉起线程,准备进入Workder.run()..."); t.start(); // 关键点1:工具人的线程跑起来了,怎么跑?工具人自己的run(),他也是个Runnable workerStarted = true; } } } finally { if (!workerStarted) // 任务没跑起来,回滚:回收该工具人,在忙着的工具人数-1 addWorkerFailed(w); } return workerStarted; } /** * 新增工具人失败,执行回滚操作 * * @param w */ private void addWorkerFailed(Worker w) { final Lock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); // 刚进去的工具人,从哪来回哪去 decrementWorkerCount(); // 刚加的在忙着的工具人数减回去 } finally { mainLock.unlock(); } } /** * 使用CAS将工具人数+1 * * @param expect * @return */ private boolean compareAndIncrementWorkerCount(int expect) { return workerCounts.compareAndSet(expect, expect + 1); } /** * 工具人执行任务。1个关键点。 * * @param worker */ final void runWork(Worker worker) { Runnable task = worker.task; // 获取该工具人要执行的任务 worker.task = null; // 清空该工具人的任务,所以firstTask就是一次的任务,后面都得从缓存队列中取任务 worker.unlock(); // 这里还是允许中断的 boolean completedAbruptly = true; // 任务执行是否被打断 try { // 关键点3:线程复用——若任务不为空,或者队列中的任务不为空,工具人的线程将一直执行下去。一旦断了,工具人也就可以去死了 while (task != null || (task = getTask()) != null) { worker.lock(); // 每次跑之前,工具人先锁住(不可重复锁),这里就不允许中断了 try { task.run(); System.out.println("runWork >>> 任务真的跑起来了,目前已完成 " + completedTaskCount + " 个任务。"); } finally { task = null; // 任务完成,清空 worker.completedTasks++; // 累计该工具人所完成的任务数量 worker.unlock(); // 工具人解锁(不可重复锁) } } completedAbruptly = false; // 没有被打断 } finally { System.out.println("runWork >>> 这个工具人没任务可做了,要死了..."); processWorkerExit(worker, completedAbruptly); } } /** * 将濒死的工具人从池中剔除 * * @param w * @param completedAbruptly 工具人在干活时是否被人打断过 */ private void processWorkerExit(ThreadPool.Worker w, boolean completedAbruptly) { if (completedAbruptly) // 不曾被人打断,在干活的人数-1 decrementWorkerCount(); final Lock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 死后记账,把该工具人所完成的任务数汇总到总完成任务数中去 workers.remove(w); // 回收工具人的尸体 System.err.println("processWorkerExit >>> 工具人被回收了。"); } finally { mainLock.unlock(); } } /** * 空转,直到工具人数-1 */ private void decrementWorkerCount() { do { } while (!compareAndDecrementWorkerCount(workerCounts.get())); } /** * 使用CAS将工具人数-1 */ private boolean compareAndDecrementWorkerCount(int expect) { return workerCounts.compareAndSet(expect, expect - 1); } /** * 获取队列中的工作任务。两个关键点。 * * @return */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (; ; ) { int wc = workerCounts.get(); // 取得在忙工具人数 boolean timed = wc > corePoolSize; // 超过核心线程池数,超时的外包工具人就危险了 if ((wc > maxPoolSize || (timed && timedOut)) // 要么超过最大数了,要么出现了要炒掉的外包工具人 && (wc > 1 || workQueue.isEmpty())) { // 要么还有忙的工具人,要么缓存队列清空了 if (compareAndDecrementWorkerCount(wc)) return null; // 超时回收这里返回null,runWork就得退出while循环,进入回收工具人阶段,也就是炒人 continue; // 没有炒掉,继续清点外包工具人 } System.out.println("getTask >>> 工作队列中还有 " + workQueue.size() + " 个任务。"); try { Runnable r = timed ? // 什么时候会出现poll超时?当然是队列为空的时候。关键点4——确认这个外包工具人是否要被裁 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 缓存队列空的时候,这里会阻塞。关键点5——线程池靠它不让JVM退出 if (r != null) return r; timedOut = true; // 走到这里说明超时都获取不到任务了,那么说明这个外包工具人可以炒掉了 } catch (InterruptedException retry) { timedOut = false; } } } /** * 工具人,持有线程池中的线程和任务。1个关键点。 */ private class Worker extends AbstractQueuedSynchronizer implements Runnable { private Thread thread; // 线程,用来处理任务,消费者用来消费任务的 private Runnable task; // 任务,生产者创建的 volatile long completedTasks; // 已完成的任务数量 public Worker(Runnable task) { setState(-1); // 设置线程状态:SIGNAL this.task = task; // 设置工具人的任务 thread = new Thread(this); // 创建新线程,这里很关键,必须将工具人本身作为任务,分配给这个线程 } @Override public void run() { System.out.println("Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)..."); runWork(this); // 关键点2:让工具人线程运行工具人任务:工具人.run() -> 线程池.runWork(工具人) -> 工具人.task.run() } /** * 不可重入锁 */ public void lock() { acquire(1); } /** * 解不可重入锁 */ public void unlock() { release(1); } // 以下都是实现AQS中的方法 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } } public static void main(String[] args) { int threadNum = 2; ThreadPool threadPool = new ThreadPool(4, 8, 6, 4, TimeUnit.SECONDS); for (int i = 0; i < threadNum; i++) { final int j = i + 1; threadPool.execute(() -> { System.out.println("hello, world.我是任务" + (j)); }); } } }
运行结果:
addWork >>> 工具人数 : 1 个;是否核心线程数 : true addWork >>> 工具人池里有 1 个工具人。 addWork >>> 开始拉起线程,准备进入Workder.run()... execute >>> 核心线程数内执行完毕。 addWork >>> 工具人数 : 2 个;是否核心线程数 : true addWork >>> 工具人池里有 2 个工具人。 addWork >>> 开始拉起线程,准备进入Workder.run()... execute >>> 核心线程数内执行完毕。 Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)... hello, world.我是任务1 Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)... hello, world.我是任务2 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 0 个任务。 getTask >>> 工作队列中还有 0 个任务。
我们只生产了两个任务,没有超过核心线程数4,所以召唤的两个工具人事做完了后,常驻内存了。把main中的threadNum再分别改为10:
addWork >>> 工具人数 : 1 个;是否核心线程数 : true addWork >>> 工具人池里有 1 个工具人。 addWork >>> 开始拉起线程,准备进入Workder.run()... execute >>> 核心线程数内执行完毕。 addWork >>> 工具人数 : 2 个;是否核心线程数 : true addWork >>> 工具人池里有 2 个工具人。 addWork >>> 开始拉起线程,准备进入Workder.run()... execute >>> 核心线程数内执行完毕。 addWork >>> 工具人数 : 3 个;是否核心线程数 : true addWork >>> 工具人池里有 3 个工具人。 addWork >>> 开始拉起线程,准备进入Workder.run()... execute >>> 核心线程数内执行完毕。 execute >>> 已加入缓存队列中,队列中有 1 个任务。 execute >>> 已加入缓存队列中,队列中有 2 个任务。 execute >>> 已加入缓存队列中,队列中有 3 个任务。 execute >>> 已加入缓存队列中,队列中有 4 个任务。 execute >>> 已加入缓存队列中,队列中有 5 个任务。 execute >>> 已加入缓存队列中,队列中有 6 个任务。 addWork >>> 工具人数 : 4 个;是否核心线程数 : false addWork >>> 工具人池里有 4 个工具人。 addWork >>> 开始拉起线程,准备进入Workder.run()... Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)... hello, world.我是任务1 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 6 个任务。 hello, world.我是任务4 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 5 个任务。 hello, world.我是任务5 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 4 个任务。 hello, world.我是任务6 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 3 个任务。 hello, world.我是任务7 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 2 个任务。 hello, world.我是任务8 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 1 个任务。 hello, world.我是任务9 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 0 个任务。 Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)... hello, world.我是任务2 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 0 个任务。 Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)... hello, world.我是任务3 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 0 个任务。 Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)... hello, world.我是任务10 runWork >>> 任务真的跑起来了,目前已完成 1 个任务。 getTask >>> 工作队列中还有 0 个任务。 runWork >>> 这个工具人没任务可做了,要死了... getTask >>> 工作队列中还有 0 个任务。 getTask >>> 工作队列中还有 0 个任务。 processWorkerExit >>> 工具人被回收了。 getTask >>> 工作队列中还有 0 个任务。
这次跑了10个任务,核心线程数4,所以只有3个正式工具人被召唤,6个任务进入了缓存队列,最后一个任务召唤出来了一个外包工具人处理第10个任务。正式员工常驻内存,而外包的下场的悲惨的。
标签:状态 人事 tin vat tran keepalive move 退出 tun
原文地址:https://www.cnblogs.com/wuxun1997/p/14157693.html