码迷,mamicode.com
首页 > 编程语言 > 详细

Java 1.7 ThreadPoolExecutor源码解析

时间:2017-12-28 13:59:24      阅读:239      评论:0      收藏:0      [点我收藏+]

标签:illegal   统计   新建   post   blocking   并发   cer   不可   排除   

相比1.6,1.7有些变化:

1、        增加了一个TIDYING状态,这个状态是介于STOP和TERMINATED之间的,如果执行完terminated钩子函数后状态就变成TERMINATED了;

2、        内部类Worker继承了AQS类作为一个独享锁,在运行每个任务前会获取自己的锁;

3、        runState和poolSize两个字段被合并成一个原子字段ctl了,不再使用mainLock保护了。

原文转载:http://blog.csdn.net/yuenkin/article/details/51040001

一、成员变量介绍

  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     /** 
  3.      * ctl字段其实表示两个含义:runState和workerCount(近似1.6中的poolSize) 
  4.      * int类型,高3位表示runState,低29位表示workerCount。目前这个版本也就限 
  5.      * 制了线程个数不会超过2^29-1。 
  6.      * RUNNING: 能接受新的任务且能处理队列里的请求 
  7.      * SHUTDOWN: 不能接受新的任务但是能处理队列里的请求 
  8.      * STOP: 不能接受新的任务、不能处理队列里的请求,workers会被interrupt 
  9.      * TIDYING: 所有的线程都已经terminated了,正准备调用terminated()方法 
  10.      * TERMININATED: terminated()方法已经调用结束了 
  11.      *  
  12.      * RUNNING->SHUTDOWN: 调用shutdown方法 
  13.      * (RUNNING/SHUTDOWN)>STOP: 调用shutdownNow方法 
  14.      * SHUTDOWN->TIDYING: 当workers和queue都空的时候 
  15.      * STOP->TIDYING: 当workers为空的时候 
  16.      * TIDYING->TERMINATED: 当terminated方法调用结束的时候。 
  17.      * awaitTermination()直到状态为TERMINATED时才会返回。 
  18.      *  
  19.  
  20.      */  
  21.     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  
  22.     private static final int COUNT_BITS = Integer.SIZE - 3;  
  23.     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  
  24.   
  25.     // runState is stored in the high-order bits  
  26.     private static final int RUNNING    = -1 << COUNT_BITS;  
  27.     private static final int SHUTDOWN   =  0 << COUNT_BITS;  
  28.     private static final int STOP       =  1 << COUNT_BITS;  
  29.     private static final int TIDYING    =  2 << COUNT_BITS;  
  30.     private static final int TERMINATED =  3 << COUNT_BITS;  
  31.   
  32.     // 取ctl的高三位,获取runState(运行状态)  
  33. private static int runStateOf(int c)     { return c & ~CAPACITY; }  
  34. // 取ctl的低29位,获取workerCount(worker的数量)  
  35. private static int workerCountOf(int c)  { return c & CAPACITY; }  
  36. // 把runState和workerCount合并成ctl,上面两个函数的反操作  
  37.     private static int ctlOf(int rs, int wc) { return rs | wc; }  

二、execute函数

  1. public void execute(Runnable command) {  
  2.         if (command == null)  
  3.             throw new NullPointerException();  
  4.         /* 
  5.          * 三步走: 
  6.          * 1. 如果RUNNING的线程数目小于corePoolSize,直接调用addWorker方法 
  7.          * 启动一个新线程。addWorker函数会检查runState和workerCount,如果不 
  8.          * 需要新建一个thread就会返回false了 
  9.          *  
  10.          * 2. 如果任务被成功的放入了workQueue,我们仍然需要做个double-check 
  11.          * 因为调用完isRunning(c)后池中的线程可能都退出了或者线程池被shut  
  12.          * down了。重新检查状态看是要remove掉新来的任务还是创建一个新线程来执 
  13.          * 行(如果没有活动的线程了) 
  14.          *  
  15.          * 3. 如果放入workQueue失败了,我们尝试创建一个新worker。如果失败了, 
  16.          * 说明线程池被关闭了或者饱和了(超过最大值了),就直接拒了。 
  17.          *  
  18.          */  
  19.         int c = ctl.get();  
  20.         if (workerCountOf(c) < corePoolSize) {  
  21. // addWorker有可能会失败,失败后重新获取状态并继续往下走  
  22.             if (addWorker(command, true))  
  23.                 return;  
  24.             c = ctl.get();  
  25.         }  
  26.         if (isRunning(c) && workQueue.offer(command)) {  
  27.             int recheck = ctl.get();  
  28. // 如果isRunning(c)&&workQueue.offer中间并发发生了shutdown,需要remove  
  29. // 掉刚放入workQueue的command任务。注意:此时如果有一个worker刚执行完一个task  
  30. // 然后从workQueue获取下一个task时,这里的remove就会失败了。  
  31.             if (! isRunning(recheck) && remove(command))  
  32.                 reject(command);  
  33. // 如果是RUNNING状态但是没有可工作的线程,需要直接new一个  
  34.             else if (workerCountOf(recheck) == 0)  
  35.                 addWorker(null, false);  
  36.         }  
  37.         else if (!addWorker(command, false))  
  38.             reject(command);  
  39.     }  

