标签:获取 exce 否则插入 deque 长度 repo stack 排序 handler
特点:
ThreadPoolExecutor。Executor:将任务提交和任务执行进行解耦。(用户只需提供Runnable对象,由Executor完成线程的调度和任务的执行)ExecutorService:增加了为一批异步任务生成Future的方法。提供线程管控的能力(停止线程池的运行)AbstractExecutorService:上层的抽象类,将执行任务的流程串联起来,下层只要执行一个方法。ThreadPoolExecutor:管理线程和任务。runState(运行状态)workerCount(线程数量)ctl对两个变量一起维护:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));。ctl的高3位保存runState,低29位保存workerCount。execute()方法完成。流程:
workerCount<corePoolSize,则创建并启动一个线程并执行新提交的任务。workerCount>=corePoolSize,且线程池的阻塞队列未满,则任务添加到阻塞队列中。workerCount>=corePoolSize并且worker<maximumPoolSize,则线程池的阻塞队列已满,创建并启动一个线程来执行新提交的任务。workerCount>=maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务。(默认抛出异常)任务调度流程:

阻塞队列(Blocking Queue):
阻塞队列:

常见的阻塞队列:
| 名称 | 描述 |
|---|---|
| ArrayBlockingQueue | 一个用数组实现的有界阻塞队列,按照FIFO对任务排序,支持公平锁和非公平锁 |
| LinkedBlockingQueue | 基于链表结构的阻塞队列,按照FIFO排序任务.可以设置容量,不设置时为无界阻塞队列,最大长度为Integer.MAX_VALUE,newFixedThreadPool使用该队列. |
| DelayQueue | 延迟获取的无界队列,指定多久才能从队列中获取当前元素,只有延时期后才能从队列中获取元素,newScheduledThreadPool线程池使用该队列. |
| PriorityBlockingQueue | 支持线程优先级的无界队列,默认自然序排列,可自定义实现compareTo()方法指定元素的排序规则,不保证同一优先级的先后顺序 |
| SynchronousQueue | 不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移出操作,否则插入操作处于阻塞状态。支持公平锁和非公平锁。newCachedThreadPool线程池使用该队列,新任务到来时创建线程,有空闲线程则重复使用,线程空闲60秒会被回收。 |
| LinkedTransferQueue | 链表结构组成的无界阻塞队列,额外有transfer()和tryTransfer()方法 |
| LinkedBlockingDeque | 链表结构组成的双向阻塞队列,队列的头尾均可添加和移出元素,可减低锁的竞争 |
两种任务执行方式:
tryTask()方法实现线程从阻塞队列中获取任务:

当线程池的任务缓存队列已满,并且线程池中的线程数达到最大值时,需要拒绝新提交的任务。
可用实现接口RejectExecutionHadler来定制拒绝策略,或者使用四种已有策略:

为了管理线程的状态并维护线程的生命周期,设计工作线程Worker。
final Thread thread; // 当前Worker持有的线程
Runnable firstTask; // 初始化线程的任务(可为null)
volatile long completedTasks; // 完成的任务数
Worker执行任务的模型:

注:
firstTask为null,则线程启动时执行任务队列中的任务。若不为null,则执行firstTask这个任务。线程的状态:
shutdown()方法或tryTerminate()方法时调用interruptIdleWorkers()方法中断空闲线程。从而调用tryLock()判断线程池中是否有空闲线程。若是空闲的,则可以回收。
addWorker()方法增加线程.firstTaks:新增线程执行的第一个任务(可为null)core:true:判断当前活动线程数是否少于corePoolSize;false:判断当前活动线程数是否小于maximumPoolSize.新增线程流程:

线程的销毁:

run()方法会调用runWorker()方法.getTask()方法从阻塞队列中获取任务.getTask()结果为null,则跳出循环,执行processWorkerExit()方法,销毁线程.执行任务流程:

