标签:最小 advance 实现 越来越大 exec 保存 总线 启用 target
public class ThreadPoolExecutor1 extends AbstractExecutorService1 { // 11100000000000000000000000000000 = -536870912, 高3位表示线程池状态, 后29位表示线程个数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;// 29 // 2^29-1 = 00011111 11111111 11111111 11111111 = 536870911 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // -1 = 11111111 11111111 11111111 11111111 // 11100000 00000000 00000000 00000000 = -536870912,运行状态 private static final int RUNNING = -1 << COUNT_BITS; // 00000000000000000000000000000000,关闭状态 private static final int SHUTDOWN = 0 << COUNT_BITS; // 2^29 = 00100000 00000000 00000000 00000000 = 536870912,停止状态 private static final int STOP = 1 << COUNT_BITS; // 2^30 = 01000000 00000000 00000000 00000000 = 1073741824,整理状态 private static final int TIDYING = 2 << COUNT_BITS; // 2^29+2^30,3*2^29,2^29+2^29+2^29 = 01100000 00000000 00000000 00000000 = 1610612736,终止状态 private static final int TERMINATED = 3 << COUNT_BITS; // C的高3位,高3位表示线程池的运行状态。 //111RUNNING:运行中,接受新任务处理队列中的任务。 //000SHUTDOWN:不接收新任务处理队列中的任务; //001STOP:不接收新任务也不处理队列中的任务还中断正在运行的任务; //010TIDYING:所有的任务都已经终止;011TERMINATED:terminated()方法已经执行完成 private static int runStateOf(int c) { return c & ~CAPACITY;//11100000 00000000 00000000 00000000 } //C的低29位,跟00011111 11111111 11111111 11111111比较,低29位表示线程池中线程数,最大2^29-1。 private static int workerCountOf(int c) {//对2^29取余,ctl加了多少次1,最大加2^29-1次。表示多少个worker已经在运行了。 return c & CAPACITY;//00011111 11111111 11111111 11111111 } private static int ctlOf(int rs, int wc) {//rs是状态值,wc是worker线程数量,进行或操作,就是修改状态,但是不改变数量。 return rs | wc; } private static boolean runStateLessThan(int c, int s) {//ctl是不是小于某个状态值, return c < s; } /*ctl从11100000000000000000000000000000=-536870912开始,慢慢加1,一直越来越大,最后=111111111111111111111111=-1 worker线程数量最大2^29-1=536870911个,就不能再增加了,所以ctl的范围是(-536870912,-1)一直小于0*/ /*SHUTDOWN=0, STOP=001=2^29=536870912, TIDYING=010=2^30=1073741824 RUNNING=111=-536870912 TERMINATED=011=3*2^29=1610612736 */ private static boolean runStateAtLeast(int c, int s) {//ctl是不是大于某个状态值 return c >= s;//RUNNING:111 SHUTDOWN:000 STOP:001 TIDYING:010 TERMINATED:011 } private static boolean isRunning(int c) {//RUNNING正常状态,ctl(-536870912,-1)一直小于0,小于0就是正常 return c < SHUTDOWN; } //ctl+1 private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } //ctl-1 private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } //ctl-1直到成功。 private void decrementWorkerCount() { do { } while (!compareAndDecrementWorkerCount(ctl.get())); } private final HashSet1<Worker> workers = new HashSet1<Worker>(); private final BlockingQueue<Runnable> workQueue;//队列。先加到workers里面去,workers满了就加到workQueue里去。 private final ReentrantLock mainLock = new ReentrantLock();//锁住workers private final Condition termination = mainLock.newCondition(); //跟踪获得的最大池大小。仅在主锁下访问。 private int largestPoolSize; //已完成任务总数 private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler1 handler; private volatile long keepAliveTime;//闲置线程存活时间,闲置线程在阻塞等待队列, private volatile boolean allowCoreThreadTimeOut;//等待队列时候是否启用keepAliveTime超时 //核心池大小 private volatile int corePoolSize; //最大池大小。 受CAPACITY限制。 private volatile int maximumPoolSize; private static final RejectedExecutionHandler1 defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler1 { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {//抛出异常,不是什么都不做 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); //执行终结器时要使用的上下文, private final AccessControlContext acc; private final class Worker extends AbstractQueuedSynchronizer1 implements Runnable {//是一个AQS和runnable任务,Worker里面的方法只要里面的一个线程访问, //这个类永远不会被序列化,但是我们提供了一个serialversionID来禁止javac警告。 private static final long serialVersionUID = 6138294804551838833L; //this worker 正在运行在的Thread。线程池里面创建的线程,不是外部线程。 final Thread thread; //要运行的初始任务。 Runnable firstTask; //这个worker完成的任务 volatile long completedTasks; //一个worker就是一个任务。同时是一个AQS队列,同时是一个runnable Worker(Runnable firstTask) { //初始设置为-1来抑制中断方法的执行。unlock才变为0。只有在runWorker()方法里面先unlock置为0其他方法才能获取锁。 //interruptIdleWorkers方法来tryLock获取锁来中断时候,是中断不了的。runWorker方法运行时才表明关联线程已启动,这时去中断关联线程才有意义, //lock()方法也执行不了,只能先执行unlock()方法,才能去获取锁。 setState(-1); this.firstTask = firstTask;//executorService1.execute(new Runnable() = firstTask) this.thread = getThreadFactory().newThread(this);//this = ThreadPoolExecutor1$Worker,返回new Thread(ThreadPoolExecutor1$Worker) } public void run() { try { runWorker(this);//this = ThreadPoolExecutor1$Worker。这个方法在自己的thread里面运行,不会有多线程问题 } catch (InterruptedException e) { e.printStackTrace(); } } protected boolean isHeldExclusively() { return getState() != 0;//true:有人获取锁,false:没人获取锁 } //worker类继承自AQS并实现了自己的加锁解锁方法, protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) {//尝试获取锁 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false;//排AQS队 } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0);//state初始等于-1, return true; } public void lock() throws InterruptedException {//会去排队 acquire(1); } public boolean tryLock() {//不会去排队 return tryAcquire(1); } public void unlock() { release(1);//释放锁,并且唤醒head中的第一个 } public boolean isLocked() { return isHeldExclusively(); } //中断worker关联线程 void interruptIfStarted() { Thread t; //state=-1不能中断,说明还没有运行起来 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } /* 1.advanceRunState(SHUTDOWN);//更新状态为SHUTDOWN=00000000000 第一个条件c>=0000000,说明ctl不是在正常的增加(正常增加都是小于0),c=SHUTDOWN000xxxx|STOP001xxx|TIDYING010xxx|TERMINATED011xxx返回true。就什么都不做。 c=RUNNING111xxxxx就返回false,第二个条件吧ctl变成000xxxxx 也就是说:SHUTDOWN000xxxxx|STOP001xxxxx|TIDYING010xxxxx|TERMINATED011xxxxx不能变为shutdown状态,running111xxxxx状态可以变为shutdown状态。 2.advanceRunState(STOP);//更新状态为STOP00100000000 第一个条件c>=00100000000,说明ctl已经不正常了,c=TIDYING010xxxxx|TERMINATED011xxxxx|STOP001xxxxx,返回true。就什么都不做。 c=RUNNING111xxxxx|SHUTDWON000xxxxx返回false。第二个条件吧ctl变成001xxxxx 也就是说:TIDYING010|TERMINATED011|STOP001不能变为STOP状态,running状态|SHUTDOWN可以变为STOP状态。 */ //状态是不可逆的,如果跑到前面的状态了,就不动,否则修改。 private void advanceRunState(int targetState) {//更新状态 for (;;) { int c = ctl.get(); //c>=targetState就不修改状态。c<targetState就去修改状态。 if (runStateAtLeast(c, targetState) || //ctlOf(状态值,数量值)进行或操作,就是修改状态,但是不改变数量 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } //尝试终止线程池 final void tryTerminate() { for (;;) {//死循环 int c = ctl.get(); //以下两种情况终止线程池,其他情况直接返回: //1.状态为stop //2.状态为shutdown且任务队列为空 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return; //false&&false&&(false&&):c>=0&&c<010&&c!=0:c=STOP状态 //false&&false&&(true&&false):c>=0&&c<010&&c=0&&队列空:c=SHUTDOWN状态队列空 //若线程不为空则中断一个闲置线程后直接返回 if (workerCountOf(c) != 0) { // worker数量不等于0 interruptIdleWorkers(ONLY_ONE);//中断线程,是否只中断一次 return; } //worker数量等于0,并且 处于stop状态或者SHUTDOWN状态队列空 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//设置状态为TIDYING=010 try {//线程池终止后做的事情 terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0));//设置状态为TERMINATED=011 termination.signalAll();//唤醒条件队列所有线程 } return; } } finally { mainLock.unlock(); } //状态设置失败则再重试 } } /* 调用该方法来尝试终止线程池,在进入for循环后第一个if判断过滤了不符合条件的终止操作,只有状态为stop, 或者状态为shutdown且任务队列为空这两种情况才能继续执行。 第二个if语句判断工作者数量是否为0,不为0的话也直接返回。经过这两重判断之后才符合终止线程池的条件, 于是先通过CAS操作将线程池状态设置为tidying状态,在tidying状态会调用用户自己实现的terminated()方法来做一些处理。 到了这一步,不管terminated()方法是否成功执行最后都会将线程池状态设置为terminated,也就标志着线程池真正意义上的终止了。 最后会唤醒所有等待线程池终止的线程,让它们继续执行 */ private void checkShutdownAccess() { SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) security.checkAccess(w.thread);//检查每一个线程的权限。线程池的线程全在workers里面的worker里面的thread上面。 } finally { mainLock.unlock(); } } } //中断所有线程,即使是活动的。忽略SecurityExceptions(在这种情况下,某些线程可能保持不间断)。 private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted();//中断workers的线程,加锁防止workers变化。 } finally { mainLock.unlock(); } } private void interruptIdleWorkers(boolean onlyOne) {//中断worker的线程 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) {//没有中断并且,获取锁成功,tryLock不会阻塞排队,只会把state=0变成1, //前提是没有线程获取w里面的锁。就可以中断w里面的线程。 其他线程要想操作w就要先获取锁。 //worker每处理一个任务,会加锁一次解锁一次。 try { t.interrupt();//Worker的线程中断,加锁防止workers变化。 } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } //中断空闲worker。中断可以获取锁的worker里面的线程,就是worker w还没有调用lock方法的worker。 private void interruptIdleWorkers() { interruptIdleWorkers(false); } private static final boolean ONLY_ONE = true; final void reject(Runnable command) { handler.rejectedExecution(command, this); } void onShutdown() { } //RUNNING返回true,SHUTDOWN并且shutdownOK=true返回true,SHUTDOWN并且shutdownOK=false返回false, final boolean isRunningOrShutdown(boolean shutdownOK) { int rs = runStateOf(ctl.get()); return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); } private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r))//从workqueeu中移除 taskList.add(r);//添加到taskList } } return taskList; } /*addWorker本事只是为线程池添加一个Worker,其本身所做的事情其实很简单,但难就难在要确保安全有效得添加一个Worker。 为此addWorker()方法做了很多额外的工作。比如判断线程池的运行状态,当前Worker数量是否已经饱和等等。可以发现在这个方法, 或者说整个ThreadPoolExecutor中,很多时候都是使用双重检查的方式来对线程池状态进行检查。其实这都是为了效率, 最简单不过直接使用Synchronized或ReentranLock进行同步,但这样效率会低很多,所以在这里, 只有在万不得已的情况下,才会使用悲观的ReentranLock。*/ //添加任务,executorService1.execute(new Runnable() = firstTask),任意个线程并发, private boolean addWorker(Runnable firstTask, boolean core) {//core是不是核心线程 //两层循环,外层循环判断线程池状态,状态不符合就return,内层循环判断线程数,线程数超过限定值return。 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);//前3位不变,后面29位=0。 //c处于RUNNING状态,c=111xxxxxxxxx,rs=111 0000000000000000 //c处于SHUTDOWN状态,c=000xxxxxxxxx,rs=000 0000000000000000 //c处于STOP状态,c=001xxxxxxxxx,rs=001 0000000000000000 //c处于TIDYING状态,c=010xxxxxxxxx,rs=010 0000000000000000 //c处于TERMINATED状态,c=011xxxxxxxxx,rs=011 0000000000000000 //状态判断。rs后面全是0,ctl后面不是0。 //rs正常运行小于0,SHUTDOWN=0,其他STOP,TIDYING,TERMINATED都是大于0 //true&&!(false):rs>=0(不是RUNNING状态)&& rs != SHUTDOWN=000:rs>0处于STOP,TIDYING,TERMINATED //true&&!(true&&false):rs>=0&&rs=0&&firstTask!=null:SHUTDOWN状态&&有第一个任务 //true&&!(true&&true&&false):rs>=0&&rs=0&&firstTask=null&&workQueue=null:SHUTDOWN状态&&没有第一个任务&&队列空 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; //只有以下两种情况会继续添加线程 //1.false:rs<0(处于RUNNING状态), //2.true&&!(true&&true&&true):rs>=0&&rs=0&&firstTask=null&&workQueue!=null:shutdown状态,首任务空,队列还有任务。其他地方调用addWorker(null,true|false) for (;;) { int wc = workerCountOf(c);//已经运行的worker线程,worker线程数目大于CAPACITY=2^29-1就不嗯能够再加worker线程数目了。 //以下三种情况不继续添加线程: //1.线程数大于线程池总容量 //2.当前线程为核心线程,且核心线程数达到corePoolSize //3.当前线程非核心线程,且总线程达到maximumPoolSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;//不开worker线程,加到队列里面去, if (compareAndIncrementWorkerCount(c))//ctl加一成功 break retry;//加1成功就退出 c = ctl.get(); //ctl加一失败, if (runStateOf(c) != rs)//状态没变重新加1。 continue retry;//状态变了重新获取状态。 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //executorService1.execute(new Runnable() = firstTask),Worker里面有一个firstTask和Thread,Thread里面有这个Worker w。 w = new Worker(firstTask);//firstTask可以为null //Worker里面封装了这个任务,并且实例化了一个线程,总共3个Worker,3个任务(3个第一个任务),3个线程。线程run时候就开一个线程去执行Worker里的第一个任务,线程Thread run的时候,是Thread //里面的Runnable去run,所以Worker要设置成这个Thread的Runnable,然后让Worker去run(转调外部类的run方法,把worker传进去)。Worker里面仅仅保存的是这个worker的第一个任务,第一个任务执行完会死循环执行queue队列的任务。 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//单线程进来,其余阻塞 try { // 保持锁止时重新检查。如果线程发生故障或在获取锁之前关闭,请退出。 int rs = runStateOf(ctl.get()); //true|:running状态,增加worker //false|true:shutdown状态并且task=null,增加worker if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) //如果线程已经开启则抛出异常 throw new IllegalThreadStateException(); workers.add(w);//Worker加到workers里面去,多线程并发要加锁 int s = workers.size(); if (s > largestPoolSize)//记录线程达到的最大值 largestPoolSize = s; workerAdded = true;//添加成功 } //false|(fasle|):STOP|TIDYING|TERMINATER:不增加worker //false|(true|false):SHUTDOWN,task不为null:不增加worker } finally { mainLock.unlock(); } if (workerAdded) {//添加成功 t.start();//线程123执行start(),就会去执行Worker的run方法。 workerStarted = true;//启动成功 } } } finally { if (!workerStarted)//线程启动成功 addWorkerFailed(w);//新建worker失败 } return workerStarted;//是否启动成功 } //新建worker失败 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock;//加锁 mainLock.lock(); try { if (w != null) workers.remove(w);//workers中移除 decrementWorkerCount();//减少ctl tryTerminate(); } finally { mainLock.unlock(); } } //工作线程如果从getTask方法中获得null,则会退出while循环并随后执行processWorkerExit方法。移除自己。completedAbruptly=fasle没有异常, //该方法会在这个工作线程终止之前执行一些操作:统计该工作者完成的任务数,然后将其从workers集合中删除,每删除一个工作者之后都会去调用tryTerminate方法尝试终止线程池,但并不一定会真的终止线程池。 private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) //若非正常完成则将线程数减为0 decrementWorkerCount();//ctl减1 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks;//统计完成的任务总数 workers.remove(w);//移除work } finally { mainLock.unlock(); } tryTerminate();//尝试终止线程池 /* 从tryTerminate方法返回后再次去检查一遍线程池的状态,如果线程池状态为running或者shutdown, 并且线程数小于最小值,则恢复一个工作者。这个最小值是怎样计算出来的呢? 我们来看看。如果allowCoreThreadTimeOut为true则最小值为0,否则最小值为corePoolSize。 但还有一个例外情况,就是虽然允许核心线程超时了,但是如果任务队列不为空的话,那么必须保证有一个线程存在,因此这时最小值设为1 后面就是判断如果工作线程数大于最小值就不新增线程了,否则就新增一个非核心线程。 从这个方法可以看到,每个线程退出时都会去判断要不要再恢复一个线程,因此线程池中的线程总数也是动态增减的。 */ int c = ctl.get(); if (runStateLessThan(c, STOP)) {//SHUTDOWN|RUNNING,则将线程数恢复到最小值 if (!completedAbruptly) {//线程正常完成任务被移除 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//允许核心线程超时最小值为0, 否则最小值为核心线程数 if (min == 0 && !workQueue.isEmpty())//如果任务队列还有任务, 则保证至少有一个线程 min = 1; if (workerCountOf(c) >= min)//若线程数大于最小值则不新增了 return; } addWorker(null, false);//新增工作线程 } } private Runnable getTask() {//不是worker的方法,corePoolSize个多线程并发访问, boolean timedOut = false; //上一次获取任务是否超时 for (;;) { int c = ctl.get(); int rs = runStateOf(c);//rs是一个值,ctl是多个值。 //if判断,从这里我们可以看到,如果线程池状态为shutdown,会继续消费任务队列里面的任务;如果线程池状态为stop,则停止消费任务队列里剩余的任务。 //true&&(true|): rs=STOP|TIDYIN|TERMINATED //true&&(false|true): rs=SHUTDOWN并且队列空。SHUTDOWN了还会去执行队列。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();//ctl-1,销毁当前线程 return null;//这个线程 退出这个for循环和外部的while循环,这个worker线程退出终止,执行完。 } //false:rs=RUNNING: //true&&(false|false):rs=SHUTDOWN并且队列不为空:每个线程都不会终止,继续处理队列的任务。 int wc = workerCountOf(c); // 是否开始超时等待:1.允许核心线程超时,2.线程数大于corePoolSize boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //是否退出当前线程, if ( ( wc > maximumPoolSize || (timed && timedOut) ) && ( wc > 1 || workQueue.isEmpty() )) { if (compareAndDecrementWorkerCount(c)) return null;//终止当前线程 continue; } //不退出当前线程: try {//会阻塞等到队列有任务,超时等待返回空,继续死循环。 //注意:闲置线程会一直在这阻塞 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; //是因为超时退出阻塞的, } catch (InterruptedException retry) { timedOut = false;//不是因为超时退出阻塞的, } } } final void runWorker(Worker w) throws InterruptedException {//不是worker的方法,是外部类的方法,会有corePoolSize个多线程访问问题, Thread wt = Thread.currentThread(); Thread t = w.thread;//相等的 Runnable task = w.firstTask;//第一个任务,不是队列的任务 w.firstTask = null; //设置work的state=0和去掉owenerThread属性 //Worker也是一个ReentantLock,但是3个线程,每个线程一个woker,woker w不是线程共享的锁,不会多线程获取这把锁,unlock()不会有多线程访问, w.unlock(); //把state由-1变成0。interruptIdleWorkers方法就可以中断这个线程了。 boolean completedAbruptly = true;//有异常 try { //先执行初始是一个任务task,执行完之后从workQueue中取任务去执行。 while (task != null || (task = getTask()) != null) {//不断的从任务队列中获取任务,直到getTask方法返回null,然后工作线程退出while循环最后执行processWorkerExit方法来移除自己。 //就有多线程调用w.lock(),每个线程一个woker,woker w不是线程共享的锁,此处代码不会多线程获取这把锁, //设置work的state=1和owenerThread属性 //如果shutdown方法里面的interrupt方法,调用了w.tryLock(),那么当前线程就会加入到w的队列,并且当前线程阻塞等待唤醒,唤醒之后继续这里执行。 //每次都使用锁以保证当前worker在运行task过程中不会被中断。 w.lock(); //其他线程可以从workers中获取一个worker,其他线程没有获取到锁,就不能对这个线程中断。本线程的下面执行就不能被中断。 //ctl大于等于STOP:STOP|TIDYING|TERMINATED if ( ( runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) ) && !wt.isInterrupted() ) wt.interrupt();//线程中断 try { beforeExecute(wt, task);//正常执行,就继续运行。 Throwable thrown = null; try { task.run();//外部添加的任务run } catch (RuntimeException x) { thrown = x;//收集异常,给afterExecute throw x; } catch (Error x) { thrown = x;//收集异常,给afterExecute throw x; } catch (Throwable x) { thrown = x;//收集异常,给afterExecute throw new Error(x); } finally { afterExecute(task, thrown);//异不异常都会处理,thrown来区分。afterexecute这也可能引发一个异常, } } finally { task = null;//将执行完的任务置空 w.completedTasks++;//将完成的任务数加一 w.unlock(); } } completedAbruptly = false;//正常完成,没有异常 } finally { processWorkerExit(w, completedAbruptly);//异不异常都会处理,completedAbruptly来区分。移除自己。 } } public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors1.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler1 handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler1 handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } //这个任务有可能新建线程执行,有可能在已经存在的线程里面去执行。线程池已经shutdown了或者池子满了就丢弃任务, public void execute(Runnable command) { //executorService1.execute(new Runnable() = firstTask),会有多线程访问。任意多个线程调用同一个线程池executorService.execute方法。 if (command == null) throw new NullPointerException(); int c = ctl.get(); int m; //最大开corePoolSize=3个线程,来一个新建一个Worker添加到workers里面去然后Worker.start(),最多添加3个。workers变成3个之后不会较小,一直不变。 //后面进来的添加到workQueue里面去,前面3个线程只有执行完了就会去阻塞等到队列有任务。前面3个线程一直不会退出。 if ( (m = workerCountOf(c)) < corePoolSize) {//任意多个线程可以并发访问。当前 运行的线程 数量是否少于corePoolSize。 if (addWorker(command, true))//添加到workers里面去,然后start。创建一个新的工作线程来执行任务。 return; c = ctl.get(); } //上面代码任意个线程并发,下面代码超过corePoolSize的线程并发。若队列已满则返回false。 if (isRunning(c) && workQueue.offer(command)) {//c<0(运行状态)并且任务加到队列成功,3个核心的线程刚才在阻塞等待workQueue现在队列有元素了,会唤醒去处理 //在成功将任务放入到任务队列后,还会再次检查线程池是否是Running状态,如果不是则将刚刚添加的任务从队列中移除,然后再执行拒绝策略。 int recheck = ctl.get();//这里进行再次检查状态, //!true:还是处于正常运行状态:不移除, //!fasle:不处于运行态,queue中移除command并且tryTerminate()成功移除后再执行拒绝策略 if (!isRunning(recheck) && remove(command)) reject(command);//丢弃任务,如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。 //处于运行状态,task添加到队列中,没有worker线程, //若线程数为0则新建一个worker。稍后这个空闲的worker就会自动去队列里面取任务来执行 //如果从队列中移除任务失败,则再检查一下线程数是否为0(有可能刚好全部线程都被终止了),是的话就新建一个非核心线程去处理。 else if (workerCountOf(recheck) == 0) addWorker(null, false);//开一个线程但是不执行初始任务,等着执行队列任务 //不是运行状态,或者队列添加失败,拒绝策略拒绝task //如果任务队列已经满了,此时offer方法会返回false,接下来会再次调用addWorker方法新增一个非核心线程来处理该任务。如果期间创建线程失败,则最后会执行拒绝策略。 } else if (!addWorker(command, false)) reject(command); } //平缓关闭线程池 public void shutdown() { final ReentrantLock mainLock = this.mainLock;//获取锁,新建不了worker mainLock.lock(); try { checkShutdownAccess();//检查是否有关闭的权限 advanceRunState(SHUTDOWN);//更新状态为SHUTDOWN,这时线程池会拒绝接收外部传过来的任务, interruptIdleWorkers();//中断闲置的线程,还没有执行lock的worker w的线程,剩余的线程会继续消费完任务队列里的任务之后才会终止。 onShutdown(); //对外提供的钩子 } finally { mainLock.unlock(); } tryTerminate();//尝试终止线程池 } //立刻关闭线程池 public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();//检查是否有关闭的权限 advanceRunState(STOP);//更新状态为stop,这是线程池也不再接收外界的任务 interruptWorkers();//中断所有工作线程 tasks = drainQueue();//排干任务队列 } finally { mainLock.unlock(); } tryTerminate();//尝试终止线程池 return tasks;//最后返回未被处理的任务集合。 } //调用shutdown()和shutdownNow()方法后还未真正终止线程池,这两个方法最后都会调用tryTerminate()方法来终止线程池。 public boolean isShutdown() { return !isRunning(ctl.get()); } //ctl>=0并且小于0110000000000,ctl=SHUTDOWN|STOP|TIDYING public boolean isTerminating() { int c = ctl.get(); return !isRunning(c) && runStateLessThan(c, TERMINATED); } //ctl>=TERMINATED public boolean isTerminated() { return runStateAtLeast(ctl.get(), TERMINATED); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED))//状态>=011,表示已经处于TERMINATED return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } protected void finalize() { SecurityManager sm = System.getSecurityManager(); if (sm == null || acc == null) { shutdown(); } else { PrivilegedAction<Void> pa = () -> { shutdown(); return null; }; AccessController.doPrivileged(pa, acc); } } public void setThreadFactory(ThreadFactory threadFactory) { if (threadFactory == null) throw new NullPointerException(); this.threadFactory = threadFactory; } public ThreadFactory getThreadFactory() { return threadFactory; } public void setRejectedExecutionHandler(RejectedExecutionHandler1 handler) { if (handler == null) throw new NullPointerException(); this.handler = handler; } public RejectedExecutionHandler1 getRejectedExecutionHandler() { return handler; } public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers();//中断所有可以获取锁的worker else if (delta > 0) { //我们不知道“需要”多少新线程。作为一种启发式方法,预启动足够多的新工作人员(最多新的核心大小) //来处理队列中当前的任务数,但如果队列在执行此操作时变为空,则停止。 int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) {//开k个worker线程 if (workQueue.isEmpty())//队列空就停止 break; } } } public int getCorePoolSize() { return corePoolSize; } //worker线程数量小于corePoolSize。就开一个空初始任务的Worker和线程。 public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } //与pretartcorethread相同,只是安排至少启动一个线程,即使corepoolsize为0。 void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } //启动3个worker线程去执行,有任务进来,就不初始化线程,直接都给队列,然后这些线程从队列取任务。 //饿初始化线程池的线程,不是懒初始化,等到任务来了才初始化线程。 public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; } public boolean allowsCoreThreadTimeOut() { return allowCoreThreadTimeOut; } public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value)//true,启用超时 interruptIdleWorkers(); } } public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; //已经开的worker数量大于maximumPoolSize就中断所有可以获取锁的worker的线程 if (workerCountOf(ctl.get()) > maximumPoolSize) interruptIdleWorkers(); } public int getMaximumPoolSize() { return maximumPoolSize; } public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0) throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut()) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); long keepAliveTime = unit.toNanos(time); long delta = keepAliveTime - this.keepAliveTime; this.keepAliveTime = keepAliveTime; if (delta < 0) interruptIdleWorkers(); } public long getKeepAliveTime(TimeUnit unit) { return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); } public BlockingQueue<Runnable> getQueue() { return workQueue; } public boolean remove(Runnable task) { boolean removed = workQueue.remove(task);//workQueue队列中移除task tryTerminate(); // In case SHUTDOWN and now empty return removed; } /** 尝试从工作队列中删除所有已取消的@link future任务。 此方法可作为存储回收操作使用,对功能没有其他影响。 被取消的任务永远不会执行,但可能会累积到工作队列中,直到工作线程可以主动删除它们。 调用此方法将尝试立即删除它们。但是,此方法可能无法在存在其他线程干扰的情况下删除任务。 */ public void purge() { final BlockingQueue<Runnable> q = workQueue; try { Iterator<Runnable> it = q.iterator(); while (it.hasNext()) { Runnable r = it.next(); if (r instanceof Future<?> && ((Future<?>) r).isCancelled()) it.remove(); } } catch (ConcurrentModificationException fallThrough) { // Take slow path if we encounter interference during traversal. // Make copy for traversal and call remove for cancelled entries. // The slow path is more likely to be O(N*N). for (Object r : q.toArray()) if (r instanceof Future<?> && ((Future<?>) r).isCancelled()) q.remove(r); } tryTerminate(); // In case SHUTDOWN and now empty } public int getPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Remove rare and surprising possibility of // isTerminated() && getPoolSize() > 0 return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size(); } finally { mainLock.unlock(); } } public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } } public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } } public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; if (w.isLocked()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } } //返回已完成执行的任务的大致总数。 近似值,在连续调用中不会减少。 public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } } public String toString() { long ncompleted; int nworkers, nactive; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { ncompleted = completedTaskCount; nactive = 0; nworkers = workers.size(); for (Worker w : workers) { ncompleted += w.completedTasks; if (w.isLocked()) ++nactive; } } finally { mainLock.unlock(); } int c = ctl.get(); String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" : (runStateAtLeast(c, TERMINATED) ? "Terminated" : "Shutting down")); return super.toString() + "[" + rs + ", pool size = " + nworkers + ", active threads = " + nactive + ", queued tasks = " + workQueue.size() + ", completed tasks = " + ncompleted + "]"; } protected void beforeExecute(Thread t, Runnable r) {} protected void afterExecute(Runnable r, Throwable t) {} protected void terminated() {}// Executor中断调用 public static class CallerRunsPolicy implements RejectedExecutionHandler1 { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) { if (!e.isShutdown()) {//池子关闭了,就丢弃这个任务,什么都不执行就是丢弃。 r.run();//池子没有关闭,在调用者线程执行这个任务,不再使用线程池的线程来执行任务。 } } } public static class DiscardPolicy implements RejectedExecutionHandler1 { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) { }//什么都不执行就是丢弃任务 } public static class DiscardOldestPolicy implements RejectedExecutionHandler1 { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) { if (!e.isShutdown()) {//池子没有关闭 e.getQueue().poll();//移除队列中的第一个元素,丢弃阻塞队列中靠最前的任务 e.execute(r);//把这个任务丢进去 }//池子关闭了什么都不做丢弃 } } }
public class ThreadPoolExample1 { public static void main(String[] args) { ThreadPoolExecutor1 executorService1 = new ThreadPoolExecutor1(3, 3, 6L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // executorService1.prestartAllCoreThreads(); // ThreadPoolExecutor1 executorService1 = (ThreadPoolExecutor1) Executors1.newCachedThreadPool(); // ExecutorService executorService2 = Executors1.newSingleThreadExecutor(); new Thread(() -> {//外部线程,不是线程池线程,丢任务给池子。 int i = 0; while (true) { Threadd w = new Threadd(executorService1,"子线程"+(++i));//外部线程,不是线程池线程,丢任务给池子。 w.start(); } },"发射线程").start(); executorService1.shutdown(); } static int j =0; static class Threadd extends Thread {//外部线程,不是线程池线程 ThreadPoolExecutor1 executorService1; Threadd(ThreadPoolExecutor1 executorService1,String name) { super(name); this.executorService1 = executorService1; } @Override public void run() {//多个线程(不是main线程)来调用executorService.execute()来丢任务给池子。 try { executorService1.execute(new Runnable() {//execute的参数是一个任务 String name = "任务"+(j++); @Override public void run() { System.out.println(name+"完成"); } }); } catch (Exception e) { e.printStackTrace(); } } } }
标签:最小 advance 实现 越来越大 exec 保存 总线 启用 target
原文地址:https://www.cnblogs.com/yaowen/p/11378173.html