标签:time nano worker count ict RKE 优先级 正数 并且
ScheduledThreadPoolExecutor 是能够在给定的延时之后、或周期性执行被提交任务的线程池
/**
* 线程池关闭时是否需要继续执行周期性任务
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/**
* 线程池关闭时是否需要执行已经存在的延时任务
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/**
* 执行 ScheduledFutureTask.cancel 操作时是否需要将任务从任务队列中移除
*/
volatile boolean removeOnCancel;
/**
* Sequence number to break scheduling ties, and in turn to
* guarantee FIFO order among tied entries.
*/
private static final AtomicLong sequencer = new AtomicLong();
/**
* 默认的线程超时时间为 10 毫秒
*/
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** Sequence number to break ties FIFO */
private final long sequenceNumber;
/** 当前任务需要在指定的纳秒时间后执行 */
private volatile long time;
/**
* 重复任务的执行周期,以纳秒为单位
* 正数表示以 fixed-rate 模式执行
* 负数表示以 fixed-delay 模式执行
* 0 表示不需要重复执行
*/
private final long period;
/** 需要重新入队的周期性任务 */
RunnableScheduledFuture<V> outerTask = this;
/**
* 当前任务在延迟队列中的索引,以支持快速删除
*/
int heapIndex;
/**
* 创建一个在 triggerTime 执行的一次性任务
*/
ScheduledFutureTask(Runnable r, V result, long triggerTime,
long sequenceNumber) {
super(r, result);
this.time = triggerTime;
this.period = 0;
this.sequenceNumber = sequenceNumber;
}
/**
* 创建一个在 triggerTime 第一次执行,并以 period 为周期的周期性任务
*/
ScheduledFutureTask(Runnable r, V result, long triggerTime,
long period, long sequenceNumber) {
super(r, result);
this.time = triggerTime;
this.period = period;
this.sequenceNumber = sequenceNumber;
}
/**
* 创建一个在 triggerTime 执行的一次性任务
*/
ScheduledFutureTask(Callable<V> callable, long triggerTime,
long sequenceNumber) {
super(callable);
this.time = triggerTime;
this.period = 0;
this.sequenceNumber = sequenceNumber;
}
/**
* 读取当前任务的延时时间
* created by ZXD at 9 Dec 2018 T 20:40:27
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
/**
* 当前任务是否是周期性任务
*/
@Override
public boolean isPeriodic() {
return period != 0;
}
/**
* 计算周期性任务的下一次触发时间
*/
private void setNextRunTime() {
final long p = period;
if (p > 0) {
// 基于上次记录的时间进行延时,可能已经超时
time += p;
} else {
// 基于 System.nanoTime() 进行延时
time = triggerTime(-p);
}
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
@Override
public void run() {
// 1)当前任务是否能在线程池中执行
if (!canRunInCurrentRunState(this)) {
// 不能执行,则将其取消
cancel(false);
// 2)如果不是周期性任务
} else if (!isPeriodic()) {
// 则执行该任务
super.run();
// 3)如果是周期性任务,运行任务并且重置状态
} else if (super.runAndReset()) {
// 计算周期性任务的下一次触发时间
setNextRunTime();
// 重新将任务加入到延时队列中
reExecutePeriodic(outerTask);
}
}
}
/**
* 基于二叉堆实现的延迟优先级队列,队列元素只能是 RunnableScheduledFuture 实例。
*/
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 保持元素的数组
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 互斥锁
private final ReentrantLock lock = new ReentrantLock();
// 元素总数
private int size;
/**
* 在队列头部阻塞等待任务的线程
*/
private Thread leader;
/**
* 是否有任务可用
*/
private final Condition available = lock.newCondition();
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 当前任务是否能在线程池中运行
if (canRunInCurrentRunState(task)) {
// 将任务加入到延时队列中
super.getQueue().add(task);
// 又进行一次判断,如果不能运行目标任务则尝试从延时队列中删除它
if (canRunInCurrentRunState(task) || !remove(task)) {
ensurePrestart();
return;
}
}
// 任务不能运行,则将其取消
task.cancel(false);
}
/**
* 在指定的延时后执行目标任务
*/
@Override
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null) {
throw new NullPointerException();
}
// 创建一个单次延时任务
final RunnableScheduledFuture<Void> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
// 加入延时队列中执行
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 线程池处于 SHUTDOWN 及以上状态
if (isShutdown()) {
// 拒绝执行任务
reject(task);
} else {
// 将任务加入延时优先级队列
super.getQueue().add(task);
/**
* 当前任务不能执行 && 将其从延时队列中移除成功
*/
if (!canRunInCurrentRunState(task) && remove(task)) {
// 则取消该任务
task.cancel(false);
} else {
/**
* 尝试启动一个工作者线程来处理延时任务
* 1)当前工作者线程 < 核心线程数
* 2)当前工作者线程 == 0
*/
ensurePrestart();
}
}
}
/**
* 是否能在当前线程池状态下运行
*/
boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
// 线程池处于 RUNNING 状态,则允许运行
if (!isShutdown()) {
return true;
}
// 线程池已经停止,则不允许运行
if (isStopped()) {
return false;
}
/**
* 线程池处于 SHUTDOWN 状态正在停止
* 1)当前任务是周期任务,continueExistingPeriodicTasksAfterShutdown 默认为 false
* 2)当前任务是一次性任务,executeExistingDelayedTasksAfterShutdown 默认为 false
* 如果任务已经超时,则执行它。
*/
return task.isPeriodic()
? continueExistingPeriodicTasksAfterShutdown
: executeExistingDelayedTasksAfterShutdown
|| task.getDelay(NANOSECONDS) <= 0;
}
/**
* 尝试启动一个工作者线程来处理延时任务
*/
void ensurePrestart() {
final int wc = ThreadPoolExecutor.workerCountOf(ctl.get());
if (wc < corePoolSize) {
addWorker(null, true);
} else if (wc == 0) {
addWorker(null, false);
}
}
/**
* 在以 unit 为单位的 initialDelay 延时后执行第一次任务,并在
* initialDelay + period,initialDelay + 2 * period 等时间点周期性执行
*/
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 任务和时间单位不能为 null
if (command == null || unit == null) {
throw new NullPointerException();
}
// 执行周期必须 > 0
if (period <= 0L) {
throw new IllegalArgumentException();
}
final ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period),
sequencer.getAndIncrement());
final RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
/**
* 在以 unit 为单位的 initialDelay 延时后执行第一次任务,并在当次任务执行完成之后
* 在 delay 延时之后再次执行。
*/
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null) {
throw new NullPointerException();
}
if (delay <= 0L) {
throw new IllegalArgumentException();
}
final ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<>(command,
null,
triggerTime(initialDelay, unit),
-unit.toNanos(delay),
sequencer.getAndIncrement());
final RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
ScheduledThreadPoolExecutor 源码分析
标签:time nano worker count ict RKE 优先级 正数 并且
原文地址:https://www.cnblogs.com/zhuxudong/p/10093436.html