码迷,mamicode.com
首页 > 其他好文 > 详细

ThreadPoolExecutor源码2

时间:2019-08-19 17:33:45      阅读:88      评论:0      收藏:0      [点我收藏+]

标签:最小   advance   实现   越来越大   exec   保存   总线   启用   target   

public class ThreadPoolExecutor1 extends AbstractExecutorService1 {
    // 11100000000000000000000000000000 = -536870912,  高3位表示线程池状态, 后29位表示线程个数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;// 29
    // 2^29-1 = 00011111 11111111 11111111 11111111 = 536870911
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    // -1 = 11111111 11111111 11111111 11111111
    // 11100000 00000000 00000000 00000000 = -536870912,运行状态
    private static final int RUNNING = -1 << COUNT_BITS;
    // 00000000000000000000000000000000,关闭状态  
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    // 2^29 = 00100000 00000000 00000000 00000000 = 536870912,停止状态
    private static final int STOP = 1 << COUNT_BITS;
    // 2^30 = 01000000 00000000 00000000 00000000 = 1073741824,整理状态  
    private static final int TIDYING = 2 << COUNT_BITS;
    // 2^29+2^30,3*2^29,2^29+2^29+2^29 = 01100000 00000000 00000000 00000000 = 1610612736,终止状态
    private static final int TERMINATED = 3 << COUNT_BITS;

    // C的高3位,高3位表示线程池的运行状态。
    //111RUNNING:运行中,接受新任务处理队列中的任务。
    //000SHUTDOWN:不接收新任务处理队列中的任务; 
    //001STOP:不接收新任务也不处理队列中的任务还中断正在运行的任务;
    //010TIDYING:所有的任务都已经终止;011TERMINATED:terminated()方法已经执行完成 
    private static int runStateOf(int c) {
        return c & ~CAPACITY;//11100000 00000000 00000000 00000000
    }
    
    //C的低29位,跟00011111 11111111 11111111 11111111比较,低29位表示线程池中线程数,最大2^29-1。
    private static int workerCountOf(int c) {//对2^29取余,ctl加了多少次1,最大加2^29-1次。表示多少个worker已经在运行了。
        return c & CAPACITY;//00011111 11111111 11111111 11111111
    }

    private static int ctlOf(int rs, int wc) {//rs是状态值,wc是worker线程数量,进行或操作,就是修改状态,但是不改变数量。
        return rs | wc;
    }

    private static boolean runStateLessThan(int c, int s) {//ctl是不是小于某个状态值,
        return c < s;
    }

    /*ctl从11100000000000000000000000000000=-536870912开始,慢慢加1,一直越来越大,最后=111111111111111111111111=-1
      worker线程数量最大2^29-1=536870911个,就不能再增加了,所以ctl的范围是(-536870912,-1)一直小于0*/
    
    /*SHUTDOWN=0,
      STOP=001=2^29=536870912,
      TIDYING=010=2^30=1073741824
      RUNNING=111=-536870912
      TERMINATED=011=3*2^29=1610612736
      */
    private static boolean runStateAtLeast(int c, int s) {//ctl是不是大于某个状态值
        return c >= s;//RUNNING:111   SHUTDOWN:000    STOP:001    TIDYING:010    TERMINATED:011
    }

    private static boolean isRunning(int c) {//RUNNING正常状态,ctl(-536870912,-1)一直小于0,小于0就是正常
        return c < SHUTDOWN;
    }

    //ctl+1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    //ctl-1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    //ctl-1直到成功。 
    private void decrementWorkerCount() {
        do {
        } while (!compareAndDecrementWorkerCount(ctl.get()));
    }

    private final HashSet1<Worker> workers = new HashSet1<Worker>();
    private final BlockingQueue<Runnable> workQueue;//队列。先加到workers里面去,workers满了就加到workQueue里去。

    private final ReentrantLock mainLock = new ReentrantLock();//锁住workers
    private final Condition termination = mainLock.newCondition();

    //跟踪获得的最大池大小。仅在主锁下访问。
    private int largestPoolSize;

    //已完成任务总数
    private long completedTaskCount;

    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler1 handler;

    private volatile long keepAliveTime;//闲置线程存活时间,闲置线程在阻塞等待队列,
    private volatile boolean allowCoreThreadTimeOut;//等待队列时候是否启用keepAliveTime超时

