码迷,mamicode.com
首页 > 其他好文 > 详细

ThreadPoolExecutor 的 addworker方法

时间:2017-09-08 14:48:02      阅读:386      评论:0      收藏:0      [点我收藏+]

标签:failure   count   aqs   设定   stp   循环   vol   方法   running   

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

Worker 是ThreadpoolExecutor的内部类 

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

继承了aqs,实现了runnable接口,

aqs是一个同步队列,是reentrantlock的实际实现者,里面有一个 volatile 的 state属性,用cas操作来保证同步

runnable可以执行run方法,作为线程的逻辑实现。

 

言归正传,c = ctl.get(); ctl是一个原子类,

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

初始化的时候被赋值:RUNNING 是-1左移29位,也就是前三位是1,其他都是0

ctlof是把两个参数或操作,和0或不变

c= ctl.get()方法,因为c开始是负值,所以如果c自增,那么,前三位不变,从后面开始加

runstateof方法,

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static int runStateOf(int c)     { return c & ~CAPACITY; }

1左移29位到30位,减一,后面29位都为1,再取反,前面3位为1,后面29位为0,这个和-1左移29位一样。

c和 这个数相与,前三位不变,后面的都变为0,最后得到一个负数,所以一般和shutdown也就是0相比较,是不会大于等于0的。

一般第一个if条件进不去(如果线程增加的很多,导致负数变为0或者正数,如果正好为0,那么 要么firsttalk不为空,要么workqueue为空,才能返回false)

 

workerCountOf()方法, 返回除了 前三位 之后的位数,也就是当前线程数大小,如果大于等于 2的29次方,或者大于设定的核心线程或者最大线程数,表明已经不能再增加线程了,返回false

如果

if (compareAndIncrementWorkerCount(c))
break retry;

 

break retry 是跳出所有循环,向下走,如果不用retry,只能跳出一层循环。

 

c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)

 

如果cas失败,说明有并发操作已经更改了workercount,那么再获取一次

如果runstate已经变了,用continue retry,说明,加入的线程数量已经使得ctl从负数变为0了,那么重新判断一次shutdown,

  boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

获取aqs的worker里面,取当前属性中的线程,在new worker的时候,会

 

 Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

new一个线程,引用付给thread,然后state设为-1.

 

所以后面是能取到thread的,然后加锁,如果rs小于0,说明线程并没有大到限值,(或者等于0,但是传入的task为空?)

如果当前线程已经是启动了start方法了,那么说明有异常,否则,将worker加入workers,然后启动线程。

 

ThreadPoolExecutor 的 addworker方法

标签:failure   count   aqs   设定   stp   循环   vol   方法   running   

原文地址:http://www.cnblogs.com/chuliang/p/7494151.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!