标签:
从JDK5开始,工作单元和执行机制隔离开来,工作单元包括Runnable和Callable,执行机制由Executor提供。
调用关系:Java线程一对一映射到本地操作系统的系统线程,当多线程程序分解若干任务,使用用户级的调度器(Executor框架)将任务映射为固定数量的线程,底层,操作系统吧、内核将这些线程映射到硬件处理器上。
Executor是一个接口,它将任务的提交与任务的执行分离开来。
ThreadPoolExecutor是线程池的核心实现类,执行被提交的任务
ScheduledThreadPoolExecutor是一个实现类,在给定的延迟后运行或定期执行命令
Future接口和实现Future接口的FutureTask类,代表异步计算的结果
Runable接口和Callable接口的实现类,可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行
(1)创建任务对象
创建实现Runnable接口或Callable接口的任务对象。Runnable不会返回结果,Callable可以返回结果
可以使用工厂类Executors把Runnable封装成一个Callable
public static Callable<Object> callable(Runnable task)
Executors.callable(Runnable task)或Executors.callable(Runnable task,Object result)
(2)对象提交执行
把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command); ExecutorService.submit(Runnable task);ExecutorService.submit(Callable<T> task))。execute()方法用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功。submit()用于提交需要返回值的任务。
(3)返回值
如果执行ExecutorService.submit(...),将返回一个实现Future接口的对象FutureTask对象,主线程可以执行FutureTask.get()方法等待任务执行完成,也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消任务的执行。
ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口、Executors。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
corePoolSize:线程池的基本大小,新任务到来时会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,当需要执行的任务数大于线程池基本大小则不创建。如果调用线程池的prestartAllCoreThread()方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize:线程池最大数量,线程池允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数则创建新线程,对于无界队列该参数无效。
KeepAliveTime:线程活动保持时间,工作线程空闲后保持存活的时间,如果任务多每个任务执行时间短可以调大时间,提高线程利用率
TimeUnit:线程活动保持时间单位,可选单位,DAYS,HOURS,MINUTES,MILLISECONDS,MICROSECONDS,NANOSECONDS
BlockingQueue<Runnable>:任务队列,等待执行任务的阻塞队列,ArrayBlockingQueue:基于数组结构的有界阻塞队列,按照FIFO排序,LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序,吞吐量高于前者,Executors.newFixedThreadPool()使用该队列。SysnchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量高于前者,Executors.newCachedThreadPool()使用该队列,PriorityBlockingQueue:一个具有优先级的无线阻塞队列。
ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个线程设置有意义的命名,可默认设置。
RejectedExecutionHandler:饱和策略,当队列和线程池满了,必须采取一种策略处理新提交的任务。默认AbortPolicy:直接抛出异常。CallerRunsPolicy:只用调用者所在线程来运行任务;DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务;DiscardPolicy:不处理,丢弃掉。
1)线程池判断核心线程池里的线程是否都在执行任务。否,创建一个新的线程执行任务,如果都在执行任务进入下一步;
2)线程池判断工作队列是否已经满了,没有则新提交的任务存储在工作队列里,满了进入下一步;
3)线程池判断线程池的线程是否处于工作状态。没有,创建新的工作线程来执行任务,满了交给饱和策略处理。
ThreadPoolExecutor执行execute()方法流程:
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(该步需要获取全局锁)
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue;
3)如果队列已满无法将任务加入队列,则创建新的线程来处理任务(该步骤获取全局锁)
4)如果创建新线程将使得当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandle.rejectedExecution()方法。
FixedThreadPool,创建使用固定线程数,为了满足资源管理的需求,而需要限制当前线程数量的应用场景,适用于负载比较重的服务器
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
SingleThreadPool,创建使用单个线程,适用于保证顺序执行各个任务,并且在任意时间点都不会有多个线程活动的场景,可用于处理共享资源问题
public static ExecutorService newSingleThreadExecutor(){
return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
CachedThreadPool,创建会根据需要创建新线程,初始化一定数量的线程,当新任务到来时没有空闲线程则创建新线程,有空闲重用以前的线程
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0,Integer.Max_VALUE,60L,TimeUnit.SECONDS,new SysnchronousQueue<Runnable>());
}超过60秒空闲线程将会终止
ScheduledThreadPoolExecutor继承ThreadPoolExecutor,用于在给定的延迟以后执行任务或者定期执行任务,比Timer灵活,Timer对应单个后台线程而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。
ScheduledExecutorService中至少有2个方法可用于周期性执行任务。
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
我们可以使用该方法延迟执行任务,设置任务的执行周期。时间周期从线程池中首先开始执行的线程算起,所以假设period为1s,线程执行了5s,那么下一个线程在第一个线程运行完后会很快被执行。
比如下面的代码
for (int i = 0; i < 3; i++) { Thread.sleep(1000); WorkerThread worker = new WorkerThread("do heavy processing"); // schedule task to execute at fixed rate scheduledThreadPool.scheduleAtFixedRate(worker, 0, 10, TimeUnit.SECONDS);
Current Time = Tue Oct 29 16:10:00 IST 2013 pool-1-thread-1 Start. Time = Tue Oct 29 16:10:01 IST 2013 pool-1-thread-2 Start. Time = Tue Oct 29 16:10:02 IST 2013 pool-1-thread-3 Start. Time = Tue Oct 29 16:10:03 IST 2013 pool-1-thread-1 End. Time = Tue Oct 29 16:10:06 IST 2013 pool-1-thread-2 End. Time = Tue Oct 29 16:10:07 IST 2013 pool-1-thread-3 End. Time = Tue Oct 29 16:10:08 IST 2013 pool-1-thread-1 Start. Time = Tue Oct 29 16:10:11 IST 2013 pool-1-thread-4 Start. Time = Tue Oct 29 16:10:12 IST 2013
for (int i = 0; i < 3; i++) { Thread.sleep(1000); WorkerThread worker = new WorkerThread("do heavy processing"); scheduledThreadPool.scheduleWithFixedDelay(worker, 0, 1, TimeUnit.SECONDS); }
Current Time = Tue Oct 29 16:14:13 IST 2013 pool-1-thread-1 Start. Time = Tue Oct 29 16:14:14 IST 2013 pool-1-thread-2 Start. Time = Tue Oct 29 16:14:15 IST 2013 pool-1-thread-3 Start. Time = Tue Oct 29 16:14:16 IST 2013 pool-1-thread-1 End. Time = Tue Oct 29 16:14:19 IST 2013 pool-1-thread-2 End. Time = Tue Oct 29 16:14:20 IST 2013 pool-1-thread-1 Start. Time = Tue Oct 29 16:14:20 IST 2013 pool-1-thread-3 End. Time = Tue Oct 29 16:14:21 IST 2013 pool-1-thread-4 Start. Time = Tue Oct 29 16:14:21 IST 2013
标签:
原文地址:http://www.cnblogs.com/yanko/p/5414804.html