码迷,mamicode.com
首页 > 编程语言 > 详细

应用线程池ThreadPoolExecutor

时间:2014-08-31 18:43:41      阅读:312      评论:0      收藏:0      [点我收藏+]

标签:线程池   threadpoolexecutor   

线程池的大小

在配置和调整应用线程池的时候,首先考虑的是线程池的大小。

线程池的合理大小取决于未来提交的任务类型和所部署系统的特征。定制线程池的时候需要避免线程池的长度“过大”或者“过小”这两种极端情况。

线程池过大:那么线程对稀缺的CPU和内存资源的竞争,会导致内存高使用量,还可能耗尽资源。
线程池过小:由于存在很多可用的处理器资源还未工作,会对吞吐量造成损失。

精密的计算出线程池的确切大小是很困难的,一般我们会估算出一个合理的线程池大小。
对于计算密集型任务,一个具有N个处理核心的系统,可以使用N+1个线程的线程池。(这样当某个线程因为错误暂停,刚好有一个线程补上)
对于包含了I/O和其他阻塞操作的任务系统中,不是所有的线程都会在所有的时间被调度,因此就需要一个更大的线程池。你还需估算出任务花在等待的时间与用来计算的时间的比率。

Java中可以通过下面的这句代码获得CPU中处理核心的数量
int CPU_Num = Runtime.getRuntime().availableProcessors();

当然处理核心并不是唯一影响线程池大小的因素,如果线程池中的每个线程都要使用到池化资源(比如数据库连接池),那么数据库连接池的大小在指定线程池大小的时候也必须考虑进去。

配置ThreadPoolExecutor

通常我们会使用工具类Executors中的相关方法(如:newCachedThreadPool、newFixedThreadPool、newScheduledThreadPool)去构建一个线程池,通过查看Executors类的相关源码,我们可以发现,这些方法都去实例化了一个ThreadPoolExectuor对象,只是传入构造方法的参数不一样。ThreadPoolExecutor是继承与抽象类AbstractExecutorService,是ExecutorService的一个实现,其中包含了多个重载的构造方法,用于去构造不同的线程池。
从ThreadPoolExecutor的构造方法中,我们能够得到构造一个线程池需要的一些参数。
  • corePoolSize:线程池中保存的核心线程的数量,即使这些核心线程是空转的。
  • maximumPoolSize:最大线程池的大小
  • keeAliveTime:当线程数大于核心线程数的时候,空闲线程存活的最大时间。
  • TimeUtil:keepAliveTime的时间单位
  • workQueue:执行前用于保存任务的队列(等候队列)。
  • handler:当任务超出最大线程池大小和等候队列的长度的时候,对后继到达任务的处理策略,也被叫做饱和策略。

其中核心线程数(corePoolSize)、最大线程数(maximumPoolSize)和存活时间(keepAliveTime)共同管理着线程的创建和销毁。

当一个ThreadPoolExecutor被初始创建后,所有的核心线程并非立即开始,而是要等到有任务提交的时候,除非你调用prestartAllCoreThreads。
当提交的任务数达到coolPoolSize大小后,之后提交的任务会被保存到workQueue中,而不是创建新的线程去执行它们。当workQueue充满后,就会去创建新的线程,但是总的线程数量不会大于maximumPoolSize。
当前线程数量大于corePoolSize的时候,如果空闲线程等待的时间超过了keepAliveTime那么这个空闲线程就会被销毁,当然如果当前线程数量没有超过corePoolSize,那么这个keepAliveTime是不起作用的。
通过调节核心大小和存活时间,可以促进线程池归还空闲线程占有的资源,让这些资源用于其他有用的工作,当然你必须权衡整个系统任务到来的数量和频率,因为频繁的创建和销毁线程会导致更大的开销。

1、查看Executors是如何通过newFixedThreadPool构建固定大小的线程池的
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

