标签:改变 poll general count adf 忽略 inter 函数 wan
ThreadPoolExecutor 下文简称 TPE ,我们使用它都是从Executror 这个类中的方法 :
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 } 6 7 8 public static ExecutorService newSingleThreadExecutor() { 9 return new FinalizableDelegatedExecutorService 10 (new ThreadPoolExecutor(1, 1, 11 0L, TimeUnit.MILLISECONDS, 12 new LinkedBlockingQueue<Runnable>())); 13 } 14 15 16 17 public static ExecutorService newCachedThreadPool() { 18 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 19 60L, TimeUnit.SECONDS, 20 new SynchronousQueue<Runnable>()); 21 } 22 23 24 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 25 return new ScheduledThreadPoolExecutor(corePoolSize); 26 } 27 28 //ScheduledExecutorService 29 public ScheduledThreadPoolExecutor(int corePoolSize) { 30 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 31 new DelayedWorkQueue()); 32 } 33 34 35 public class ScheduledThreadPoolExecutor 36 extends ThreadPoolExecutor 37 implements ScheduledExecutorService 38 39
Executror 的方法名很明显地说明了创建的对象的用途,我们也可以看到它们实际的都是走到了TLE构造函数,只是传入的参数不同。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory) { 7 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 8 threadFactory, defaultHandler); 9 } 10 11 12 public ThreadPoolExecutor(int corePoolSize, 13 int maximumPoolSize, 14 long keepAliveTime, 15 TimeUnit unit, 16 BlockingQueue<Runnable> workQueue, 17 ThreadFactory threadFactory, 18 RejectedExecutionHandler handler) { 19 if (corePoolSize < 0 || 20 maximumPoolSize <= 0 || 21 maximumPoolSize < corePoolSize || 22 keepAliveTime < 0) 23 throw new IllegalArgumentException(); 24 if (workQueue == null || threadFactory == null || handler == null) 25 throw new NullPointerException(); 26 this.corePoolSize = corePoolSize; 27 this.maximumPoolSize = maximumPoolSize; 28 this.workQueue = workQueue; 29 this.keepAliveTime = unit.toNanos(keepAliveTime); 30 this.threadFactory = threadFactory; 31 this.handler = handler; 32 }
由此可以推断通过配置TLE的各个参数,实现不同的功能。
官方文档中有详细介绍,细节请看官方文档
这两个在构造方法中需要指定,核心线程数和最大线程池线程数,很好理解,就像两条上限线,当来任务时没达到核心线程数,那么就开启一条新线程去执行,要是达到核心数量了,怎么办,任务入列,要是超过了我设定的最大线程数量,那么不再接受任务。这两个值可以动态设置: setCorePoolSize(int)
和setMaximumPoolSize(int)
.
默认情况下,甚至核心线程最初只在新任务到达时创建并启动,但可以使用方法prestartCoreThread()或prestartAllCoreThreads()动态覆盖。 如果使用非空队列构造池,则可能需要预启动线程。
使用ThreadFactory创建新线程。 如果没有另外指定,则使用Executors.defaultThreadFactory(),它将所有线程创建在同一个ThreadGroup中,并具有相同的NORM_PRIORITY优先级和非守护进程状态。 通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护程序状态等。如果ThreadFactory在通过从newThread返回null请求时无法创建线程,则执行程序将继续,但可能无法 执行任何任务。 线程应该拥有“modifyThread”RuntimePermission。 如果使用池的工作线程或其他线程不具有此权限,则服务可能会降级:配置更改可能不会及时生效,并且关闭池可能保持可以终止但未完成的状态。(取至官方文档谷歌翻译)
前面说到 Core and maximum pool sizes 就像两个上限线,当超过了核心线程数后,任务开始执行完成,那么线程就空闲了,此时要是空闲达到了 Keep-alive times 这个设定值,那么线程就会被回收。使用Long.MAX_VALUE类型的TimeUnit.NANOSECONDS有效地禁止空闲线程在关闭之前终止。
the keep-alive policy applies only when there are more than corePoolSize threads. But method
allowCoreThreadTimeOut(boolean)
can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero
队列的使用和线程池的线程数有关。
队列处理策略 :
发
下面是状态变量的定义
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 private static final int COUNT_BITS = Integer.SIZE - 3; 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 5 // runState is stored in the high-order bits 6 private static final int RUNNING = -1 << COUNT_BITS; 7 private static final int SHUTDOWN = 0 << COUNT_BITS; 8 private static final int STOP = 1 << COUNT_BITS; 9 private static final int TIDYING = 2 << COUNT_BITS; 10 private static final int TERMINATED = 3 << COUNT_BITS; 11 12 // Packing and unpacking ctl 13 private static int runStateOf(int c) { return c & ~CAPACITY; } 14 private static int workerCountOf(int c) { return c & CAPACITY; } 15 private static int ctlOf(int rs, int wc) { return rs | wc; } 16 17 /* 18 * Bit field accessors that don‘t require unpacking ctl. 19 * These depend on the bit layout and on workerCount being never negative. 20 */ 21 22 private static boolean runStateLessThan(int c, int s) { 23 return c < s; 24 } 25 26 private static boolean runStateAtLeast(int c, int s) { 27 return c >= s; 28 } 29 30 private static boolean isRunning(int c) { 31 return c < SHUTDOWN; 32 }
runStatus 提供这几种状态 :
看 execute 方法。
1 //可以创建线程处理就处理,不行就入列,入列也失败,拒绝! 2 public void execute(Runnable command) { 3 if (command == null) 4 throw new NullPointerException(); 5 /* 6 * Proceed in 3 steps: 7 * 8 * 1. If fewer than corePoolSize threads are running, try to 9 * start a new thread with the given command as its first 10 * task. The call to addWorker atomically checks runState and 11 * workerCount, and so prevents false alarms that would add 12 * threads when it shouldn‘t, by returning false. 13 * 14 * 2. If a task can be successfully queued, then we still need 15 * to double-check whether we should have added a thread 16 * (because existing ones died since last checking) or that 17 * the pool shut down since entry into this method. So we 18 * recheck state and if necessary roll back the enqueuing if 19 * stopped, or start a new thread if there are none. 20 * 21 * 3. If we cannot queue task, then we try to add a new 22 * thread. If it fails, we know we are shut down or saturated 23 * and so reject the task. 24 */ 25 int c = ctl.get(); 26 //未达到 corePoolSize 增加新线程执行 27 if (workerCountOf(c) < corePoolSize) { 28 if (addWorker(command, true)) 29 return; 30 c = ctl.get(); 31 } 32 //增加线程失败,或者有可能 worker的数量大于等于 core ,或是大于 maxSize ,任务入列 33 if (isRunning(c) && workQueue.offer(command)) { 34 //注意 :此时任务已成功入列!!! 35 int recheck = ctl.get(); 36 //再次检查,要是 此时是 SHUTDOWN 状态(线程池关闭),那么移除这个任务,同时拒绝这个请求 37 if (! isRunning(recheck) && remove(command)) 38 reject(command); 39 //非running 状态 ,同时 workerCounter 为 0 (可能线程池里的任务都执行完了),那么新建一个线程,而不去处理,为什么要这样呢? 40 //因为任务此时在队列中了,创建线程后,自动会去获取任务并处理 41 //要是都不是就退出这个方法,此时任务在队列中等待被处理 42 else if (workerCountOf(recheck) == 0) 43 addWorker(null, false); 44 } 45 //线程池 shut down 或是队列满了,再次新建一个线程执行,但是这次的线程数的判断边界是 maxSize ,即是 46 // addWorker的第二个参数来指定 47 else if (!addWorker(command, false)) 48 reject(command); 49 }
假设目前还未达到core 的数量,那么进入 addWorker方法 。
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runStateOf(c); 6 7 // Check if queue empty only if necessary. 8 // 线程池满足如下条件中的任意一种时, 就会直接结束该方法, 并且返回 false 9 // 表示没有创建新线程, 新提交的任务也没有被执行. 10 // 1 .处于 STOP, TYDING 或 TERMINATD 状态 11 // 2 .处于 SHUTDOWN 状态, 并且参数 firstTask != null 12 // 3 .处于 SHUTDOWN 状态, firstTask == null 且阻塞队列 workQueue为空 13 if (rs >= SHUTDOWN && 14 ! (rs == SHUTDOWN && 15 firstTask == null && 16 ! workQueue.isEmpty())) 17 return false; 18 for (;;) { 19 int wc = workerCountOf(c); 20 //此处可以看到 第二个参数,core 是用来 选定线程数边界的 21 if (wc >= CAPACITY || 22 wc >= (core ? corePoolSize : maximumPoolSize)) 23 return false; 24 //自旋增加 c , ctl的值 ,成功就 break 退出 25 if (compareAndIncrementWorkerCount(c)) 26 break retry; 27 c = ctl.get(); // Re-read ctl 28 if (runStateOf(c) != rs) 29 //说明有人抢了, 30 continue retry; 31 // else CAS failed due to workerCount change; retry inner loop 32 // 或者 CAS 失败是因为 workerCount 改变,继续loop 33 } 34 } 35 36 //下面是增加一个线程的操作,创建 worker ,上锁,再次判断,创建成功后线程开始执行。 37 boolean workerStarted = false; 38 boolean workerAdded = false; 39 Worker w = null; 40 try { 41 w = new Worker(firstTask); 42 final Thread t = w.thread; 43 if (t != null) { 44 final ReentrantLock mainLock = this.mainLock; 45 mainLock.lock(); 46 try { 47 // Recheck while holding lock. 48 // Back out on ThreadFactory failure or if 49 // shut down before lock acquired. 50 // recheck 当获得锁的时候 ,退出因为 ThreadFactory 失败或是 在获得锁之前 线程池 shut down 51 int rs = runStateOf(ctl.get()); 52 53 if (rs < SHUTDOWN || 54 (rs == SHUTDOWN && firstTask == null)) { 55 if (t.isAlive()) // precheck that t is startable 56 throw new IllegalThreadStateException(); 57 workers.add(w); 58 int s = workers.size(); 59 if (s > largestPoolSize) 60 largestPoolSize = s; 61 workerAdded = true; 62 } 63 } finally { 64 mainLock.unlock(); 65 } 66 if (workerAdded) { 67 //走到这里线程肯定是创建了,并且线程池一定是正常的。 68 t.start(); 69 workerStarted = true; 70 } 71 } 72 } finally { 73 if (! workerStarted) 74 addWorkerFailed(w); 75 } 76 return workerStarted; 77 }
addWoker 中创建了一个Worker,我们先看一下构造方法,再慢慢分析它。
1 Worker(Runnable firstTask) { 2 setState(-1); // inhibit interrupts until runWorker 3 this.firstTask = firstTask; 4 this.thread = getThreadFactory().newThread(this); 5 }
1 public interface ThreadFactory { 2 3 /** 4 * Constructs a new {@code Thread}. Implementations may also initialize 5 * priority, name, daemon status, {@code ThreadGroup}, etc. 6 * 7 * @param r a runnable to be executed by new thread instance 8 * @return constructed thread, or {@code null} if the request to 9 * create a thread is rejected 10 */ 11 Thread newThread(Runnable r); 12 }
可以看到 ThreadFactory 实际就是创建线程的方法,同时传入一个 Runnable , worker 里面传入了一个this ,我们赶紧看一下worker的定义。
1 private final class Worker extends AbstractQueuedSynchronizer implements Runnable
意图很明显,就是worker 本身带有一个任务(可以为NULL),让刚创建的线程去执行这个任务。下面看一下它到底执行了什么?
1 /** Delegates main run loop to outer runWorker */ 2 public void run() { 3 runWorker(this); 4 } 5 6 7 8 /** Delegates main run loop to outer runWorker */ 9 public void run() { 10 runWorker(this); 11 } 12 13 14 /** 15 * Main worker run loop. Repeatedly gets tasks from queue and 16 * executes them, while coping with a number of issues: 17 * 18 * 1. We may start out with an initial task, in which case we 19 * don‘t need to get the first one. Otherwise, as long as pool is 20 * running, we get tasks from getTask. If it returns null then the 21 * worker exits due to changed pool state or configuration 22 * parameters. Other exits result from exception throws in 23 * external code, in which case completedAbruptly holds, which 24 * usually leads processWorkerExit to replace this thread. 25 * 26 * 2. Before running any task, the lock is acquired to prevent 27 * other pool interrupts while the task is executing, and then we 28 * ensure that unless pool is stopping, this thread does not have 29 * its interrupt set. 30 * 31 * 3. Each task run is preceded by a call to beforeExecute, which 32 * might throw an exception, in which case we cause thread to die 33 * (breaking loop with completedAbruptly true) without processing 34 * the task. 35 * 36 * 4. Assuming beforeExecute completes normally, we run the task, 37 * gathering any of its thrown exceptions to send to afterExecute. 38 * We separately handle RuntimeException, Error (both of which the 39 * specs guarantee that we trap) and arbitrary Throwables. 40 * Because we cannot rethrow Throwables within Runnable.run, we 41 * wrap them within Errors on the way out (to the thread‘s 42 * UncaughtExceptionHandler). Any thrown exception also 43 * conservatively causes thread to die. 44 * 45 * 5. After task.run completes, we call afterExecute, which may 46 * also throw an exception, which will also cause thread to 47 * die. According to JLS Sec 14.20, this exception is the one that 48 * will be in effect even if task.run throws. 49 * 50 * The net effect of the exception mechanics is that afterExecute 51 * and the thread‘s UncaughtExceptionHandler have as accurate 52 * information as we can provide about any problems encountered by 53 * user code. 54 * 55 * @param w the worker 56 * 57 * 58 * 这里可以看到执行完任务后,就会阻塞在 getTask ,而线程没有被回收 59 * 除非getTask 返回 null ,所有我们利用 一些调用满足 getTask 返回 null 60 * 例如 : 超时设置 61 * 62 * 63 **/ 64 final void runWorker(Worker w) { 65 Thread wt = Thread.currentThread(); 66 Runnable task = w.firstTask; 67 w.firstTask = null; 68 w.unlock(); // allow interrupts 允许中断,释放锁(为了下面抢任务) 69 boolean completedAbruptly = true; 70 try { 71 //抢任务,抢到就加锁,即是 firstTask 为null 时才会去 getTask 72 while (task != null || (task = getTask()) != null) { 73 // 加锁,防止线程被其他线程中断 74 w.lock(); 75 // If pool is stopping, ensure thread is interrupted; 76 // if not, ensure thread is not interrupted. This 77 // requires a recheck in second case to deal with 78 // shutdownNow race while clearing interrupt 79 if ((runStateAtLeast(ctl.get(), STOP) || 80 (Thread.interrupted() && 81 runStateAtLeast(ctl.get(), STOP))) && 82 !wt.isInterrupted()) 83 wt.interrupt(); 84 try { 85 //子类实现 86 beforeExecute(wt, task); 87 Throwable thrown = null; 88 try { 89 task.run(); 90 } catch (RuntimeException x) { 91 thrown = x; throw x; 92 } catch (Error x) { 93 thrown = x; throw x; 94 } catch (Throwable x) { 95 thrown = x; throw new Error(x); 96 } finally { 97 //抛出异常后,依旧执行这里 98 afterExecute(task, thrown); 99 } 100 } finally { 101 task = null; 102 w.completedTasks++; 103 w.unlock(); 104 } 105 } 106 //来到这里,1.task == null 107 completedAbruptly = false; 108 } finally { 109 processWorkerExit(w, completedAbruptly); 110 } 111 } 112 113 114 115 116 117 /** 118 * Performs blocking or timed wait for a task, depending on 119 * current configuration settings, or returns null if this worker 120 * must exit because of any of: 121 * 1. There are more than maximumPoolSize workers (due to 122 * a call to setMaximumPoolSize). 123 * 2. The pool is stopped. 124 * 3. The pool is shutdown and the queue is empty. 125 * 4. This worker timed out waiting for a task, and timed-out 126 * workers are subject to termination (that is, 127 * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) 128 * both before and after the timed wait, and if the queue is 129 * non-empty, this worker is not the last thread in the pool. 130 * 131 * @return task, or null if the worker must exit, in which case 132 * workerCount is decremented 133 * 134 * 135 * 136 * 上面的注解是 4 种返回 null 的情况, 其中第一种的原因有没有可能是因为其他线程创建线程导致的呢? 137 * worker 必须退出,顺便数量在这里减少一 138 * 139 */ 140 private Runnable getTask() { 141 boolean timedOut = false; // Did the last poll() time out? 142 143 for (;;) { 144 int c = ctl.get(); 145 int rs = runStateOf(c); 146 147 // Check if queue empty only if necessary. 148 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 149 decrementWorkerCount(); 150 return null; 151 } 152 153 int wc = workerCountOf(c); 154 155 // Are workers subject to culling? 156 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 157 158 //自旋失败后继续 loop 直到 成功 159 if ((wc > maximumPoolSize || (timed && timedOut)) 160 && (wc > 1 || workQueue.isEmpty())) { 161 //这里自旋 162 if (compareAndDecrementWorkerCount(c)) 163 return null; 164 continue; 165 } 166 167 try { 168 //前面 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 169 //可以知道 keepAliveTime 区别于 上面两个条件 1. allowCoreThreadTimeOut 2.wc > corePoolSize 170 Runnable r = timed ? 171 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 172 workQueue.take(); 173 if (r != null) 174 return r; 175 timedOut = true; 176 // 在这里会捕获中断异常!!这里很重要,提供了假如是调用了 shutDown() 方法,线程可以退出的出口 177 } catch (InterruptedException retry) { 178 timedOut = false; 179 } 180 } 181 }
思路就是从任务队列中不断拿取任务(无任务状态下,线程阻塞),然后执行该任务。有个很奇怪的地方,runworker 中在获取任务前释放了锁,在获取到任务后再次获取锁。为什么呢?在 worker 这个类前有注解。
1 /** 2 * Class Worker mainly maintains interrupt control state for 3 * threads running tasks, along with other minor bookkeeping. 4 * This class opportunistically extends AbstractQueuedSynchronizer 5 * to simplify acquiring and releasing a lock surrounding each 6 * task execution. This protects against interrupts that are 7 * intended to wake up a worker thread waiting for a task from 8 * instead interrupting a task being run. We implement a simple 9 * non-reentrant mutual exclusion lock rather than use 10 * ReentrantLock because we do not want worker tasks to be able to 11 * reacquire the lock when they invoke pool control methods like 12 * setCorePoolSize. Additionally, to suppress interrupts until 13 * the thread actually starts running tasks, we initialize lock 14 * state to a negative value, and clear it upon start (in 15 * runWorker). 16 */
worker 使用了“中断控制状态”来维护线程运行,该类继承 AbstractQueueSynchronizer ,它的作用是当这个线程在执行任务时不被其他线程中断,而是让其他线程等待被唤醒。同时,该类使用无重入的独占互斥锁而不是 ReentrantLock ,因为我们不想在调用setCorePoolSize 重入该锁。
现在线程池就在愉快地执行任务了,假如我这时候停止线程池。
1 public void shutdown() { 2 final ReentrantLock mainLock = this.mainLock; 3 mainLock.lock(); 4 try { 5 //检查是否可以 shutdown 6 checkShutdownAccess(); 7 //设置为 SHUTDOWN 8 advanceRunState(SHUTDOWN); 9 //中断所有worker 10 interruptIdleWorkers(); 11 onShutdown(); // hook for ScheduledThreadPoolExecutor 12 } finally { 13 mainLock.unlock(); 14 } 15 //终结这个线程池 16 tryTerminate(); 17 } 18 19 20 21 /** 22 * If there is a security manager, makes sure caller has 23 * permission to shut down threads in general (see shutdownPerm). 24 * If this passes, additionally makes sure the caller is allowed 25 * to interrupt each worker thread. This might not be true even if 26 * first check passed, if the SecurityManager treats some threads 27 * specially. 28 */ 29 private void checkShutdownAccess() { 30 SecurityManager security = System.getSecurityManager(); 31 if (security != null) { 32 security.checkPermission(shutdownPerm); 33 final ReentrantLock mainLock = this.mainLock; 34 mainLock.lock(); 35 try { 36 for (Worker w : workers) 37 security.checkAccess(w.thread); 38 } finally { 39 mainLock.unlock(); 40 } 41 } 42 } 43 44 45 46 /** 47 * Transitions runState to given target, or leaves it alone if 48 * already at least the given target. 49 * 50 * @param targetState the desired state, either SHUTDOWN or STOP 51 * (but not TIDYING or TERMINATED -- use tryTerminate for that) 52 */ 53 private void advanceRunState(int targetState) { 54 for (;;) { 55 int c = ctl.get(); 56 if (runStateAtLeast(c, targetState) || 57 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) 58 break; 59 } 60 } 61 62 63 private void interruptIdleWorkers() { 64 interruptIdleWorkers(false); 65 } 66 67 68 69 /** 70 * Interrupts threads that might be waiting for tasks (as 71 * indicated by not being locked) so they can check for 72 * termination or configuration changes. Ignores 73 * SecurityExceptions (in which case some threads may remain 74 * uninterrupted). 75 * 76 * @param onlyOne If true, interrupt at most one worker. This is 77 * called only from tryTerminate when termination is otherwise 78 * enabled but there are still other workers. In this case, at 79 * most one waiting worker is interrupted to propagate shutdown 80 * signals in case all threads are currently waiting. 81 * Interrupting any arbitrary thread ensures that newly arriving 82 * workers since shutdown began will also eventually exit. 83 * To guarantee eventual termination, it suffices to always 84 * interrupt only one idle worker, but shutdown() interrupts all 85 * idle workers so that redundant workers exit promptly, not 86 * waiting for a straggler task to finish. 87 * 88 * 89 * 从方法名可以看出 : 中断空闲 workers 90 * 从下面的代码也可以看到要是传过来的参数是 false 91 * 那么所有线程将被中断 (shutDown()方法有运用到) 92 * 93 */ 94 private void interruptIdleWorkers(boolean onlyOne) { 95 final ReentrantLock mainLock = this.mainLock; 96 mainLock.lock(); 97 try { 98 99 for (Worker w : workers) { 100 Thread t = w.thread; 101 //这里 worker 的 tryLock 需要注意一下,这里要是 worker 正在执行任务(这就解释了为什么在runWorker 方法中,worker要加锁了), 102 // 那么 tryLock 返回 false, 103 if (!t.isInterrupted() && w.tryLock()) { 104 try { 105 t.interrupt(); 106 } catch (SecurityException ignore) { 107 //注意这里忽略了这个 exception 108 //所以文档中指出了有可能某些线程依旧会保持为非中断状态 109 } finally { 110 w.unlock(); 111 } 112 } 113 if (onlyOne) 114 break; 115 } 116 } finally { 117 mainLock.unlock(); 118 } 119 } 120 121 122 /** 123 * Performs any further cleanup following run state transition on 124 * invocation of shutdown. A no-op here, but used by 125 * ScheduledThreadPoolExecutor to cancel delayed tasks. 126 */ 127 void onShutdown() { 128 } 129 130 131 132 133 /** 134 * Attempts to stop all actively executing tasks, halts the 135 * processing of waiting tasks, and returns a list of the tasks 136 * that were awaiting execution. These tasks are drained (removed) 137 * from the task queue upon return from this method. 138 * 139 * <p>This method does not wait for actively executing tasks to 140 * terminate. Use {@link #awaitTermination awaitTermination} to 141 * do that. 142 * 143 * <p>There are no guarantees beyond best-effort attempts to stop 144 * processing actively executing tasks. This implementation 145 * cancels tasks via {@link Thread#interrupt}, so any task that 146 * fails to respond to interrupts may never terminate. 147 * 148 * @throws SecurityException {@inheritDoc} 149 */ 150 public List<Runnable> shutdownNow() { 151 List<Runnable> tasks; 152 final ReentrantLock mainLock = this.mainLock; 153 mainLock.lock(); 154 try { 155 checkShutdownAccess(); 156 // STOP 终于在这里发挥了作用!!S:D 157 advanceRunState(STOP); 158 interruptWorkers(); 159 //remove task 从队列中 160 tasks = drainQueue(); 161 } finally { 162 mainLock.unlock(); 163 } 164 tryTerminate(); 165 return tasks; 166 } 167
看一下 tryTerminate 方法。
1 /** 2 * Transitions to TERMINATED state if either (SHUTDOWN and pool 3 * and queue empty) or (STOP and pool empty). If otherwise 4 * eligible to terminate but workerCount is nonzero, interrupts an 5 * idle worker to ensure that shutdown signals propagate. This 6 * method must be called following any action that might make 7 * termination possible -- reducing worker count or removing tasks 8 * from the queue during shutdown. The method is non-private to 9 * allow access from ScheduledThreadPoolExecutor. 10 * 11 * 12 * 如果满足其中一个条件 : 13 * 1. SHUTDOWN 并且 workerCount为 0 并且 队列为空 14 * 2. STOP 并且 workerCount为 0 15 * 那么将状态转化为 TERMINATED ; 16 * 17 * 如果 workerCount(c)!=0 ,那么调用 interruptIdleWorkers(true); 然后就return 18 * 19 * 20 * 所以我们可以知道在线程池正常的状态的下必定直接 return 21 * 22 */ 23 final void tryTerminate() { 24 for (;;) { 25 int c = ctl.get(); 26 if (isRunning(c) || 27 runStateAtLeast(c, TIDYING) || 28 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) 29 return; 30 //具备条件时执行下面 31 if (workerCountOf(c) != 0) { // Eligible to terminate 32 // 传入一个参数 true ,表示只中断一个,这是因为,每个线程当自己没任务时,肯定 33 interruptIdleWorkers(ONLY_ONE); 34 return; 35 } 36 37 final ReentrantLock mainLock = this.mainLock; 38 mainLock.lock(); 39 try { 40 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 41 try { 42 //看到了吗,介绍线程池状态时,讲到要是是 TIDYING 状态时,会调用这个钩子方法 43 terminated(); 44 } finally { 45 ctl.set(ctlOf(TERMINATED, 0)); 46 termination.signalAll(); 47 } 48 return; 49 } 50 } finally { 51 mainLock.unlock(); 52 } 53 // else retry on failed CAS 54 } 55 }
从上面我们看到要是调用了 shutDown() 或是 shutDownNow ()那么我们正阻塞在 getTask()方法的线程就会收到中断异常,于是就会getTask就会返回 null 。那么我们继续看一下线程继续向下执行的逻辑。
1 /** 2 * Performs cleanup and bookkeeping for a dying worker. Called 3 * only from worker threads. Unless completedAbruptly is set, 4 * assumes that workerCount has already been adjusted to account 5 * for exit. This method removes thread from worker set, and 6 * possibly terminates the pool or replaces the worker if either 7 * it exited due to user task exception or if fewer than 8 * corePoolSize workers are running or queue is non-empty but 9 * there are no workers. 10 * 11 * @param w the worker 12 * @param completedAbruptly if the worker died due to user exception 13 */ 14 private void processWorkerExit(Worker w, boolean completedAbruptly) { 15 if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted 16 decrementWorkerCount(); 17 18 //加锁执行 19 final ReentrantLock mainLock = this.mainLock; 20 mainLock.lock(); 21 try { 22 completedTaskCount += w.completedTasks; 23 //移除 worker 24 workers.remove(w); 25 } finally { 26 mainLock.unlock(); 27 } 28 29 tryTerminate(); 30 31 int c = ctl.get(); 32 // replacement 的意思在这里,是上面已经移除了一个 worker , 这里调用 addWorker 再补充 33 if (runStateLessThan(c, STOP)) { 34 if (!completedAbruptly) { 35 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 36 if (min == 0 && ! workQueue.isEmpty()) 37 min = 1; 38 if (workerCountOf(c) >= min) 39 return; // replacement not needed 40 } 41 addWorker(null, false); 42 } 43 } 44
下一篇我们我们讲TLE 具体衍生的不同类型的线程池。
标签:改变 poll general count adf 忽略 inter 函数 wan
原文地址:https://www.cnblogs.com/Benjious/p/10200248.html