execute函数大体思路和1.6一致,就三种情况:

①          当前线程池中线程数目小于corePoolSize,直接new一个thread;

②          当先线程池数据大于corePoolSize,则放入workQueue中;

③          如果workQueue满了且线程池中线程数小于maximumPoolSize,则new一个thread。

  1. private boolean addWorker(Runnable firstTask, boolean core) {  
  2.         retry:  
  3.         for (;;) {  
  4.             int c = ctl.get();  
  5.             int rs = runStateOf(c);  
  6.   
  7. // 如果被shutdown了,一般就直接返回false。但是需要排除一个特例情况:当线程池状  
  8. // 态是shutdown,但workQueue不空且workers空了,会调用addWorker(null,false)  
  9. // 方法创建一个线程处理workQueue里的任务,这时不能直接返回false。  
  10.             if (rs >= SHUTDOWN &&  
  11.                 ! (rs == SHUTDOWN &&  
  12.                    firstTask == null &&  
  13.                    ! workQueue.isEmpty()))  
  14.                 return false;  
  15.   
  16.             for (;;) {  
  17.                 int wc = workerCountOf(c);  
  18. // 如果当前workers数目大于CAPACITY或者大于用户设置了,直接返回false  
  19.                 if (wc >= CAPACITY ||  
  20.                     wc >= (core ? corePoolSize : maximumPoolSize))  
  21.                     return false;  
  22.                 if (compareAndIncrementWorkerCount(c))  
  23.                     break retry;  
  24.                 c = ctl.get();  // Re-read ctl  
  25. // 如果仅仅是workerCount变化了,那么继续内层的循环;如果连runState也变化了,  
  26. // 则要重新继续外层的循环。  
  27.                 if (runStateOf(c) != rs)  
  28.                     continue retry;  
  29.             }  
  30.         }  
  31.   
  32.         boolean workerStarted = false;  
  33.         boolean workerAdded = false;  
  34.         Worker w = null;  
  35.         try {  
  36.             final ReentrantLock mainLock = this.mainLock;  
  37.             w = new Worker(firstTask);  
  38.             final Thread t = w.thread;  
  39.             if (t != null) {  
  40.                 mainLock.lock();  
  41.                 try {  
  42.                     // Recheck while holding lock.  
  43.                     // Back out on ThreadFactory failure or if  
  44.                     // shut down before lock acquired.  
  45.                     int c = ctl.get();  
  46.                     int rs = runStateOf(c);  
  47. // 再次检查runState的状态,如果是RUNNING或者SHUTDOWN但是firstTask不空,则  
  48. // 把new出来的worker放入workers中。  
  49.                     if (rs < SHUTDOWN ||  
  50.                         (rs == SHUTDOWN && firstTask == null)) {  
  51.                         if (t.isAlive()) // precheck that t is startable  
  52.                             throw new IllegalThreadStateException();  
  53.                         workers.add(w);  
  54.                         int s = workers.size();  
  55.                         if (s > largestPoolSize)  
  56.                             largestPoolSize = s;  
  57.                         workerAdded = true;  
  58.                     }  
  59.                 } finally {  
  60.                     mainLock.unlock();  
  61.                 }  
  62.                 if (workerAdded) {  
  63. // 创建worker成功后直接启动线程了  
  64.                     t.start();  
  65.                     workerStarted = true;  
  66.                 }  
  67.             }  
  68.         } finally {  
  69.             if (! workerStarted)  
  70. // 创建失败要做清理操作  
  71.                 addWorkerFailed(w);  
  72.         }  
  73.         return workerStarted;  
  74.     }  

