标签:大于 rate 业务 无效 持久化存储 指定 可重入锁 ons 延迟
线程池是可以控制线程创建、释放、并通过某种策略尝试复用线程去执行任务的一种管理框架,从而实现线程资源与任务之间的一种平衡。

public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
/**
* 启动一次有序的关闭,之前提交的任务执行,但不接受新任务
*/
void shutdown();
/**
* 试图停止所有正在执行的任务,暂停处理正在等待的任务,返回一个等待执行的任务列表
* 这个方法不会等待正在执行的任务终止
*/
List<Runnable> shutdownNow();
// 如果已经被shutdown,返回true
boolean isShutdown();
// 如果所有任务都已经被终止,返回true
boolean isTerminated();
// 在一个shutdown请求后,阻塞的等待所有任务执行完毕
// 或者到达超时时间,或者当前线程被中断
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// 提交一个有返回值的任务,并返回一个Future代表等待的任务执行的结果, 等到任务成功执行,Future#get()方法会返回任务执行的结果
<T> Future<T> submit(Callable<T> task);
// 提交一个可以执行的任务,返回一个Future代表这个任务, 等到任务执行结束,Future#get()方法会返回这个给定的result
<T> Future<T> submit(Runnable task, T result);
// 提交一个可执行的任务,返回一个Future代表这个任务, 等到任务成功执行,Future#get()方法会返回null
Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public class ThreadPoolExecutor extends AbstractExecutorService {
}
public interface ScheduledExecutorService extends ExecutorService {
/**
* 在给定延时后,创建并执行一个一次性的Runnable任务,任务执行完毕后,ScheduledFuture#get()方法会返回null
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* 在给定延时后,创建并执行一个ScheduledFutureTask,ScheduledFuture 可以获取结果或取消任务
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
* 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期,也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执 行,接着在 initialDelay + 2 * period 后执行,依此类推
* 如果执行任务发生异常,随后的任务将被禁止,否则任务只会在被取消或者Executor被终止后停止,如果任何执行的任务超过了周期,随后的执行会延时,不会并发执行
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
/**
* 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟
* 如果执行任务发生异常,随后的任务将被禁止,否则任务只会在被取消或者Executor被终止后停止
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
}
public static ExecutorService newCachedThreadPool(); public static ExecutorService newFixedThreadPool(int nThreads); public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize); public static ExecutorService newSingleThreadExecutor();
(1)newCachedThreadPool: 创建一个可缓存工作线程的线程池,默认存活时间是60s,线程数量可达到Integer.MAX_VALUE,内部使用SynchronousQueue作为阻塞队列;

public interface ThreadFactory {
Thread newThread(Runnable r);
}
很显然这个工厂模式(或工厂方法模式)的应用。工厂方法模式:定义一个用于创建对象的接口(如ThreadFactory),然子类决定将哪一个类实例化(比如Executors.DefaultThreadFactory),让一个类的实例化延迟到其子类。

static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) {
if (command == null) // 不允许提交空任务
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 如果wc < corePoolSize则直接创建新任务执行
if (addWorker(command, true)) // 如果提交成功则直接退出,失败的原因有:(1)线程池已经shutdown(2)wc < corePoolSize之后,由于并发导致wc>=corePoolSize
return;
c = ctl.get();// 凡是需要再次使用ctl做判断时,都会再次调用ctl.get()获取最新值
}
if (isRunning(c) && workQueue.offer(command)) { // 如果corePoolSize < wc < maximumPoolSize && 运行中,则入队列;如果队列满,则可能入队失败
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 如果线程池不是running状态,应该拒绝添加新任务,出队列;双重检测,回滚入队列;删除队列元素失败的场景可能是:刚好有一个线程执行了该任务。
reject(command);
else if (workerCountOf(recheck) == 0) // 如果线程池中无活动的线程,则增加一个线程,可能不断从队列中获取任务并执行。
addWorker(null, false); // 添加一个线程
}
else if (!addWorker(command, false)) // 如果队列满了,则创建新线程运行任务,
reject(command); // 如果失败,则说明线程池shutdown或者饱和了
}
具体流程图如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { // 外层循环,负责判断线程池状态
int c = ctl.get();
int rs = runStateOf(c);
<span style="white-space:pre"> </span> // 线程池状态值的大小为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
// 有如下几种场景是可以直接拒绝增加新线程的:rs至少是SHUTDOWN状态,以下3个条件任意一个是false(1)rs==SHUTDOWN,false的情况是:线程池已经超过了SHUTDOWn状态,可能是STOP,TIDYING,TERMINATED其中之一,即线程已经终止了(2)firstTask==null,隐含rs==SHUTDOWN,false:firstTask!=null,场景是线程池已经shutdown,还要添加新的任务,拒绝(3)!workQueue.isEmpty(),隐含rs==SHUTDOWN && firstTask==null,false:wq为空,当firstTask为空是为了创建一个没有任务的线程,从wq中获取任务,如果wq为空,就没有添加新线程的必要了。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) { // 内层循环,负责workerCount + 1
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 如果wc > 线程池最大上限CAPACITY,或者wc > corePoolSize(maximumPoolSize)
return false;
if (compareAndIncrementWorkerCount(c)) // CAS操作,使得wc + 1,成功则跳出retry循环,标示累加成功
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 如果之前的CAS操作失败,需要从新获取线程池状态,如果与之前不一致,跳出内循环,继续去外层循环判断
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false; // 线程是否启动成功
boolean workerAdded = false; // 线程是否增加成功
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 如果线程池在运行,或者线程池已经shutdown且firstTask==null(可能是wq中有未执行完成的任务,攒国家没有初始化的worker去执行队列中的任务)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) { // 如果添加成功,则启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果启动失败
addWorkerFailed(w);
}
return workerStarted;
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 此处将state状态设置为0,允许中断
boolean completedAbruptly = true; // true:异常退出,false:正常退出
try {
while (task != null || (task = getTask()) != null) { // 如果firstTask == null,则从队列中获取。
w.lock();// 不是为了防止并发,而是为了在shutdown状态下,不能终止正在运行的worker(worker是不可重入锁)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();// 任务执行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; // 完成任务 + 1
w.unlock(); // 解锁
}
}
completedAbruptly = false;// 正常执行
} finally {
processWorkerExit(w, completedAbruptly); // 处理worker退出
}
}
private Runnable getTask() {
boolean timedOut = false; // 调用最新的poll()是否超时?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // (1)如果线程池状态为>=stop(2)线程池状态为SHUTDOWN并且队列为空
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 默认allowCoreThreadTimeOut=false,所以当workerCount > corePoolSize时
// 这个分支包括4种情况:(1)wc > maximumPoolSize && wc > 1 (2) wc > maximumPoolSize && workQueue.isEmpty()
// (3)timed && timedOut && wc > 1 (4) timed && timedOut && workQueue.isEmpty()
if ((wc > maximumPoolSize || (timed && timedOut)) // wc >
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果workerCount > corePoolSize,则超时获取任务,否则阻塞获取任务
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null) // 如果r == null肯定是超时获取的任务
return r;
timedOut = true; // 只要超时获取那一直都是这个状态
} catch (InterruptedException retry) {
timedOut = false; // 如果在阻塞过程中,被interrrupt,重置timedOut为false
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果true说明worker是异常退出,如果是false标示worker是正常退出并且worker没有可执行的task,不用-1,因为在getTask里面已经-1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try { // 统计
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();// 在对线程池有负效益的操作时,都需要尝试终止线程池。
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 如果状态是running和shutdown
if (!completedAbruptly) { // 如果线程是异常退出,则直接addWorker,
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // allowCoreThreadTimeOut默认是false,即min默认为corePoolSize
if (min == 0 && ! workQueue.isEmpty()) // allowCoreThreadTimeOut=false标示核心线程即使空闲也要保持存活,如果=true,则标示使用keepAliveTime标示存活时间
min = 1; // 如果队列不空,那么至少也要保持一个线程存在
if (workerCountOf(c) >= min) // 如果线程池中线程数量>=min,则直接返回,否则需要addWorker
return; // replacement not needed
}
addWorker(null, false);
}
}
// 终止线程池,之前提交的任务会执行完,但是新提交的任务会拒绝,重复调用无影响,该方法不会等待线程池真正的终止。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);// 设置线程状态,该方法只允许设置stop和shutdown,tidying和terminated状态在tryTerminate()方法中设置,内部是利用CAS+循环设置线程状态
interruptIdleWorkers(); // 中断所有的空闲worker
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试终止线程池
}
/ onlyOne如果为true,最多interrupt一个worker;空闲线程:等待任务的线程,即中断在等待任务的线程(没有上锁),中断唤醒继续循环,会判断线程池状态退出获取task
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { // 运行中的worker,是占用锁的,而worker是非重入锁,即运行中的worker不能中断。
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
// 以下场景线程池变为terminated,(1)shutdown && wc == 0 && wq.isEmpty()(2)stop状态
// 这个方法必须在任何可能导致线程池终止的情况下被调用,如减少worker数量等。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 3种情况不需要中断线程池(1)running(2)状态是tidying或terminated(3)SHUTDOWN并且队列不空
return;
if (workerCountOf(c) != 0) { // 只有SHUTDOWN并且队列空或者stop状态能到这里
interruptIdleWorkers(ONLY_ONE); // 中断一个正在等待任务的空闲worker
return;
}
// 如果SHUTDOWN队列空,运行的worker也没有了,则可以terminated了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // tidying状态
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));// terminated状态
termination.signalAll();//真正的终止了线程池,唤醒因为调用了awaitTermination()方法而阻塞的线程。
}
return;
}
} finally {
mainLock.unlock();
} // else retry on failed CAS
}
}
// 粗鲁的终止线程池,并返回等待执行的任务列表
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers(); // 中断所有线程,包括运行的。
tasks = drainQueue(); // 返回等待处理的任务
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
// 中断所有worker
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // state是否大于零,即worker是否已经开始运行并且还未interrupt
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
// 等待线程池终止
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true; // 终止返回true
if (nanos <= 0) // 超时返回false
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
exec.shutdown();
try{
while(!exec.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("Waiting for terminate");
}
} catch (InterruptedException e) {
//中断处理
}
标签:大于 rate 业务 无效 持久化存储 指定 可重入锁 ons 延迟
原文地址:http://www.cnblogs.com/lujiango/p/7581119.html