    //核心池大小 
    private volatile int corePoolSize;

    //最大池大小。 受CAPACITY限制。
    private volatile int maximumPoolSize;

    private static final RejectedExecutionHandler1 defaultHandler = new AbortPolicy();
    public static class AbortPolicy implements RejectedExecutionHandler1 {
        public AbortPolicy() {
        }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {//抛出异常,不是什么都不做
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
        }
    }
        
    private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");

    //执行终结器时要使用的上下文,
    private final AccessControlContext acc;

    private final class Worker extends AbstractQueuedSynchronizer1 implements Runnable {//是一个AQS和runnable任务,Worker里面的方法只要里面的一个线程访问,
        //这个类永远不会被序列化,但是我们提供了一个serialversionID来禁止javac警告。
        private static final long serialVersionUID = 6138294804551838833L;
        //this worker 正在运行在的Thread。线程池里面创建的线程,不是外部线程。
        final Thread thread;
        //要运行的初始任务。
        Runnable firstTask;
        //这个worker完成的任务
        volatile long completedTasks;

        //一个worker就是一个任务。同时是一个AQS队列,同时是一个runnable
        Worker(Runnable firstTask) {
            //初始设置为-1来抑制中断方法的执行。unlock才变为0。只有在runWorker()方法里面先unlock置为0其他方法才能获取锁。
            //interruptIdleWorkers方法来tryLock获取锁来中断时候,是中断不了的。runWorker方法运行时才表明关联线程已启动,这时去中断关联线程才有意义,
            //lock()方法也执行不了,只能先执行unlock()方法,才能去获取锁。
            setState(-1); 
            this.firstTask = firstTask;//executorService1.execute(new Runnable() = firstTask)
            this.thread = getThreadFactory().newThread(this);//this = ThreadPoolExecutor1$Worker,返回new Thread(ThreadPoolExecutor1$Worker)
        }

        public void run() {
            try {
                runWorker(this);//this = ThreadPoolExecutor1$Worker。这个方法在自己的thread里面运行,不会有多线程问题
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;//true:有人获取锁,false:没人获取锁
        }
        
        //worker类继承自AQS并实现了自己的加锁解锁方法,
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {//尝试获取锁
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;//排AQS队
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);//state初始等于-1,
            return true;
        }

        public void lock() throws InterruptedException {//会去排队
            acquire(1);
        }

        public boolean tryLock() {//不会去排队
            return tryAcquire(1);
        }

        public void unlock() {
            release(1);//释放锁,并且唤醒head中的第一个
        }

        public boolean isLocked() {
            return isHeldExclusively();
        }
        