addWorker函数尝试新建一个thread来运行传递给它的task。当线程池被STOP或SHUTDOWN或threadFactory返 回null时或者OOM时,会返回false并做相应的清理。整个过程分为两步:1、尝试设置workerCount,成功了就到步骤2;2、尝试创建一 个worker并加入到workers里。

  1. private void addWorkerFailed(Worker w) {  
  2.         final ReentrantLock mainLock = this.mainLock;  
  3.         mainLock.lock();  
  4.         try {  
  5.             if (w != null)  
  6.                 workers.remove(w);  
  7.             decrementWorkerCount();  
  8.             tryTerminate();  
  9.         } finally {  
  10.             mainLock.unlock();  
  11.         }  
  12.     }  

  addWorkerFailed函数做些清理操作:1、把创建的worker从workers中删除;2、把workerCount减1;3、检查是否可以terminated线程池,防止这个worker的存在导致执行awaitTermination操作的客户端线程阻塞了。

  1.   final void tryTerminate() {  
  2.         for (;;) {  
  3.             int c = ctl.get();  
  4. // 如果是以下三种情况直接返回:  
  5. // 1.RUNNING状态; 2.runState>=TIDYING,说明有其他线程执行了tryTerminate操  
  6. // 作; 3.SHUTDOWN状态且workQueue不空  
  7.             if (isRunning(c) ||  
  8.                 runStateAtLeast(c, TIDYING) ||  
  9.                 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))  
  10.                 return;  
  11. // 如果workerCount大于0,则中断一个空闲的worker,就返回了。为啥只中断一个呢?  
  12. // 因为worker线程退出时也会调用tryTerminate方法(一个接一个的传播)  
  13.             if (workerCountOf(c) != 0) { // Eligible to terminate  
  14.                 interruptIdleWorkers(ONLY_ONE);  
  15.                 return;  
  16.             }  
  17.   
  18.             final ReentrantLock mainLock = this.mainLock;  
  19.             mainLock.lock();  
  20.             try {  
  21. // 走到这里说明workers数量为0了,尝试把线程池状态改成TIDYING并调用terminated  
  22. // 函数->状态再设置成TERMINATED。如果设置TIDYING失败,则继续循环。  
  23.                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {  
  24.                     try {  
  25.                         terminated();  
  26.                     } finally {  
  27. // terminated函数抛异常也需要执行下面的操作。  
  28.                         ctl.set(ctlOf(TERMINATED, 0));  
  29.                         termination.signalAll();  
  30.                     }  
  31.                     return;  
  32.                 }  
  33.             } finally {  
  34.                 mainLock.unlock();  
  35.             }  
  36.             // else retry on failed CAS  
  37.         }  
  38.     }  

tryTerminate函数尝试TERMINATED线程 池(当a、SHUTDOWN且queue和pool都空;b、STOP且queue为空了)。如果workers不为0,则中断任意一个空闲的 worker后直接返回。否则:首先,将线程池状态改成TIDYING;其次,调用用户的钩子函数terminated;最后,将状态设置成 TERMINATED。

  1. private void interruptIdleWorkers(boolean onlyOne) {  
  2.         final ReentrantLock mainLock = this.mainLock;  
  3.         mainLock.lock();  
  4.         try {  
  5.             for (Worker w : workers) {  
  6.                 Thread t = w.thread;  
  7. // 如果tryLock成功,就说明这个worker是空闲的。  
  8.                 if (!t.isInterrupted() && w.tryLock()) {  
  9.                     try {  
  10.                         t.interrupt();  
  11.                     } catch (SecurityException ignore) {  
  12.                     } finally {  
  13.                         w.unlock();  
  14.                     }  
  15.                 }  
  16.                 if (onlyOne)  
  17. // 如果只中断一个就break,只有tryTerminate函数中使用到这种情况。  
  18.                     break;  
  19.             }  
  20.         } finally {  
  21.             mainLock.unlock();  
  22.         }  
  23.     }  

