标签:get call 操作 异常 ued 最好 important abr sync
public interface Executor {
void execute(Runnable command);
}
执行提交的Runnable任务。其中的execute方法在将来的某个时候执行给定的任务,该任务可以在新线程、池化线程或调用线程中执行,具体由Executor的实现者决定。
ExecutorService继承自Executor,下面挑几个方法介绍:
void shutdown();
启动有序关闭线程池,在此过程中执行先前提交的任务,但不接受任何新任务。如果线程池已经关闭,调用此方法不会产生额外的效果。此方法不等待以前提交的任务完成执行,可以使用awaitTermination去实现。
List<Runnable> shutdownNow();
尝试停止所有正在积极执行的任务, 停止处理等待的任务,并返回等待执行的任务列表。 此方法不等待以前提交的任务完成执行,可以使用awaitTermination去实现。除了尽最大努力停止处理积极执行的任务外,没有任何保证。例如,典型的实现是:通过Thread#interrupt取消任务执行,但是任何未能响应中断的任务都可能永远不会终止。
boolean isShutdown();
返回线程池关闭状态。
boolean isTerminated();
如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用了shutdown或shutdownNow,否则isTerminated永远不会返回true。
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
线程阻塞阻塞,直到所有任务都在shutdown请求之后执行完毕,或者超时发生,或者当前线程被中断(以先发生的情况为准)。
<T> Future<T> submit(Callable<T> task);
提交一个value-returning任务以执行,并返回一个表示该任务未决结果的Future。 Future的 get方法将在成功完成任务后返回任务的结果。
安排命令在给定的延迟之后运行,或者定期执行,继承自ExecutorService接口由以下四个方法组成:
//在给定延迟之后启动任务,返回ScheduledFuture
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//创建并执行一个周期性操作,该操作在给定的初始延迟之后首次启动,然后在给定的周期内执行;
//如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只会通过执行器的取消或终止而终止。
//如果此任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//创建并执行一个周期性操作,该操作在给定的初始延迟之后首次启动,然后在一次执行的终止和下一次执行的开始之间使用给定的延迟。
//如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只会通过执行器的取消或终止而终止。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
public interface ThreadFactory {
Thread newThread(Runnable r);
}
按需创建新线程的对象。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
返回任务结果也可能抛出异常。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
Future表示异步计算的结果。方法用于检查计算是否完成,等待计算完成并检索计算结果。只有当计算完成时,才可以使用方法get检索结果,如果需要,可以阻塞,直到准备好为止。取消由cancel方法执行。还提供了其他方法来确定任务是否正常完成或被取消。一旦计算完成,就不能取消计算。
public interface Delayed extends Comparable<Delayed> {
//在给定的时间单位中返回与此对象关联的剩余延迟
long getDelay(TimeUnit unit);
}
一种混合风格的接口,用于标记在给定延迟之后应该执行的对象。
public interface ScheduledFuture<V> extends Delayed, Future<V> {}
新任务进来时:
构造方法:
public ThreadPoolExecutor(
int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数说明:
有4个ThreeadPoolExecutor内部类。
最好自定义饱和策略,实现RejectedExecutionHandler接口,如:记录日志或持久化存储不能处理的任务。
线程池的内部状态由AtomicInteger修饰的ctl表示,其高3位表示线程池的运行状态,低29位表示线程池中的线程数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
主池控制状态ctl是一个原子整数,包含两个概念字段:
为了将这两个字段打包成一个整型,所以将workerCount限制为(2^29)-1个线程,而不是(2^31)-1个线程。
workerCount是工作线程数量。该值可能与实际活动线程的数量存在暂时性差异,例如,当ThreadFactory在被请求时无法创建线程,以及退出的线程在终止前仍在执行bookkeeping时。 用户可见的池大小报告为工作线程集的当前大小。
runState提供了生命周期,具有以下值:
为了允许有序比较,这些值之间的数值顺序很重要。运行状态会随着时间单调地增加,但不需要达到每个状态。转换:
当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。
下面看以下其他状态信息:
//Integer.SIZE为32,COUNT_BITS为29
private static final int COUNT_BITS = Integer.SIZE - 3;
//2^29-1 最大线程数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
* 111 0 0000 0000 0000 0000 0000 0000 0000
* -1 原码:0000 ... 0001 反码:1111 ... 1110 补码:1111 ... 1111
* 左移操作:后面补 0
* 111 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
* 000 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在* 运行的任务;
* 001 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 即高3位为010,所有任务都已终止,workerCount为零,过渡到状态TIDYING的线程将运行terminated()钩子方法;
* 010 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 即高3位为011,terminated()方法执行完毕;
* 011 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
//根据ctl计算runState
private static int runStateOf(int c) {
//2^29 = 001 0 0000 0000 0000 0000 0000 0000 0000
//2^29-1 = 000 1 1111 1111 1111 1111 1111 1111 1111
//~(2^29-1)=111 0 0000 0000 0000 0000 0000 0000 0000
//假设c为 STOP 001 0 0000 0000 0000 0000 0000 0000 0000
// 最终值: 001 0 0000 0000 0000 0000 0000 0000 0000
return c & ~CAPACITY;
}
//根据ctl计算 workerCount
private static int workerCountOf(int c) {
//2^29-1 = 000 1 1111 1111 1111 1111 1111 1111 1111
//假设c = 000 0 0000 0000 0000 0000 0000 0000 0001 1个线程
//最终值: 000 0 0000 0000 0000 0000 0000 0000 0001 1
return c & CAPACITY;
}
// 根据runState和workerCount计算ctl
private static int ctlOf(int rs, int wc) {
//假设 rs: STOP 001 0 0000 0000 0000 0000 0000 0000 0000
//假设 wc: 000 0 0000 0000 0000 0000 0000 0000 0001 1个线程
//最终值: 001 0 0000 0000 0000 0000 0000 0000 0001
return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
//RUNNING状态为负数,肯定小于SHUTDOWN,返回线程池是否为运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//试图增加ctl的workerCount字段值。
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//尝试减少ctl的workerCount字段值。
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
//递减ctl的workerCount字段。这只在线程突然终止时调用(请参阅processWorkerExit)。在getTask中执行其他递减。
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
Doug Lea大神的设计啊,感觉计算机的基础真的是数学。
Worker继承了AbstractQueuedSynchronizer,并且实现了Runnable接口。
维护了以下三个变量,其中completedTasks由volatile修饰。
//线程这个工作程序正在运行。如果工厂失败,则为空。
final Thread thread;
//要运行的初始任务。可能是null。
Runnable firstTask;
//线程任务计数器
volatile long completedTasks;
构造方法:
//使用ThreadFactory中给定的第一个任务和线程创建。
Worker(Runnable firstTask) {
//禁止中断,直到运行工作程序
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
既然实现了Runnable接口,必然实现run方法:
//Delegates main run loop to outer runWorker
public void run() {
//核心
runWorker(this);
}
先看一眼执行流程图,再看源码,会更清晰一点:
首先来看runWorker(Worker w)源码:
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取第一个任务
Runnable task = w.firstTask;
//第一个任务位置置空
w.firstTask = null;
//因为Worker实现了AQS,此处是释放锁,new Worker()是state==-1,此处是调用Worker类的 release(1)方法,将state置为0。Worker中interruptIfStarted()中只有state>=0才允许调用中断
w.unlock();
//是否突然完成,如果是由于异常导致的进入finally,那么completedAbruptly==true就是突然完成的
boolean completedAbruptly = true;
try {
//先处理firstTask,之后依次处理其他任务
while (task != null || (task = getTask()) != null) {
//获取锁
w.lock();
//如果池停止,确保线程被中断;如果没有,请确保线程没有中断。这需要在第二种情况下重新检查,以处理清除中断时的shutdownNow竞争
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
//自定义实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
//自定义实现
afterExecute(task, thrown);
}
} finally {
task = null;
//任务完成数+1
w.completedTasks++;
//释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//Worker的结束后的处理工作
processWorkerExit(w, completedAbruptly);
}
}
下面再来看上述源码中的getTask()与processWorkerExit(w, completedAbruptly)方法:
根据当前配置设置执行阻塞或定时等待任务,或者如果该worker因为任何原因必须退出,则返回null,在这种情况下workerCount将递减。
返回空的情况:
private Runnable getTask() {
// Did the last poll() time out?
boolean timedOut = false;
for (; ; ) {
//获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
//仅在必要时检查队列是否为空。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//递减ctl的workerCount字段
decrementWorkerCount();
return null;
}
//获取workerCount数量
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程超时控制
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
//尝试减少ctl的workerCount字段
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果有超时控制,则使用带超时时间的poll,否则使用take,没有任务的时候一直阻塞,这两个方法都会抛出InterruptedException
Runnable r = timed ?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :workQueue.take();
//有任务就返回
if (r != null)
return r;
//获取任务超时,肯定是走了poll逻辑
timedOut = true;
} catch (InterruptedException retry) {
//被中断
timedOut = false;
}
}
}
为垂死的worker进行清理和bookkeeping。仅从工作线程调用。除非completedAbruptly被设置,否则假定workerCount已经被调整以考虑退出。此方法从工作集中移除线程,如果线程池由于用户任务异常而退出,或者运行的工作池小于corePoolSize,或者队列非空但没有工作池, 则可能终止线程池或替换工作池。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// If abrupt, then workerCount wasn't adjusted
// true:用户线程运行异常,需要扣减
// false:getTask方法中扣减线程数量
if (completedAbruptly)
//递减ctl的workerCount字段。
decrementWorkerCount();
//获取主锁,锁定
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//更新完成任务计数器
completedTaskCount += w.completedTasks;
//移除worker
workers.remove(w);
} finally {
//解锁
mainLock.unlock();
}
// 有worker线程移除,可能是最后一个线程退出需要尝试终止线程池
tryTerminate();
int c = ctl.get();
// 如果线程为running或shutdown状态,即tryTerminate()没有成功终止线程池,则判断是否有必要一个worker
if (runStateLessThan(c, STOP)) {
// 正常退出,计算min:需要维护的最小线程数量
if (!completedAbruptly) {
// allowCoreThreadTimeOut 默认false:是否需要维持核心线程的数量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min ==0 或者workerQueue为空,min = 1
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 如果线程数量大于最少数量min,直接返回,不需要新增线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 添加一个没有firstTask的worker
addWorker(null, false);
}
}
提交有两种:
任务执行流程图:
三步处理:
public void execute(Runnable command) {
//任务为空,抛出异常
if (command == null)
throw new NullPointerException();
//获取线程控制字段的值
int c = ctl.get();
//如果当前工作线程数量少于corePoolSize(核心线程数)
if (workerCountOf(c) < corePoolSize) {
//创建新的线程并执行任务,如果成功就返回
if (addWorker(command, true))
return;
//上一步失败,重新获取ctl
c = ctl.get();
}
//如果线城池正在运行,且入队成功
if (isRunning(c) && workQueue.offer(command)) {
//重新获取ctl
int recheck = ctl.get();
//如果线程没有运行且删除任务成功
if (!isRunning(recheck) && remove(command))
//拒绝任务
reject(command);
//如果当前的工作线程数量为0,只要还有活动的worker线程,就可以消费workerQueue中的任务
else if (workerCountOf(recheck) == 0)
//第一个参数为null,说明只为新建一个worker线程,没有指定firstTask
addWorker(null, false);
} else if (!addWorker(command, false))
//如果线程池不是running状态 或者 无法入队列,尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command
reject(command);
}
下面详细看一下上述代码中出现的方法:addWorker(Runnable firstTask, boolean core)。
检查是否可以根据当前池状态和给定的界限(核心或最大值)添加新worker,如果是这样,worker计数将相应地进行调整,如果可能,将创建并启动一个新worker, 并将运行firstTask作为其第一个任务。 如果池已停止或有资格关闭,则此方法返回false。如果线程工厂在被请求时没有创建线程,则返回false。如果线程创建失败,要么是由于线程工厂返回null,要么是由于异常 (通常是Thread.start()中的OutOfMemoryError)),我们将回滚。
private boolean addWorker(Runnable firstTask, boolean core) {
//好久没见过这种写法了
retry:
//线程池状态与工作线程数量处理,worker数量+1
for (; ; ) {
//获取当前线程池状态与线程数
int c = ctl.get();
//获取当前线程池状态
int rs = runStateOf(c);
// 仅在必要时检查队列是否为空。如果池子处于SHUTDOWN,STOP,TIDYING,TERMINATED的时候 不处理提交的任务,判断线程池是否可以添加worker线程
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
//线程池处于工作状态
for (; ; ) {
//获取工作线程数量
int wc = workerCountOf(c);
//如果线程数量超过最大值或者超过corePoolSize或者超过maximumPoolSize 拒绝执行任务
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//试图增加ctl的workerCount字段
if (compareAndIncrementWorkerCount(c))
//中断外层循环
break retry;
// Re-read ctl
c = ctl.get();
//如果当前线程池状态已经改变
if (runStateOf(c) != rs)
//继续外层循环
continue retry;
//否则CAS因workerCount更改而失败;重试内循环
}
}
//添加到worker线程集合,并启动线程,工作线程状态
boolean workerStarted = false;
boolean workerAdded = false;
//继承AQS并实现了Runnable接口
Worker w = null;
try {
//将任务封装
w = new Worker(firstTask);
//获取当前线程
final Thread t = w.thread;
if (t != null) {
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
//全局锁定
mainLock.lock();
try {
//持锁时重新检查。退出ThreadFactory故障,或者在获取锁之前关闭。
int rs = runStateOf(ctl.get());
//如果当前线程池关闭了
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
//测试该线程是否活动。如果线程已经启动并且还没有死,那么它就是活的。
if (t.isAlive())
throw new IllegalThreadStateException();
//入工作线程池
workers.add(w);
int s = workers.size();
//跟踪最大的池大小
if (s > largestPoolSize)
largestPoolSize = s;
//状态
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
//如果工作线程加入成功,开始线程的执行,并设置状态
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//判断工作线程是否启动成功
if (!workerStarted)
//回滚工作线程创建
addWorkerFailed(w);
}
//返回工作线程状态
return workerStarted;
}
再分析回滚工作线程创建逻辑方法:addWorkerFailed(w)。
回滚工作线程创建,如果存在,则从worker中移除worker, 递减ctl的workerCount字段。,重新检查终止,以防这个worker的存在导致终止。
private void addWorkerFailed(Worker w) {
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//如果存在,则从worker中移除worker
if (w != null)
workers.remove(w);
//递减ctl的workerCount字段。
decrementWorkerCount();
//重新检查终止
tryTerminate();
} finally {
mainLock.unlock();
}
}
其中的tryTerminate()方法:
如果是SHUTDOWN或者STOP 且池子为空,转为TERMINATED状态。如果有条件终止,但是workerCount不为零,则中断空闲worker,以确保关机信号传播。必须在任何可能使终止成为可能的操作之后调用此方法--在关机期间减少worker数量或从队列中删除任务。该方法是非私有的,允许从ScheduledThreadPoolExecutor访问。
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
//如果线程池处于运行中,或者阻塞队列中仍有任务,返回
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
//还有工作线程
if (workerCountOf(c) != 0) {
//中断空闲工作线程
interruptIdleWorkers(ONLY_ONE);
return;
}
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//设置ctl状态TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//方法在执行程序终止时调用,默认什么都不执行
terminated();
} finally {
//完成terminated()方法,状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒所有等待条件的节点
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
//方法在执行程序终止时调用,默认什么都不执行
protected void terminated() {}
为给定的命令调用被拒绝的执行处理程序。
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
标签:get call 操作 异常 ued 最好 important abr sync
原文地址:https://www.cnblogs.com/clawhub/p/12064471.html