码迷,mamicode.com
首页 > 编程语言 > 详细

线程池源码解析

时间:2019-09-20 18:35:56      阅读:100      评论:0      收藏:0      [点我收藏+]

标签:cond   中断   pool   ble   全局   thrown   start   com   为什么   

ThreadPoolExecutor的几个重要属性

  • BlockingQueue workQueue

    阻塞队列。存放将要执行的任务

  • HashSet workers

    当前线程池的线程集合。下文会重点介绍Worker这个内部类

  • corePoolSize

    核心线程数

  • maximumPoolSize

    最大线程数

  • keepAliveTime

    非核心线程保持空闲的最长时间

  • allowCoreThreadTimeOut

    核心线程是否被回收。默认是不回收核心线程的

  • RejectedExecutionHandler defaultHandler = new AbortPolicy()

    默认拒绝策略。可以看到默认是抛异常

      public static class AbortPolicy implements RejectedExecutionHandler {
    
          public AbortPolicy() { }
    
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              throw new RejectedExecutionException("Task " + r.toString() +
                                                   " rejected from " +
                                                   e.toString());
          }
      }

源码分析

execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
        
    int c = ctl.get();
    //当前线程数 < 核心线程数        
    if (workerCountOf(c) < corePoolSize) {
        //新建线程并执行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //(上一步addWorker失败了 || 当前线程数 >= 核心线程数) && 阻塞队列未满 && 线程池运行中
    if (isRunning(c) && workQueue.offer(command)) {
        //为了再次校验线程状态
        int recheck = ctl.get();
        //线程池不是运行中 && 将任务移除阻塞队列成功
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //所有线程都被回收了 但是之前workQueue已经接收了任务
        else if (workerCountOf(recheck) == 0)
            //这里为什么传null?
            addWorker(null, false);
    }
    
    // 阻塞队列满了
    // 当前线程数 < 最大线程数 新建线程会成功
    else if (!addWorker(command, false))
        //当前线程数 >= 最大线程数 执行拒绝策略
        reject(command);
}

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // 很恶心的判断。就当线程池被搞了吧。正常情况下不会进来
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 根据core参数来判断能不能新建线程
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //改线程数+1。后续失败会对这个操作回滚
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //这之前的操作其实就是一些校验,相当于预创建线程
    //现在才开始真正的创建线程并执行
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //将任务封装为Worker。new出来的时候内部就新建了一个线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //获取全局锁 一个个来执行
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                //再进行一系列的校验
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //已经被执行了抛异常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    //记录下这个线程池整个生命周期里的最大的线程数(这东西没什么卵用)
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 这里其实调的是Worker#runWorker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //擦屁股。因为之前的预创建在还没正真执行的时候就将工作线程数+1了,所以这里回滚。再从workers中移除
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker.runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //之前的伏笔 task = null的时候 走 task = getTask()
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //如果线程池状态大于等于STOP,那么意味着该线程也要中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //钩子方法。由开发者重写扩展
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //钩子方法。由开发者重写扩展
                    afterExecute(task, thrown);
                }
            } finally {
                //置空task,准备getTask获取下一个任务
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //将这个worker移除
        //运行状态小于STOP的情况下
        //allowCoreThreadTimeOut为false && 当前线程数小于核心线程数 新建一个worker
        processWorkerExit(w, completedAbruptly);
    }
}

getTask

private Runnable getTask() {
        boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池被搞了
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 是否要回收线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //keepAliveTime的作用
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

# 总结

  • 当前线程数 < 核心线程数:直接新建一个线程并执行任务

  • 当前线程数 >= 核心线程数 && 阻塞队列未满:将任务放入到阻塞队列

  • 核心线程数 <= 当前线程数 < 最大线程数 && 阻塞队列已满:新建线程并执行任务

  • 当前线程数 >= 最大线程数 && 阻塞队列已满:执行拒绝策略

--

当阻塞队列已经接收了任务,但此时所有线程被回收了,此时的任务将如何处理?

else if (workerCountOf(recheck) == 0)
    //这里为什么传null?
    addWorker(null, false);

新建一个线程去阻塞队列里获取任务并执行。

--

扩展

fixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

核心线程数=最大线程数。

阻塞队列默认的用LinkedBlockingQueue且容量是Integer.MAX_VALUE。

那么问题来了。只要阻塞队列不满,这个线程池就一直会接收任务。到达一定数量,还未到Integer.MAX_VALUE的时候机器肯定爆了。显然我们实际项目中不应该直接用fixedThreadPool

cachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

最大线程数是Integer.MAX_VALUE。只要有任务过来,不断的新建线程。

--

这两种默认的线程池说白了就是无限接受任务。所以我们实际项目中应该自己构造线程池来解决实际需求。

线程池源码解析

标签:cond   中断   pool   ble   全局   thrown   start   com   为什么   

原文地址:https://www.cnblogs.com/chenshengyue/p/11558648.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!