interruptIdleWorkers函数根据onlyOne参数决定中断一个或所有空闲的workers(这些workers都阻塞在getTask方法中)。

三、shutdown函数

  1. public void shutdown() {  
  2.         final ReentrantLock mainLock = this.mainLock;  
  3.         mainLock.lock();  
  4.         try {  
  5. // 检查调用者是否有权限执行shutdown  
  6.             checkShutdownAccess();  
  7. // 将线程池的状态改成SHUTDOWN  
  8.             advanceRunState(SHUTDOWN);  
  9. // 中断所有空闲的workers  
  10.             interruptIdleWorkers();  
  11.             onShutdown(); // hook for ScheduledThreadPoolExecutor  
  12.         } finally {  
  13.             mainLock.unlock();  
  14.         }  
  15. // 尝试终止线程池  
  16.         tryTerminate();  
  17.     }  

shutdown函数就执行几步:把状态改成SHUTDOWN,中断所有空闲的workers,调用onShutdown钩子函数,最后调用tryTerminate尝试终止线程池。

  1. private void advanceRunState(int targetState) {  
  2.         for (;;) {  
  3.             int c = ctl.get();  
  4.             if (runStateAtLeast(c, targetState) ||  
  5.                 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))  
  6.                 break;  
  7.         }  
  8.     }  

advanceRunState函数将线程池的状态改成指定状态值,如果现在状态值比target值大就直接返回。targeState的值是SHUTDOWN或者STOP,不能是TIDYING或者TERMINATED(这两种状态需要调用tryTerminate函数设置)。

四、shutdownNow函数

  1. public List<Runnable> shutdownNow() {  
  2.         List<Runnable> tasks;  
  3.         final ReentrantLock mainLock = this.mainLock;  
  4.         mainLock.lock();  
  5.         try {  
  6. // 检查调用者是否有权限执行关闭  
  7.             checkShutdownAccess();  
  8. // 将线程池的状态改成STOP  
  9.             advanceRunState(STOP);  
  10. // 和shutdown不同,这里中断所有的worker线程  
  11.             interruptWorkers();  
  12. // 删除workQueue里的任务并返回任务列表  
  13.             tasks = drainQueue();  
  14.         } finally {  
  15.             mainLock.unlock();  
  16.         }  
  17. // 尝试终止线程池  
  18.         tryTerminate();  
  19.         return tasks;  
  20.     }  

shutdownNow函数会中断所有的worker线程,删除workQueue里的任务,最后尝试终止线程池并返回workQueue里的任务。

  1. private void interruptWorkers() {  
  2.         final ReentrantLock mainLock = this.mainLock;  
  3.         mainLock.lock();  
  4.         try {  
  5. // 中断所有的worker线程  
  6.             for (Worker w : workers)  
  7.                 w.interruptIfStarted();  
  8.         } finally {  
  9.             mainLock.unlock();  
  10.         }  
  11.     }  
  1. private List<Runnable> drainQueue() {  
  2.         BlockingQueue<Runnable> q = workQueue;  
  3.         List<Runnable> taskList = new ArrayList<Runnable>();  
  4.         q.drainTo(taskList);  
  5.         if (!q.isEmpty()) {  
  6.             for (Runnable r : q.toArray(new Runnable[0])) {  
  7.                 if (q.remove(r))  
  8.                     taskList.add(r);  
  9.             }  
  10.         }  
  11.         return taskList;  
  12.     }  