可以通过ThreadPoolExecutor来创建。
构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
参数:
corePoolSize:线程池核心线程数最大值maximumPoolSize:线程池最大线程数大小keepAliveTime:线程池非核心线程空闲的存活时间大小unit:线程空闲存活时间单位workQueue:存放任务的阻塞队列threadFactory:用于设置创建线程的工厂。handler:线程池饱和策略事件。线程池执行流程(对应于execute()方法)
corePoolSie时,线程池创建一个核心线程处理提交的任务。corePoolSie),新提交的任务被放在任务队列workQueue排队等待执行。corePoolSize时,并且任务队列workQueue已满,则判断是否到达maximumPoolSize(即是否达到最大线程数)。若没有,则创建非核心线程执行提交的任务。maxmumPoolSize时,新的任务采用拒绝策略处理。
AbortPolicy:抛出异常(默认)DiscardPolicy:抛弃任务,无异常。DiscardOldsPolicy:抛弃队列中最老的任务,将当前任务提交给线程池。CallerRunsPolicy:交给线程池调用所在的线程处理。try catch捕获异常.Future对象的get()方法接收抛出的异常,再处理.UncaughtExceptionHandler,在UncaughtExceptionHandler方法中处理异常.示例:
// 1. 通过try...catch捕获异常
try {
System.out.println(3/0);
} catch (Exception e){
System.out.println(e.getMessage());
}
// 2. sumbit执行,Future.get接收异常
Future<?> future = threadPool.submit(() -> {
System.out.println(3 / 0); // 产生异常
});
try {
future.get(); // 接收异常
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// 3. 设置Thread.UncaughtExceptionHandler处理未检测的异常
ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((t1, e) -> {
System.out.println(t1.getName() + " 抛出异常 " + e);
});
return t;
});
// 4. 重写ThreadPoolExecutor.afterExecute方法,处理传递的异常引用
class ExtendExecutor extends ThreadPoolExecutor {
public ExtendExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>){
try {
Object result = ((Future<?>) r).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
if (t != null)
System.out.println(t);
}
}
newFixedThreadPool:固定数目线程的线程池newCachedThreadPool:可缓存线程的线程池newSingleThreadExecutor:单线程的线程池newScheduledThreadPool:定时及周期执行的线程池newFixedThreadPool特点:
keepAliveTime为0工作机制:
coreSize.若小于,则创建核心线程执行任务.异常:
使用无界队列可能导致内存飙升.
若一个线程获取任务后,任务的执行时间比较长,导致队列的任务堆积过多,内存不足后抛出OOM.
使用场景:
适用于CPU密集型任务,确保CPU被工作线程使用的情况下,尽可能少的分配线程,适用与执行长期任务.
newCachedThreadPool特点:
提交的任务速度大于处理任务的速度时,每次提交一个任务,就会创建一个线程.
空闲60秒的线程会被终止,长时间保持空闲的CachedThreadPool不会占用资源.
工作机制:
使用场景:
并发执行大量短期的小任务.
newSingleThreadExecutor特点:
工作机制:
使用场景:
串行执行任务的场景,一个任务接着一个任务地执行.
newScheduledThreadPool特点:
工作机制:
使用场景:
周期性执行任务的场景,需要限制线程数量的场景.
| 运行状态 | 描述 |
|---|---|
| RUNNING | 能够接受新提交的任务,也能处理阻塞队列中的任务 |
| SHUTDOWN | 关闭状态。不再接受新提交的任务,但可以继续处理阻塞队列中保存的任务 |
| STOP | 不能接受新任务,不处理队列中的任务,会中断正在处理任务的线程 |
| TIDYING | 所有任务都已终止,workCount为0 |
| TERMINATED | 在terminated()执行完后进入该状态 |
线程池生命周期:

参考:
标签:获取 exce 否则插入 deque 长度 repo stack 排序 handler
原文地址:https://www.cnblogs.com/truestoriesavici01/p/13217341.html