        //中断worker关联线程
        void interruptIfStarted() {
            Thread t;
            //state=-1不能中断,说明还没有运行起来
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    /* 
     1.advanceRunState(SHUTDOWN);//更新状态为SHUTDOWN=00000000000
     第一个条件c>=0000000,说明ctl不是在正常的增加(正常增加都是小于0),c=SHUTDOWN000xxxx|STOP001xxx|TIDYING010xxx|TERMINATED011xxx返回true。就什么都不做。
     c=RUNNING111xxxxx就返回false,第二个条件吧ctl变成000xxxxx
     也就是说:SHUTDOWN000xxxxx|STOP001xxxxx|TIDYING010xxxxx|TERMINATED011xxxxx不能变为shutdown状态,running111xxxxx状态可以变为shutdown状态。
     
     2.advanceRunState(STOP);//更新状态为STOP00100000000
      第一个条件c>=00100000000,说明ctl已经不正常了,c=TIDYING010xxxxx|TERMINATED011xxxxx|STOP001xxxxx,返回true。就什么都不做。
     c=RUNNING111xxxxx|SHUTDWON000xxxxx返回false。第二个条件吧ctl变成001xxxxx
     也就是说:TIDYING010|TERMINATED011|STOP001不能变为STOP状态,running状态|SHUTDOWN可以变为STOP状态。
     */
    //状态是不可逆的,如果跑到前面的状态了,就不动,否则修改。
    private void advanceRunState(int targetState) {//更新状态
        for (;;) {
            int c = ctl.get(); 
            //c>=targetState就不修改状态。c<targetState就去修改状态。
            if (runStateAtLeast(c, targetState) || 
                    //ctlOf(状态值,数量值)进行或操作,就是修改状态,但是不改变数量
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    //尝试终止线程池
    final void tryTerminate() {
        for (;;) {//死循环
            int c = ctl.get();
            
            //以下两种情况终止线程池,其他情况直接返回:
            //1.状态为stop
            //2.状态为shutdown且任务队列为空
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            
            //false&&false&&(false&&):c>=0&&c<010&&c!=0:c=STOP状态
            //false&&false&&(true&&false):c>=0&&c<010&&c=0&&队列空:c=SHUTDOWN状态队列空
            
            //若线程不为空则中断一个闲置线程后直接返回
            if (workerCountOf(c) != 0) { // worker数量不等于0
                interruptIdleWorkers(ONLY_ONE);//中断线程,是否只中断一次
                return;
            }
            
            //worker数量等于0,并且  处于stop状态或者SHUTDOWN状态队列空
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//设置状态为TIDYING=010
                    try {//线程池终止后做的事情
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));//设置状态为TERMINATED=011
                        termination.signalAll();//唤醒条件队列所有线程
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            //状态设置失败则再重试
        }
    }
    /*
     调用该方法来尝试终止线程池,在进入for循环后第一个if判断过滤了不符合条件的终止操作,只有状态为stop,
     或者状态为shutdown且任务队列为空这两种情况才能继续执行。
     第二个if语句判断工作者数量是否为0,不为0的话也直接返回。经过这两重判断之后才符合终止线程池的条件,
     于是先通过CAS操作将线程池状态设置为tidying状态,在tidying状态会调用用户自己实现的terminated()方法来做一些处理。
     到了这一步,不管terminated()方法是否成功执行最后都会将线程池状态设置为terminated,也就标志着线程池真正意义上的终止了。
     最后会唤醒所有等待线程池终止的线程,让它们继续执行
     */


    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);//检查每一个线程的权限。线程池的线程全在workers里面的worker里面的thread上面。
            } finally {
                mainLock.unlock();
            }
        }
    }