五、Worker内部类

  1. private final class Worker  
  2.         extends AbstractQueuedSynchronizer  
  3.         implements Runnable  
  4.     {  
  5.         /** Thread this worker is running in.  Null if factory fails. */  
  6.         final Thread thread;  
  7.         /** Initial task to run.  Possibly null. */  
  8.         Runnable firstTask;  
  9.         /** Per-thread task counter */  
  10.         volatile long completedTasks;  
  11.   
  12.         /** 
  13.          * Creates with given first task and thread from ThreadFactory. 
  14.          * @param firstTask the first task (null if none) 
  15.          */  
  16.         Worker(Runnable firstTask) {  
  17. // 初始值为-1,防止worker还没启动就被interrupt了;在start开始时会将状态改成0  
  18.             setState(-1); // inhibit interrupts until runWorker  
  19.             this.firstTask = firstTask;  
  20.             this.thread = getThreadFactory().newThread(this);  
  21.         }  
  22. protected boolean isHeldExclusively() {  
  23.             return getState() != 0;  
  24.         }  
  25.   
  26.         protected boolean tryAcquire(int unused) {  
  27.             if (compareAndSetState(0, 1)) {  
  28.                 setExclusiveOwnerThread(Thread.currentThread());  
  29.                 return true;  
  30.             }  
  31.             return false;  
  32.         }  
  33.   
  34.         protected boolean tryRelease(int unused) {  
  35.             setExclusiveOwnerThread(null);  
  36.             setState(0);  
  37.             return true;  
  38.         }  
  39. // 参数1没有意义,是独占锁  
  40.         public void lock()        { acquire(1); }  
  41.         public boolean tryLock()  { return tryAcquire(1); }  
  42.         public void unlock()      { release(1); }  
  43.         public boolean isLocked() { return isHeldExclusively(); }  

Worker类主要维护着中断的管理和其他操作(runWorker函数),继承了AQS类实现了一个不可重入的Lock,在获取到一个任务后,准备执行前首先要获取这个锁。同时,在中断空闲的worker时也要先获取到这个锁。

  1. public void run() {  
  2.             runWorker(this);  
  3.         }  
  1. final void runWorker(Worker w) {  
  2.         Thread wt = Thread.currentThread();  
  3. // 有时我们不想从workQueue取第一个任务,直接执行刚提交的任务  
  4.         Runnable task = w.firstTask;  
  5.         w.firstTask = null;  
  6. // 把state设置成0,允许中断  
  7.         w.unlock(); // allow interrupts  
  8.         boolean completedAbruptly = true;  
  9.         try {  
  10. // 进入循环了  
  11.             while (task != null || (task = getTask()) != null) {  
  12.                 w.lock();  
  13.                 // 如果是STOP状态,需要保证线程是被中断了的;  
  14.                 // 如果不是需要清空中断状态,但是需要重新检查下状态防止在清除  
  15.                 // 中断时发生了shutdownNow  
  16.                 if ((runStateAtLeast(ctl.get(), STOP) ||  
  17.                      (Thread.interrupted() &&  
  18.                       runStateAtLeast(ctl.get(), STOP))) &&  
  19.                     !wt.isInterrupted())  
  20.                     wt.interrupt();  
  21.                 try {  
  22. // 执行前的钩子函数  
  23.                     beforeExecute(wt, task);  
  24.                     Throwable thrown = null;  
  25.                     try {  
  26.                         task.run();  
  27.                     } catch (RuntimeException x) {  
  28.                         thrown = x; throw x;  
  29.                     } catch (Error x) {  
  30.                         thrown = x; throw x;  
  31.                     } catch (Throwable x) {  
  32.                         thrown = x; throw new Error(x);  
  33.                     } finally {  
  34. // 执行后的钩子函数  
  35.                         afterExecute(task, thrown);  
  36.                     }  
  37.                 } finally {  
  38.                     task = null;  
  39.                     w.completedTasks++;  
  40.                     w.unlock();  
  41.                 }  
  42.             }  
  43.             completedAbruptly = false;  
  44.         } finally {  
  45.             processWorkerExit(w, completedAbruptly);  
  46.         }  
  47.     }  

runWorker函数循环从workQueue里获取 task并执行,但是需要注意以下几个问题:1.如果不想从workQueue里获取第一个任务执行,那就给worker.firstTask赋值。2、 如果getTask获取的值为null,或者你的task里抛异常了,那循环就退出了,然后worker线程也就退出了。3、在执行任务前先要获取 worker的锁,这里防止中断正在执行的线程。4、如果你的钩子函数beforeExecute函数抛异常了,那么你的任务就不会被执行 了,worker线程也会退出。5、如果task.run方法抛出Runtime或Error异常,会原样抛出,如果是Throwable,则会包装成一 个Error抛出,抛出异常前会执行afterExecute钩子函数,最后线程会退出。6、如果afterExecute钩子函数抛出异常,那么 worker线程也会退出。

  1. private Runnable getTask() {  
  2.         boolean timedOut = false; // Did the last poll() time out?  
  3.   
  4.         retry:  
  5.         for (;;) {  
  6.             int c = ctl.get();  
  7.             int rs = runStateOf(c);  
  8.   
  9.             // 如果SHUTDOWN且workQueue为空,或者STOP了,worker线程直接退出  
  10.             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {  
  11.                 decrementWorkerCount();  
  12.                 return null;  
  13.             }  
  14. // 是否要回收这个worker线程?  
  15.             boolean timed;      // Are workers subject to culling?  
  16.   
  17.             for (;;) {  
  18.                 int wc = workerCountOf(c);  
  19.                 timed = allowCoreThreadTimeOut || wc > corePoolSize;  
  20. // 如果还没有超时过(循环第一次执行到这里)直接break  
  21.                 if (wc <= maximumPoolSize && ! (timedOut && timed))  
  22.                     break;  
  23. // 否则,如果线程数大于最大限制或者已经超时过了说明这个worker线程要准备退出了  
  24. // 先设置workerCount-1,成功的话直接退出;否则,看下runState是否和rs一样,如  
  25. // 果一样就在内部循环,不一样就要到外部循环  
  26.                 if (compareAndDecrementWorkerCount(c))  
  27.                     return null;  
  28.                 c = ctl.get();  // Re-read ctl  
  29.                 if (runStateOf(c) != rs)  
  30.                     continue retry;  
  31.                 // else CAS failed due to workerCount change; retry inner loop  
  32.             }  
  33.   
  34.             try {  
  35. // 无限阻塞或超时阻塞  
  36.                 Runnable r = timed ?  
  37.                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
  38.                     workQueue.take();  
  39.                 if (r != null)  
  40.                     return r;  
  41. // 没有获取到task肯定是超时了  
  42.                 timedOut = true;  
  43.             } catch (InterruptedException retry) {  
  44. // 如果被中断了,不能算作超时  
  45.                 timedOut = false;  
  46.             }  
  47.         }  
  48.     }  

getTask函数是从workQueue里获取一个task,有两种策略(无限阻塞或者超时,具体要看客户端的配置)。如果这个函数返回了null,那么worker线程就会退出了。退出的原因不外乎以下几种:

1.   当前线程池中worker数量大于maximumPoolSize了;

2.   线程池被STOP了(workQueue.poll/take时会捕获到InterruptedException异常);

3.   线程池被SHUTDOWN了且workQueue为空(workQueue.poll/take时会捕获到InterruptedException异常);

4.   获取task超时了(timedOut)&&(timed)。

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {  
  2. // 用户的函数抛异常了,需要调整workerCount的值,因为worker线程准备退出了  
  3.         if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted  
  4.             decrementWorkerCount();  
  5.   
  6. // 做些统计操作(bookkeeping)  
  7.         final ReentrantLock mainLock = this.mainLock;  
  8.         mainLock.lock();  
  9.         try {  
  10.             completedTaskCount += w.completedTasks;  
  11.             workers.remove(w);  
  12.         } finally {  
  13.             mainLock.unlock();  
  14.         }  
  15. // 尝试终止线程池  
  16.         tryTerminate();  
  17.   
  18.         int c = ctl.get();  
  19. // 如果是RUNNING或SHUTDOWN状态,要看下workQueue是否为空,  
  20. // 不能直接退出。如果workQueue不空,至少要保留1或corePoolSize个  
  21. // 线程(看allowCoreThreadTimeOut配置)。少于这个数目,就需要通过  
  22. // addWorker(null,false)方法补充新的线程进来。  
  23.         if (runStateLessThan(c, STOP)) {  
  24.             if (!completedAbruptly) {  
  25.                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;  
  26.                 if (min == 0 && ! workQueue.isEmpty())  
  27.                     min = 1;  
  28.                 if (workerCountOf(c) >= min)  
  29.                     return; // replacement not needed  
  30.             }  
  31.             addWorker(null, false);  
  32.         }  
  33.     }  

processWorkerExit函数是在runWork 循环退出后做的清理和bookkeeping(应该就是指completedTaskCount等变量的操作吧)操作。 completedAbruptly参数的含义是指用户的函数是否抛异常了(before/after/run等)。注意下函数最后会根据线程池的状态和 配置决定是否新建一个worker线程。

Java 1.7 ThreadPoolExecutor源码解析

标签:illegal   统计   新建   post   blocking   并发   cer   不可   排除   

原文地址:https://www.cnblogs.com/AndyAo/p/8135063.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!