标签:policy 回收 方式 目录结构 code card exception 无法 ons
引言
一、线程池工作流程图
二、线程池的运行原理
三、线程池的7个参数
四、常用4个阻塞队列
五、四个拒绝策略语义以及测试用例
六、Executors工具类
6.1. Executors提供的三种线程池
6.2 实际开发中应该怎样设定合适线程池?
七、线程池提交任务的2种
八、总结
? 我们为什么要使用线程池,它可以给我们带来什么好处?要想合理使用线程池,我们需要对线程池的工作原理有深入的理解和认识,让我们一起来看看吧。
好处:
? 1、处理响应快,不用每次任务到达,都需要等待初始化创建线程再执行,到了就能拿来用。
? 2、资源复用,减少系统资源消耗,减低创建和销毁线程的销毁。
? 3、可方便对线程池中线程统一分配,调优,监控,提高系统稳定性。
误区: 有没有人之前和我一样,以为当线程池中线程达到最大线程数后,才将任务加入阻塞队列?
先讲个小故事,一个银行网点的服务过程:
如某银行网点,总共有窗口5个,提供固定座椅3个供客人休息, 在非工作日窗口并不是全都开放,而是安排轮值窗口,比如开放2个窗口给客户办理业务。当客户1,2 进网点办理业务,可直接去窗口办理,后又来了3位客户,这三位客户只能取号在座椅等待, 这时如果再来3位客户,这时座椅不够坐了,大堂经理为了尽快给客户办理,只好增派人手,开放其他3个窗口;
这时5个窗口全部开放为客户办理业务,座椅还有3位客户排号等待;这时正值客流高峰期,如果再来客户办理业务,网点接待不过来,为了不让客户等待太长时间,这时可以对再来客户劝说选择其他时间过来,或者去其他就近网点办理。当客户高峰过去,客户逐渐稀少,这时临时增派人手的窗口工作人员就可以关闭窗口,只保留轮值2个窗口继续提供服务。
类比银行的服务过程,线程池的执行原理与之相似:
线程池中一开始没有线程,在有新任务加入进来,才创建核心线程处理任务,(针对某些业务需求,可以线程池预热执行prestartAllCoreThreads()方法,可以在线程池初始化后就创建好所有的核心线程)。当多个任务进来,线程池中的线程来不及处理完手上任务,就创建新的线程去处理,当线程数达到核心线程数( corePoolSize),就不再创建新的线程了,再有多的任务添加进来,加入阻塞队列等待;这里核心线程就如银行网点的轮值窗口,阻塞队列就如网点中的座椅, 但是网点中座椅是有限的,而线程池中的阻塞队列有可能接近无限,下文会详细讲述几种队列,这里假定线程池中队列也是有限的,在新加入的任务在阻塞队列中已经装不下的时候,这时就得加派人手,如果线程池中还没有达到最大线程数,创建新的线程来处理任务,如果线程池已经达到最大线程数,如网点办理窗口都开放了,等候区的椅子也坐满了客户,这时就得执行拒绝策略,不再接收新的任务;实际的拒绝策略方式更灵活,这里如此便于理解,下文再深入探讨。当线程处理完阻塞队列中任务,新加入的任务减少,或者没有任务添加,线程池中的非核心线程在空闲一定时间(keepAliveTime)后就被回收,可以节约资源。核心线程不会被回收,等待处理新加入的任务。
类比关系:
线程池 --> 银行网点
线程 --> 办理业务的窗口
任务 --> 客户
阻塞队列 --> 等候区的座椅
核心线程数 --> 轮值的窗口
最大线程数 --> 网点可以开放的所有窗口
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1) ArrayBlockingQueue 底层数组
2) LinkedBlockingQueue 底层链表
3) SynchronousQueue 不存储元素的队列, 没有容量的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作,较难理解,见下文示例分析。
4)PriorityBlockingQueue: 优先级排序队列,优先级高的任务先被执行,可能会导致优先级低的始终执行不到,导致饥饿现象。
注:在Executos工具类提供的三种线程池中, FixedThreadPool,SingleThreadExecutor都使用的LinkedBlockingQueue 链表结构的队列, CachedThreadPool使用的SynchronousQueue没有容量的队列。
1、AbortPolicy: 直接抛出异常 (默认方式)
2、CallerRunsPolicy: 抛给调用者去执行任务,如谁创建了线程池提交任务进来,那就找谁去执行,如主线程
3、DiscardOldestPolicy: 丢弃在队列中时间最长的,即当前排在队列首位的(先进来的任务),开发中是有适用业务场景,如等待最久的任务已经不具有再执行的意义了,如时效性比较强的业务。或者业务可允许一些任务。
4、DiscardPolicy: 新加入的任务直接丢弃,也不抛异常,直接不处理。
示例如下:
提交9个任务,超出线程池可最大容纳量8个
package ThreadPoolExcutor;
import java.util.concurrent.*;
/**
* @author zdd
* 2019/12/1 11:16 上午
* Description: 测试线程池4中拒绝策略
*/
public class ExcutorTest1 {
public static void main(String[] args) {
// System.out.println("cpu number:"+ Runtime.getRuntime().availableProcessors());
//实际开发中 自己创建线程池
// 核心线程数2,最大线程数5,阻塞队列容量3,即最大可容纳8个任务,再多就要执行拒绝策略。
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
//提交9个任务,超出线程池可最大容纳量8个
for (int i = 0; i < 9; i++) {
final int index =i+1;
//此时任务实际还未被提交,打印只是为了方便可见。
System.out.println("任务"+index +"被提交");
executorService.execute(()-> {
try {
//休眠1s,模拟处理任务
TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ " 执行任务" +index);
}) ;
}
executorService.shutdown();
}
}
执行结果:直接抛出异常
new ThreadPoolExecutor.CallerRunsPolicy() //线程池采用该策略
执行结果:可见任务9被调用者主线程执行
new ThreadPoolExecutor.DiscardOldestPolicy())
执行过程: 任务1,2提交直接创建核心线程执行,任务3,4,5依次被放入阻塞队列中,任务6,7,8再提交创建非核心线程执行,此时任务9提交进来,执行拒绝策略,将阻塞队列中排在首位的任务3丢弃,放入任务9。
执行结果: 可见任务3被丢弃了,未执行。
new ThreadPoolExecutor.DiscardPolicy() //修改此处策略
执行结果: 可见有9个任务被提交,实际就8个任务被执行,任务9直接被丢弃
Executors, Executor,ExecutorService, ThreadPoolExecutor 之间的关系?
如下类图所示:
Executors是一个工具类,就如集合中Collections 一样,可以提供一些辅助的方法便于我们日常开发,如帮助创建线程池等。
在线程池中核心的类是上图颜色标识的ThreadPoolExecutor和 SchduledThreadPoolExecutor 两个类
ThreadPoolExecutor:创建线程池核心类,可以根据业务自定义符合需求的线程池。
SchduledThreadPoolExecutor:用于操作一些需要定时执行的任务,或者需要周期性执行的任务,如Timer类的功能,但是比Timer类更强大,因为Timer运行的多个TimeTask 中,只要其中之一没有被捕获处理异常,其他所有的都将被停止运行,SchduledThreadPoolExecutor没有这个问题。
Exectutos为我们提供了FixedThreadPool, SingleThreadExecutor, CachedThreadPool 三种线程池,
实际工作中如何使用线程池、用jdk工具类Excutors提供的三类,还是自己写,为什么?
//1,固定数量线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//2,可见队列默认大小非常大
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
解析:
Integer.MAX_VALUE = 2^31 -1,大概21亿,近似无界队列
1)核心线程数== 最大线程数
2)阻塞队列近似无界
3)由于1,2,空闲线程的最大生存时间(keepAliveTime)也是无效的,不会创建其他非核心线程
存在问题:网上有推荐使用该种方式创建线程池,因为有一个无界的阻塞队列,在生产环境出现业务突刺(访问高峰,任务突然暴增等),不会出现任务丢失;可一旦出现该种情况,阻塞队列就算无界,服务器资源,如内存等也是有限的,也无法处理如此多的任务,有OOM(内存溢出)的风险,也不是推荐的方法。
**/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
解析:
1)核心线程 =最大线程数=1,线程池中仅有1个线程
2)采用无界阻塞队列
1,2,可以实现所有的任务被唯一的线程有序地处理执行。
**/
//线程最大线程数近似无界
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
解析:
1)核心线程数 ==0
2)最大线程数无界
3)采用没有容量的阻塞队列
4)空闲线程可存活60s,超过60s无新任务进来就会被回收。
5)如果主线程提交任务的速度大于线程处理任务的速度时,会不断创建新的线程,因最大线程数时无界的,极端情况有可能耗尽cup和内存资源。
6)SynchronousQueue 队列既然没有容量,是怎样是机制实现添加任务和线程获取任务去执行的呢?
那要实现添加和获取任务的配对:即 offer()和 poll() 方法的配对
从添加角度看:主线程添加任务到线程池中(调用SynchronousQueue.offer(task)),当前没有空闲线程可用,则创建新线程处理,有空闲线程给它执行。
从获取角度看:线程池中线程处理完手上任务后,去阻塞队列获取新的任务(调用SynchronousQueue.poll()方法),没有任务空闲的线程在SynchronousQueue中等待最多60s,即空闲线程去队列中等待任务提交,在这期间主线程没有新任务提交,线程就会被回收,如有新任务提交则处理执行。免于被回收的厄运; 当线程池中较长时间没有新任务添加,整个线程池都空闲时,线程都会被回收,此时没有线程存在,可节约资源。
**/
在分析了Executors工具类提供的创建三种线程池, 虽然简单易用,但在实际开发中我们却不建议使用,因此我们需要根据公司业务来自己创建线程池。在阿里巴巴的Java开发手册中也强制不让使用Executors去创建线程池,都有OOM的风险。如:
cpu 密集型任务:尽量创建少一些线程 , cpu个数+1
IO 密集型任务: 线程数可以多一些,cup个数*2
//可获取当前设备的cpu个数
Runtime.getRuntime().availableProcessors()
void execute(Runnable command)
//分割线 ---
Future<?> submit(Runnable task)
Future<T> submit(Callable<T> task)
示例: 使用线程池的submit提交异步任务,主线程调用 FutureTask的get() 方法,在异步任务未执行完毕前,主线程阻塞等待,异步任务执行结束,获取到返回结果。
适用场景 :当一个线程需要开启另一个线程去执行异步任务,而需要异步任务的返回结果,存在数据依赖关系,在实际开发中,可将一次任务拆分为多个子任务,开启多个线程去并发执行,最后异步获取结果,能有效提高程序执行效率。
代码如下:
package ThreadPoolExcutor;
import java.util.concurrent.*;
/**
* @author zdd
* 2019/12/5 7:22 下午
* Description:测试 Callable与FutureTask的简单实用,执行异步任务
*/
public class ThreadPoolSubmitTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1,创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);
Callable callableTask = new Callable() {
@Override
public Object call() throws Exception {
try {
//1,一个异步任务,模拟执行一个比较耗时的业务。休眠3s
TimeUnit.SECONDS.sleep(3);
System.out.println("休眠3s结束! ");
} catch (InterruptedException e) {
e.printStackTrace();
}
//2,返回执行结果
return "ok!";
}};
FutureTask<String> futureTask = new FutureTask(callableTask);
threadPool.submit(futureTask);
// 2,主线程想要获取 异步任务的执行结果
System.out.println(futureTask.get());
//3,关闭线程池
threadPool.shutdown();
}
}
执行结果:主线程阻塞等待直至获取到结果
休眠3s结束!
ok!
开发中推荐使用线程池创建线程, 可减少线程的创建和销毁的时间和系统资源的开销,合理使用线程池,可节约资源,减少每次都创建线程的开销,可实现线程的重复使用。本文从线程池的内部工作原理开始介绍、以及Jdk为我们默认提供的三类线程池的特点和缺陷、线程池中常用的3种阻塞队列、以及4种拒绝策略、开发中我们推荐自定义线程池、线程池2种提交任务的方式等;在了解线程池后,开发中我们能够避免踩坑,也能有效让它为我们所用,提升开发效率,节约资源。
参考资料:
1、Java 并发编程的艺术 - 方腾飞
2、Java 开发手册
标签:policy 回收 方式 目录结构 code card exception 无法 ons
原文地址:https://www.cnblogs.com/flydashpig/p/12003704.html