    //中断所有线程,即使是活动的。忽略SecurityExceptions(在这种情况下,某些线程可能保持不间断)。
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();//中断workers的线程,加锁防止workers变化。
        } finally {
            mainLock.unlock();
        }
    }

    
    private void interruptIdleWorkers(boolean onlyOne) {//中断worker的线程
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {//没有中断并且,获取锁成功,tryLock不会阻塞排队,只会把state=0变成1,
                    //前提是没有线程获取w里面的锁。就可以中断w里面的线程。 其他线程要想操作w就要先获取锁。
                    //worker每处理一个任务,会加锁一次解锁一次。
                    try {
                        t.interrupt();//Worker的线程中断,加锁防止workers变化。
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    //中断空闲worker。中断可以获取锁的worker里面的线程,就是worker w还没有调用lock方法的worker。
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private static final boolean ONLY_ONE = true;


    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

     
    void onShutdown() {
    }
    

    //RUNNING返回true,SHUTDOWN并且shutdownOK=true返回true,SHUTDOWN并且shutdownOK=false返回false,
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

    
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))//从workqueeu中移除
                    taskList.add(r);//添加到taskList
            }
        }
        return taskList;
    }
    
    /*addWorker本事只是为线程池添加一个Worker,其本身所做的事情其实很简单,但难就难在要确保安全有效得添加一个Worker。
    为此addWorker()方法做了很多额外的工作。比如判断线程池的运行状态,当前Worker数量是否已经饱和等等。可以发现在这个方法,
    或者说整个ThreadPoolExecutor中,很多时候都是使用双重检查的方式来对线程池状态进行检查。其实这都是为了效率,
    最简单不过直接使用Synchronized或ReentranLock进行同步,但这样效率会低很多,所以在这里,
    只有在万不得已的情况下,才会使用悲观的ReentranLock。*/
    
    //添加任务,executorService1.execute(new Runnable() = firstTask),任意个线程并发,
    private boolean addWorker(Runnable firstTask, boolean core) {//core是不是核心线程
        
        //两层循环,外层循环判断线程池状态,状态不符合就return,内层循环判断线程数,线程数超过限定值return。
        
        retry: for (;;) {
            int c = ctl.get(); 
            int rs = runStateOf(c);//前3位不变,后面29位=0。
            //c处于RUNNING状态,c=111xxxxxxxxx,rs=111 0000000000000000
            //c处于SHUTDOWN状态,c=000xxxxxxxxx,rs=000 0000000000000000
            //c处于STOP状态,c=001xxxxxxxxx,rs=001 0000000000000000
            //c处于TIDYING状态,c=010xxxxxxxxx,rs=010 0000000000000000
            //c处于TERMINATED状态,c=011xxxxxxxxx,rs=011 0000000000000000
            
            //状态判断。rs后面全是0,ctl后面不是0。

            //rs正常运行小于0,SHUTDOWN=0,其他STOP,TIDYING,TERMINATED都是大于0
            
            //true&&!(false):rs>=0(不是RUNNING状态)&& rs != SHUTDOWN=000:rs>0处于STOP,TIDYING,TERMINATED
            //true&&!(true&&false):rs>=0&&rs=0&&firstTask!=null:SHUTDOWN状态&&有第一个任务
            //true&&!(true&&true&&false):rs>=0&&rs=0&&firstTask=null&&workQueue=null:SHUTDOWN状态&&没有第一个任务&&队列空
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;

            //只有以下两种情况会继续添加线程
            //1.false:rs<0(处于RUNNING状态),
            //2.true&&!(true&&true&&true):rs>=0&&rs=0&&firstTask=null&&workQueue!=null:shutdown状态,首任务空,队列还有任务。其他地方调用addWorker(null,true|false)
            for (;;) {
                int wc = workerCountOf(c);//已经运行的worker线程,worker线程数目大于CAPACITY=2^29-1就不嗯能够再加worker线程数目了。
                
                //以下三种情况不继续添加线程:
                //1.线程数大于线程池总容量
                //2.当前线程为核心线程,且核心线程数达到corePoolSize
                //3.当前线程非核心线程,且总线程达到maximumPoolSize
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;//不开worker线程,加到队列里面去,
                if (compareAndIncrementWorkerCount(c))//ctl加一成功
                    break retry;//加1成功就退出
                c = ctl.get(); //ctl加一失败,
                if (runStateOf(c) != rs)//状态没变重新加1。
                    continue retry;//状态变了重新获取状态。
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //executorService1.execute(new Runnable() = firstTask),Worker里面有一个firstTask和Thread,Thread里面有这个Worker w。
            w = new Worker(firstTask);//firstTask可以为null
            //Worker里面封装了这个任务,并且实例化了一个线程,总共3个Worker,3个任务(3个第一个任务),3个线程。线程run时候就开一个线程去执行Worker里的第一个任务,线程Thread run的时候,是Thread
            //里面的Runnable去run,所以Worker要设置成这个Thread的Runnable,然后让Worker去run(转调外部类的run方法,把worker传进去)。Worker里面仅仅保存的是这个worker的第一个任务,第一个任务执行完会死循环执行queue队列的任务。
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//单线程进来,其余阻塞
                try {
                    // 保持锁止时重新检查。如果线程发生故障或在获取锁之前关闭,请退出。
                    int rs = runStateOf(ctl.get());
                    //true|:running状态,增加worker
                    //false|true:shutdown状态并且task=null,增加worker
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) //如果线程已经开启则抛出异常
                            throw new IllegalThreadStateException();
                        workers.add(w);//Worker加到workers里面去,多线程并发要加锁
                        int s = workers.size();
                        if (s > largestPoolSize)//记录线程达到的最大值
                            largestPoolSize = s;
                        workerAdded = true;//添加成功
                    }
                    //false|(fasle|):STOP|TIDYING|TERMINATER:不增加worker
                    //false|(true|false):SHUTDOWN,task不为null:不增加worker
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {//添加成功
                    t.start();//线程123执行start(),就会去执行Worker的run方法。
                    workerStarted = true;//启动成功
                }
            }
        } finally {
            if (!workerStarted)//线程启动成功
                addWorkerFailed(w);//新建worker失败
        }
        return workerStarted;//是否启动成功
    }

    //新建worker失败
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;//加锁
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);//workers中移除
            decrementWorkerCount();//减少ctl
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    //工作线程如果从getTask方法中获得null,则会退出while循环并随后执行processWorkerExit方法。移除自己。completedAbruptly=fasle没有异常,
    //该方法会在这个工作线程终止之前执行一些操作:统计该工作者完成的任务数,然后将其从workers集合中删除,每删除一个工作者之后都会去调用tryTerminate方法尝试终止线程池,但并不一定会真的终止线程池。
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) //若非正常完成则将线程数减为0
            decrementWorkerCount();//ctl减1

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;//统计完成的任务总数
            workers.remove(w);//移除work
        } finally {
            mainLock.unlock();
        }

        tryTerminate();//尝试终止线程池
        /*
          从tryTerminate方法返回后再次去检查一遍线程池的状态,如果线程池状态为running或者shutdown,
          并且线程数小于最小值,则恢复一个工作者。这个最小值是怎样计算出来的呢?
          我们来看看。如果allowCoreThreadTimeOut为true则最小值为0,否则最小值为corePoolSize。
          但还有一个例外情况,就是虽然允许核心线程超时了,但是如果任务队列不为空的话,那么必须保证有一个线程存在,因此这时最小值设为1
          后面就是判断如果工作线程数大于最小值就不新增线程了,否则就新增一个非核心线程。
          从这个方法可以看到,每个线程退出时都会去判断要不要再恢复一个线程,因此线程池中的线程总数也是动态增减的。
         */
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {//SHUTDOWN|RUNNING,则将线程数恢复到最小值
            if (!completedAbruptly) {//线程正常完成任务被移除
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//允许核心线程超时最小值为0, 否则最小值为核心线程数
                if (min == 0 && !workQueue.isEmpty())//如果任务队列还有任务, 则保证至少有一个线程
                    min = 1;
                if (workerCountOf(c) >= min)//若线程数大于最小值则不新增了
                    return;  
            }
            addWorker(null, false);//新增工作线程
        }
    }

    private Runnable getTask() {//不是worker的方法,corePoolSize个多线程并发访问,
        boolean timedOut = false; //上一次获取任务是否超时

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//rs是一个值,ctl是多个值。
            
            //if判断,从这里我们可以看到,如果线程池状态为shutdown,会继续消费任务队列里面的任务;如果线程池状态为stop,则停止消费任务队列里剩余的任务。
            
            //true&&(true|): rs=STOP|TIDYIN|TERMINATED
            //true&&(false|true): rs=SHUTDOWN并且队列空。SHUTDOWN了还会去执行队列。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//ctl-1,销毁当前线程
                return null;//这个线程  退出这个for循环和外部的while循环,这个worker线程退出终止,执行完。
            }

            //false:rs=RUNNING:
            //true&&(false|false):rs=SHUTDOWN并且队列不为空:每个线程都不会终止,继续处理队列的任务。
            int wc = workerCountOf(c);

            // 是否开始超时等待:1.允许核心线程超时,2.线程数大于corePoolSize
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //是否退出当前线程,
            if (    ( wc > maximumPoolSize || (timed && timedOut) ) 
                    && ( wc > 1 || workQueue.isEmpty() )) {
                if (compareAndDecrementWorkerCount(c))
                    return null;//终止当前线程
                continue;
            }

            //不退出当前线程:
            try {//会阻塞等到队列有任务,超时等待返回空,继续死循环。
                //注意:闲置线程会一直在这阻塞
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true; //是因为超时退出阻塞的,
            } catch (InterruptedException retry) {
                timedOut = false;//不是因为超时退出阻塞的,
            }
        }
    }

    final void runWorker(Worker w) throws InterruptedException {//不是worker的方法,是外部类的方法,会有corePoolSize个多线程访问问题,
        Thread wt = Thread.currentThread();
        Thread t = w.thread;//相等的
        
        Runnable task = w.firstTask;//第一个任务,不是队列的任务
        w.firstTask = null;
        //设置work的state=0和去掉owenerThread属性
        //Worker也是一个ReentantLock,但是3个线程,每个线程一个woker,woker w不是线程共享的锁,不会多线程获取这把锁,unlock()不会有多线程访问,
        w.unlock(); //把state由-1变成0。interruptIdleWorkers方法就可以中断这个线程了。
        boolean completedAbruptly = true;//有异常
        try {
            //先执行初始是一个任务task,执行完之后从workQueue中取任务去执行。
            while (task != null || (task = getTask()) != null) {//不断的从任务队列中获取任务,直到getTask方法返回null,然后工作线程退出while循环最后执行processWorkerExit方法来移除自己。
                //就有多线程调用w.lock(),每个线程一个woker,woker w不是线程共享的锁,此处代码不会多线程获取这把锁,
                //设置work的state=1和owenerThread属性
                //如果shutdown方法里面的interrupt方法,调用了w.tryLock(),那么当前线程就会加入到w的队列,并且当前线程阻塞等待唤醒,唤醒之后继续这里执行。
                //每次都使用锁以保证当前worker在运行task过程中不会被中断。
                w.lock();
                 
                //其他线程可以从workers中获取一个worker,其他线程没有获取到锁,就不能对这个线程中断。本线程的下面执行就不能被中断。
                
                //ctl大于等于STOP:STOP|TIDYING|TERMINATED 
                if (   (       runStateAtLeast(ctl.get(), STOP) 
                           || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
                       )
                       && !wt.isInterrupted()
                   )
                    wt.interrupt();//线程中断
                try {
                    beforeExecute(wt, task);//正常执行,就继续运行。
                    Throwable thrown = null;
                    try {
                        task.run();//外部添加的任务run
                    } catch (RuntimeException x) {
                        thrown = x;//收集异常,给afterExecute
                        throw x;
                    } catch (Error x) {
                        thrown = x;//收集异常,给afterExecute
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;//收集异常,给afterExecute
                        throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//异不异常都会处理,thrown来区分。afterexecute这也可能引发一个异常,
                    }
                } finally {
                    task = null;//将执行完的任务置空 
                    w.completedTasks++;//将完成的任务数加一
                    w.unlock();
                }
            }
            completedAbruptly = false;//正常完成,没有异常
        } finally {
            processWorkerExit(w, completedAbruptly);//异不异常都会处理,completedAbruptly来区分。移除自己。
        }
    }
    
    public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors1.defaultThreadFactory(),
                defaultHandler);
    }
    
    public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }
    
    public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler1 handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }
    
    public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler1 handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    //这个任务有可能新建线程执行,有可能在已经存在的线程里面去执行。线程池已经shutdown了或者池子满了就丢弃任务,
    public void execute(Runnable command) {
        //executorService1.execute(new Runnable() = firstTask),会有多线程访问。任意多个线程调用同一个线程池executorService.execute方法。
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get(); 
        int m;
        
        //最大开corePoolSize=3个线程,来一个新建一个Worker添加到workers里面去然后Worker.start(),最多添加3个。workers变成3个之后不会较小,一直不变。
        //后面进来的添加到workQueue里面去,前面3个线程只有执行完了就会去阻塞等到队列有任务。前面3个线程一直不会退出。
        
        if ( (m = workerCountOf(c)) < corePoolSize) {//任意多个线程可以并发访问。当前 运行的线程 数量是否少于corePoolSize。
            if (addWorker(command, true))//添加到workers里面去,然后start。创建一个新的工作线程来执行任务。
                return;
            c = ctl.get();
        }
        
        //上面代码任意个线程并发,下面代码超过corePoolSize的线程并发。若队列已满则返回false。
        if (isRunning(c) && workQueue.offer(command)) {//c<0(运行状态)并且任务加到队列成功,3个核心的线程刚才在阻塞等待workQueue现在队列有元素了,会唤醒去处理
            
            //在成功将任务放入到任务队列后,还会再次检查线程池是否是Running状态,如果不是则将刚刚添加的任务从队列中移除,然后再执行拒绝策略。
            int recheck = ctl.get();//这里进行再次检查状态,
            
            //!true:还是处于正常运行状态:不移除, 
            //!fasle:不处于运行态,queue中移除command并且tryTerminate()成功移除后再执行拒绝策略
            
            if (!isRunning(recheck) && remove(command))
                reject(command);//丢弃任务,如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。
            
            //处于运行状态,task添加到队列中,没有worker线程,
            //若线程数为0则新建一个worker。稍后这个空闲的worker就会自动去队列里面取任务来执行
            //如果从队列中移除任务失败,则再检查一下线程数是否为0(有可能刚好全部线程都被终止了),是的话就新建一个非核心线程去处理。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);//开一个线程但是不执行初始任务,等着执行队列任务
            
        //不是运行状态,或者队列添加失败,拒绝策略拒绝task
        //如果任务队列已经满了,此时offer方法会返回false,接下来会再次调用addWorker方法新增一个非核心线程来处理该任务。如果期间创建线程失败,则最后会执行拒绝策略。
        } else if (!addWorker(command, false))
            reject(command); 
    }

    //平缓关闭线程池
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;//获取锁,新建不了worker
        mainLock.lock();
        try {
            checkShutdownAccess();//检查是否有关闭的权限
            advanceRunState(SHUTDOWN);//更新状态为SHUTDOWN,这时线程池会拒绝接收外部传过来的任务,
            interruptIdleWorkers();//中断闲置的线程,还没有执行lock的worker w的线程,剩余的线程会继续消费完任务队列里的任务之后才会终止。
            onShutdown();  //对外提供的钩子
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//尝试终止线程池
    }

    //立刻关闭线程池
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//检查是否有关闭的权限
            advanceRunState(STOP);//更新状态为stop,这是线程池也不再接收外界的任务
            interruptWorkers();//中断所有工作线程
            tasks = drainQueue();//排干任务队列
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//尝试终止线程池
        return tasks;//最后返回未被处理的任务集合。
    }
    //调用shutdown()和shutdownNow()方法后还未真正终止线程池,这两个方法最后都会调用tryTerminate()方法来终止线程池。
    
    public boolean isShutdown() {
        return !isRunning(ctl.get());
    }

    //ctl>=0并且小于0110000000000,ctl=SHUTDOWN|STOP|TIDYING
    public boolean isTerminating() {
        int c = ctl.get();
        return !isRunning(c) && runStateLessThan(c, TERMINATED);
    }

    //ctl>=TERMINATED
    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }

    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))//状态>=011,表示已经处于TERMINATED
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    protected void finalize() {
        SecurityManager sm = System.getSecurityManager();
        if (sm == null || acc == null) {
            shutdown();
        } else {
            PrivilegedAction<Void> pa = () -> {
                shutdown();
                return null;
            };
            AccessController.doPrivileged(pa, acc);
        }
    }

    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    public void setRejectedExecutionHandler(RejectedExecutionHandler1 handler) {
        if (handler == null)
            throw new NullPointerException();
        this.handler = handler;
    }

    public RejectedExecutionHandler1 getRejectedExecutionHandler() {
        return handler;
    }

    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();//中断所有可以获取锁的worker
        else if (delta > 0) {
            //我们不知道“需要”多少新线程。作为一种启发式方法,预启动足够多的新工作人员(最多新的核心大小)
            //来处理队列中当前的任务数,但如果队列在执行此操作时变为空,则停止。
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {//开k个worker线程
                if (workQueue.isEmpty())//队列空就停止
                    break;
            }
        }
    }

    public int getCorePoolSize() {
        return corePoolSize;
    }

    //worker线程数量小于corePoolSize。就开一个空初始任务的Worker和线程。
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);
    }

    //与pretartcorethread相同,只是安排至少启动一个线程,即使corepoolsize为0。
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    //启动3个worker线程去执行,有任务进来,就不初始化线程,直接都给队列,然后这些线程从队列取任务。
    //饿初始化线程池的线程,不是懒初始化,等到任务来了才初始化线程。
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

    public boolean allowsCoreThreadTimeOut() {
        return allowCoreThreadTimeOut;
    }

    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)//true,启用超时
                interruptIdleWorkers();
        }
    }

    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        //已经开的worker数量大于maximumPoolSize就中断所有可以获取锁的worker的线程
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
            interruptIdleWorkers();
    }

    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);//workQueue队列中移除task
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

    /**
    尝试从工作队列中删除所有已取消的@link future任务。
    此方法可作为存储回收操作使用,对功能没有其他影响。
    被取消的任务永远不会执行,但可能会累积到工作队列中,直到工作线程可以主动删除它们。
    调用此方法将尝试立即删除它们。但是,此方法可能无法在存在其他线程干扰的情况下删除任务。
     */
    public void purge() {
        final BlockingQueue<Runnable> q = workQueue;
        try {
            Iterator<Runnable> it = q.iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
                    it.remove();
            }
        } catch (ConcurrentModificationException fallThrough) {
            // Take slow path if we encounter interference during traversal.
            // Make copy for traversal and call remove for cancelled entries.
            // The slow path is more likely to be O(N*N).
            for (Object r : q.toArray())
                if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
                    q.remove(r);
        }

        tryTerminate(); // In case SHUTDOWN and now empty
    }

    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    //返回已完成执行的任务的大致总数。 近似值,在连续调用中不会减少。
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    public String toString() {
        long ncompleted;
        int nworkers, nactive;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            ncompleted = completedTaskCount;
            nactive = 0;
            nworkers = workers.size();
            for (Worker w : workers) {
                ncompleted += w.completedTasks;
                if (w.isLocked())
                    ++nactive;
            }
        } finally {
            mainLock.unlock();
        }
        int c = ctl.get();
        String rs = (runStateLessThan(c, SHUTDOWN) ? "Running"
                : (runStateAtLeast(c, TERMINATED) ? "Terminated" : "Shutting down"));
        return super.toString() + "[" + rs + ", pool size = " + nworkers + ", active threads = " + nactive
                + ", queued tasks = " + workQueue.size() + ", completed tasks = " + ncompleted + "]";
    }

    protected void beforeExecute(Thread t, Runnable r) {}

    protected void afterExecute(Runnable r, Throwable t) {}

    protected void terminated() {}// Executor中断调用 

    public static class CallerRunsPolicy implements RejectedExecutionHandler1 {
        public CallerRunsPolicy() {
        }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {
            if (!e.isShutdown()) {//池子关闭了,就丢弃这个任务,什么都不执行就是丢弃。
                r.run();//池子没有关闭,在调用者线程执行这个任务,不再使用线程池的线程来执行任务。
            }
        }
    }

    public static class DiscardPolicy implements RejectedExecutionHandler1 {
        public DiscardPolicy() {
        }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {
        }//什么都不执行就是丢弃任务
    }

    public static class DiscardOldestPolicy implements RejectedExecutionHandler1 {
        public DiscardOldestPolicy() {
        }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {
            if (!e.isShutdown()) {//池子没有关闭
                e.getQueue().poll();//移除队列中的第一个元素,丢弃阻塞队列中靠最前的任务
                e.execute(r);//把这个任务丢进去
            }//池子关闭了什么都不做丢弃
        }
    }
}
public class ThreadPoolExample1 {
   