可以看到是直接去创建一个ThreadPoolExecutor实例,其中FixedThreadPool的核心线程池大小同最大线程池大小是一样的,keepAliveTime也被设置为0,也就是永远不会超时,使用LinkedBlockingQueue作为它的等候队列,它是一个无限的队列,如果当前的线程大于nThread,那么后续到达的任务都会保存在LinkedBlockingQueue,直到有可用的线程。如果到达的速度过快而又得不到线程处理,那么可能造成等候队列膨胀,导致内存耗尽。

2、查看Executors是如何通过newCachedThreadPool构建缓存线程池的
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

CachedThreadPool的核心线程池大小为0,最大线程池大小是Integer.MAX_VALUE,也就是无限大,存活的时间60秒,使用的SynchronousQueue作为工作队列,这个队列其实不是一个真正意义上的队列,因为它没有内部空间存放元素,而是一种管理直接在线程之间移交信息的机制,为了把一个任务放入到SynchronousQueue中,必须有另外一个线程要从Synchronous中取任务,这样我们每提交一个任务到CachedThreadPool的时候,ThreadPoolExecutor就会创建一个新的线程去取这个提交的任务来执行。

通过观察上面的两个线程池创建,我们也可以利用ThreadPoolExecutor去创建一个定制的线程池,不过还需要注意以下一些问题。
  • 工作队列(workQueue)选型
  • 饱和策略(handler)
  • 线程工厂的定制(ThreadFactory)

一、工作队列的选型:在前面的两个例子中,可以看到FixedThreadPool选用LinkedBlockingQueue作为工作队列,而CachedThreadPool选用SynchronousQueue作为工作队列,不同类型的工作队列会带来不一样的线程池特性。

使用线程池而不是“每任务每线程”(thread-per-task),一方面是为了方便线程的管理,另一方面是无限制创建线程锁带来的性能问题。在使用线程池的时候,如果新请求到达的频率超过了线程池能够处理它们的速度,请求将在等候队列中等待,但是如果请求速度过快,仍然存在耗尽资源的风险。
ThreadPoolExecutor允许你提供一个BlockingQueue来保存等待执行的任务。选用的队列有以下3中类型
  • 无限队列
  • 有限队列
  • 同步移交队列
其中BlockingQueue接口的实现如下图:
bubuko.com,布布扣
比如newFixedThreadPool和newSingleThreaExecutor就是选用了无限的LinkedBlockingQueue,允许等候任务数量无限制的增长,这在一定程度上不是安全的。一个稳妥的资源管理策略是使用有限队列,比如ArrayBlockingQueue或有限的LinkedBlockingQueue或者PriorityBlockingQueue,来防止任务过快增长,耗尽资源。但是使用有界队列又带来了新的问题,当到达界值的时候,仍然有新的任务源源不断的到来,该怎么办?这就是饱和策略处理的问题。还有一种同步移交队列,比如上述CacheThreadPool中使用的SynchronousQueue,是使用在一个特别庞大的或者无限的线程池之上,这相当于完全绕开了队列,将任务直接交给线程执行,这样做往往具有更好的效率。
使用LinkedBlockingQueue或者ArrayBlockingQueue这种FIFO队列会造成任务以它们到达的顺序执行,这是一种公平的执行策略。如果想控制任务执行的顺序,可以使用优先级阻塞队列PriorityBlockingQueue。
注意:只有当任务彼此独立的时候,才能使有限线程或有限队列的使用合理。倘若任务之间相互依赖,有限线程池或者有限队列会引起线程饥饿死锁。比如,线程池中的线程a执行一个依赖的任务1时,需要任务2的执行结果,而任务2因为没有执行线程正在等候队列中等待,这样正在执行的线程等待等候队列的任务,而等候队列中的任务又等待正在执行的线程结束。使用一个无限线程池可以避免这类问题。

二、饱和策略:任务饱和状态下,对后继任务的处理策略。

当一个有界队列充满后(线程池和等候队列都充满),饱和策略开始发挥作用。我们可以在通过ThreadPoolExecutor构建线程池的时候,将饱和策略传进去,也就是上面所说的参数handler,它是RejectedExecutionHandler类型的,JDK类库中提供了RejectedExecutionHandler接口的几种饱和策略的实现,这些实现类都是作为ThreadPoolExecutor的静态内部类存在的,有以下几种:
  • AbortPolicy:直接抛出RejectedExecutionExecption
  • CallerRunsPolicy:让调用者执行任务
  • DisCardPolicy:直接丢弃后继的任务
  • DiscardOldestPolicy:会丢弃最老的那个任务
