码迷,mamicode.com
首页 > 编程语言 > 详细

不惑JAVA之JAVA基础 - 线程池

时间:2016-05-07 10:33:22      阅读:185      评论:0      收藏:0      [点我收藏+]

标签:

简单原理

一个比较简单的线程池至少应包含线程池管理器、工作线程、任务队列、任务接口等部分。

  • 线程池管理器(ThreadPool Manager)的作用是创建、销毁并管理线程池,将工作线程放入线程池中;
  • 工作线程是一个可以循环执行任务的线程,在没有任务时进行等待;
  • 任务队列的作用是提供一种缓冲机制,将没有处理的任务放在任务队列中;
  • 任务接口是每个任务必须实现的接口,主要用来规定任务的入口、任务执行完后的收尾工作、任务的执行状态等,工作线程通过该接口调度任务的执行。

线程池的优点

  1. 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
  2. 提高响应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行;
  3. 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

例子

之前项目中用到的代码

public Integer repayUnprocessBorrowRepayTaskIds(boolean isMultiThread) {

        int updateOverdueBadloanThreadPoolSize = BORROW_REPAY_THREAD_POOL_SIZE;
        // 吐槽一下,对Redis的滥用啊,写到配置文件中不是更好么?
        String updateOverdueBadloanThreadPoolSize_s = redisService.get(Constants.BORROW_REPAY_THREAD_POOL_SIZE);
        if (StringUtil.isNotEmpty(updateOverdueBadloanThreadPoolSize_s)) {
            updateOverdueBadloanThreadPoolSize = Integer.parseInt(updateOverdueBadloanThreadPoolSize_s);
        }
        log.info("BORROW_REPAY_THREAD_POOL_SIZE:" + updateOverdueBadloanThreadPoolSize);
        // ------1 线程池初始化代码 
        ExecutorService executorService = Executors.newFixedThreadPool(updateOverdueBadloanThreadPoolSize);
        int _counter = 0;
        List<Integer> borrowRepayTaskIds = borrowRepayTaskService.getUnprocessBorrowRepayTaskIds(updateOverdueBadloanThreadPoolSize);
        if (StringUtil.isNotEmpty(borrowRepayTaskIds)) {
            log.info("repayUnprocessBorrowRepayTaskIds ready to process, count:" + borrowRepayTaskIds.size());
            for (Integer borrowRepayTaskId : borrowRepayTaskIds) {
                _counter++;
                if (isMultiThread && updateOverdueBadloanThreadPoolSize > 1) { // 多线程
                // --------2 线程池执行代码
                    executorService.execute(new BorrowRepayTaskThread(this, borrowRepayTaskId));
                } else { // 单线程
                    repayByTask(borrowRepayTaskId);
                }
            }
        } else {
            log.info("repayUnprocessBorrowRepayTaskIds ready to process, count:0");
        }
        executorService.shutdown();
        try {
            while (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                log.info("Waiting repayUnprocessBorrowRepayTaskIds thread pool close..."); // Waiting thread pool close...
            }
        } catch (InterruptedException ex) { // should not reach here
            ex.printStackTrace();
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        } catch (Exception ex) { // should not reach here
            ex.printStackTrace();
            executorService.shutdownNow();
        }
        log.info("repayUnprocessBorrowRepayTaskIds done, count:" + _counter);
        return _counter;
    }
  1. Executors类中提供的几个静态方法来创建线程池;
  2. 这里使用了newFixedThreadPool线程池。newFixedThreadPool的作用是创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程;
  3. ExecutorService是线程池接口

几种常用线程池

newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

newFixedThreadPool(常用)
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

newScheduledThreadPool
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

newFixedThreadPool底层实现

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

this是调用的
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数解释:

  • corePoolSize - 池中所保存的线程数,包括空闲线程。 (通过应用参数传过来的)
  • maximumPoolSize - 池中允许的最大线程数。 (与corePoolSize数量一致)
  • keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 (这里是0L)
  • unit - keepAliveTime 参数的时间单位。
  • workQueue - 执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。
    (newFixedThreadPool使用的是LinkedBlockingQueue线程安全阻塞队列)
  • handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
    (newFixedThreadPool使用的是RejectedExecutionHandler)

详细说一下corePoolSize
核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

线程池的主要工作流程

可以看到使用ThreadPoolExecutor方法实现的,深入研究一下了解具体的实现原理。
技术分享

  1. 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
  2. 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
  3. 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

线程池执行任务方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
        // 如果线程数小于基本线程数,则创建线程并执行当前任务
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        // 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

工作线程

线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从Worker的run方法里看到这点。

public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //当任务队列中没有任务时,进行清理工作       
        }
    }
}

线程池配置与监控

下面这些方法可以了解一下,我觉得最好的方法就是通过压力测试,观察核心功能的处理能力,服务器CPU、IO和JVM的具体情况,进行调试是最好的。

合理配置线程池

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
2. 任务的优先级:高,中和低。
3. 任务的执行时间:长,中和短。
4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。
5. 任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。
6. 建议使用有界队列。

线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

* taskCount:线程池需要执行的任务数量。
* completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
* largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
* getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
* getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。

参考
http://www.cnblogs.com/dolphin0520/p/3932921.html
http://ifeve.com/java-threadpool/
http://my.oschina.net/jielucky/blog/157250?fromerr=imwl7YeR 这个解释的比较详细。

不惑JAVA之JAVA基础 - 线程池

标签:

原文地址:http://blog.csdn.net/happy_85/article/details/51329753

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!