    public static void main(String[] args) {
        
        ThreadPoolExecutor1 executorService1 = new ThreadPoolExecutor1(3, 3, 6L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        
//        executorService1.prestartAllCoreThreads();
        
//        ThreadPoolExecutor1 executorService1 = (ThreadPoolExecutor1) Executors1.newCachedThreadPool();
//      ExecutorService executorService2 = Executors1.newSingleThreadExecutor();
        
        new Thread(() -> {//外部线程,不是线程池线程,丢任务给池子。
            int i = 0;
            while (true) {
                Threadd w = new Threadd(executorService1,"子线程"+(++i));//外部线程,不是线程池线程,丢任务给池子。
                w.start();
            }
        },"发射线程").start();
        
        executorService1.shutdown();
    }
    static int j =0; 
    static class Threadd extends Thread {//外部线程,不是线程池线程
        ThreadPoolExecutor1 executorService1;
        Threadd(ThreadPoolExecutor1 executorService1,String name) {
            super(name);
            this.executorService1 = executorService1;
        }
        @Override
        public void run() {//多个线程(不是main线程)来调用executorService.execute()来丢任务给池子。
            try {
                executorService1.execute(new Runnable() {//execute的参数是一个任务
                    String name = "任务"+(j++);
                    @Override
                    public void run() {
                        System.out.println(name+"完成");
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

 

ThreadPoolExecutor源码2

标签:最小   advance   实现   越来越大   exec   保存   总线   启用   target   

原文地址:https://www.cnblogs.com/yaowen/p/11378173.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!