我们在使用线程的时候就去建立一个线程,这样实现起来非常简便,但是会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间段很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率。
那么如何解决此类问题呢?
在Java中可以通过线程池来解决这样的效果。前面有文章简单提到过线程池的使用。今天我们来详细讲解下Java的线程池,由易而难,循序渐进,步骤如下:
首先我们从最核心的ThreadPoolExecutor类的方法讲起
然后讲述它的实现原理
接着给出了相应的示例
最后我们讨论下如何合理配置线程池的大小
一、Java中的ThreadPoolExecutor类
在了解ThreadPoolExecutor类之前我们先来看下ThreadPoolExecutor的具体实现源代码
package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; public class ThreadPoolExecutor extends AbstractExecutorService { ....//这里只给出构造器的源码, public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } }
从上面代码可以看到,继承了AbstractExecutorService类的ThreadPoolExecutor听了四个构造器,但最终执行的是第四个构造器
public
ThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
其中,corePoolSize:核心池的大小,创建线程池后,在默认情况下没有任何线程,而是等待有任务到来才创建线程。这个参数的意义:当线程池中线程的数据到达corePoolSize后,就会把到达的任务放到缓存队列中去。
maximumPoolSize:线程池最大线程数,表示在线程池中最多能够创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久会终止。
unit:参数keepAlivetime的时间单位。
workQueue:一个阻塞队列,用来存储等待执行的任务
threadFactory:线程工厂,主要用来创建线程
handler:表示当拒绝处理任务时的策略,主要有
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
在ThreadPoolExecutor类中有几个非常重要的方法:
execute() submit() shutdown() shutdownNow()
execute()方法实际上是在ThreadPoolExecutor的曾祖父类Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
shutdown()和shutdownNow()是用来关闭线程池的。
使用示例:
public class Test { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); for(int i=0;i<15;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+ executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } } class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println("正在执行task "+taskNum); try { Thread.currentThread().sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task "+taskNum+"执行完毕"); } }
但是在Java Doc中并建议我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池 Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池
具体示例,请参照:http://aku28907.blog.51cto.com/5668513/1773073
2、如何合理配置线程池的大小
一般需要根据任务的类型来配置线程池大小:
如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
如果是IO密集型任务,参考值可以设置为2*NCPU
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。
本文出自 “阿酷博客源” 博客,请务必保留此出处http://aku28907.blog.51cto.com/5668513/1775415
原文地址:http://aku28907.blog.51cto.com/5668513/1775415