默认的“abort”策略会抛出未检查的异常,供调用者捕获后编写自己的处理逻辑。“discard”策略会默默地直接丢弃到达的任务。“discardOldest”策略选择性的丢弃任务,丢弃最老(等候时间最长)的任务,也就是本该接下来执行的任务,然后尝试去重新提交任务。“caller-runs”策略既不会丢弃任务,也不会抛出异常,它会把任务推回到调用者那里,以减轻负担,它不会在线程池中执行最新提交的任务。
在一个无限队列中,我们也可以使用阻塞的方法控制任务向线程池提交的速率,比如,可以根据线程池大小指定一个信号量,来实现控制任务注入率。
补充:关于“caller-runs”策略,如在主线程中不断的向线程池提交任务,当达到饱和时,就会把新提交的任务推回到主线程中去执行,这样主线程就会去执行提交任务,而不在向线程池提交新任务,线程池也就有时间去处理等候任务,而且在一个WebService程序中,主线程在处理推回的提交任务的时候,是不会再接受新的web请求,这样新的请求就无法到达应用程序,在TCP层等候,由TCP层协议去决定后继任务的处理策略,这样一来就把负荷逐渐由应用线程池到主线程到TCP层外移,使得服务器在高负载的情况下可以平缓的劣化。

三、线程工厂的定制: 线程池创建新的线程是通过线程工厂去创建的,ThreadFactory接口只有一个唯一的方法:newThread,用于创建一个新的线程,默认的线程工厂会创建一个新的、非后台的线程。你可以通过实现ThreadFactory接口去实现自定义的线程工厂。


扩展ThreadPoolExecutor

决定ThreadPoolExecutor是可以扩展的,它提供了一些未实现的钩子方法让子类去实现,有beforeExecute、afterExecute、和terminate。执行任务的线程会调用这些方法,用它们去添加日志、时序、监视器或者统计信息的收集。就好像是通过AOP去实现一些切面逻辑一样,ThreadPoolExecutor让我们自己去实现已经提供的这些切面。
下面的这段代码显示了一个定制的线程池,它通过使用beforeExecute、afterExecute、terminated方法,加入了线程池执行任务过程中的日志和统计收集功能。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

public class TimingThreadPool extends ThreadPoolExecutor{
	
	private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
	private final Logger log = Logger.getLogger("TimingThreadPool");
	private final AtomicLong numTasks = new AtomicLong();
	private final AtomicLong totalTime = new AtomicLong();
	
	public TimingThreadPool(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
		// TODO Auto-generated constructor stub
	}
	
	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		// TODO Auto-generated method stub
		super.beforeExecute(t, r);
		String str = String.format("Thread %s: start %s", t, r);
//		System.out.println(str);
		log.fine(str);
		startTime.set(System.nanoTime());
	}
	
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		// TODO Auto-generated method stub
		try {
			long endTime = System.nanoTime();
			long taskTime = endTime - startTime.get();
			numTasks.incrementAndGet();
			totalTime.addAndGet(taskTime);
			String str = String.format("Thread %s : end %s, time=%dns", t, r, taskTime);
//			System.out.println(str);
			log.fine(str);
		} finally {
			super.afterExecute(r, t);
		}
	}
	
	@Override
	protected void terminated() {
		// TODO Auto-generated method stub
		try {
			String str = String.format("Terminated : avg time = %dns", totalTime.get() / numTasks.get());
//			System.out.println(str);
			log.info(str);
		}finally {
			super.terminated();
		}
	}
	
	public Logger getLog() {
		return log;
	}
	
}

这样线程池执行过程中的日志信息就会被加入到该线程池的Logger中。




应用线程池ThreadPoolExecutor

标签:线程池   threadpoolexecutor   

原文地址:http://blog.csdn.net/diaorenxiang/article/details/38960285

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!