标签:大于 rate 业务 无效 持久化存储 指定 可重入锁 ons 延迟
线程池是可以控制线程创建、释放、并通过某种策略尝试复用线程去执行任务的一种管理框架,从而实现线程资源与任务之间的一种平衡。
public interface Executor { void execute(Runnable command); }
public interface ExecutorService extends Executor { /** * 启动一次有序的关闭,之前提交的任务执行,但不接受新任务 */ void shutdown(); /** * 试图停止所有正在执行的任务,暂停处理正在等待的任务,返回一个等待执行的任务列表 * 这个方法不会等待正在执行的任务终止 */ List<Runnable> shutdownNow(); // 如果已经被shutdown,返回true boolean isShutdown(); // 如果所有任务都已经被终止,返回true boolean isTerminated(); // 在一个shutdown请求后,阻塞的等待所有任务执行完毕 // 或者到达超时时间,或者当前线程被中断 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交一个有返回值的任务,并返回一个Future代表等待的任务执行的结果, 等到任务成功执行,Future#get()方法会返回任务执行的结果 <T> Future<T> submit(Callable<T> task); // 提交一个可以执行的任务,返回一个Future代表这个任务, 等到任务执行结束,Future#get()方法会返回这个给定的result <T> Future<T> submit(Runnable task, T result); // 提交一个可执行的任务,返回一个Future代表这个任务, 等到任务成功执行,Future#get()方法会返回null Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
public class ThreadPoolExecutor extends AbstractExecutorService { }
public interface ScheduledExecutorService extends ExecutorService { /** * 在给定延时后,创建并执行一个一次性的Runnable任务,任务执行完毕后,ScheduledFuture#get()方法会返回null */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); /** * 在给定延时后,创建并执行一个ScheduledFutureTask,ScheduledFuture 可以获取结果或取消任务 */ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期,也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执 行,接着在 initialDelay + 2 * period 后执行,依此类推 * 如果执行任务发生异常,随后的任务将被禁止,否则任务只会在被取消或者Executor被终止后停止,如果任何执行的任务超过了周期,随后的执行会延时,不会并发执行 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟 * 如果执行任务发生异常,随后的任务将被禁止,否则任务只会在被取消或者Executor被终止后停止 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { }
public static ExecutorService newCachedThreadPool(); public static ExecutorService newFixedThreadPool(int nThreads); public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize); public static ExecutorService newSingleThreadExecutor();
(1)newCachedThreadPool: 创建一个可缓存工作线程的线程池,默认存活时间是60s,线程数量可达到Integer.MAX_VALUE,内部使用SynchronousQueue作为阻塞队列;
public interface ThreadFactory { Thread newThread(Runnable r); }
很显然这个工厂模式(或工厂方法模式)的应用。工厂方法模式:定义一个用于创建对象的接口(如ThreadFactory),然子类决定将哪一个类实例化(比如Executors.DefaultThreadFactory),让一个类的实例化延迟到其子类。
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) { if (command == null) // 不允许提交空任务 throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 如果wc < corePoolSize则直接创建新任务执行 if (addWorker(command, true)) // 如果提交成功则直接退出,失败的原因有:(1)线程池已经shutdown(2)wc < corePoolSize之后,由于并发导致wc>=corePoolSize return; c = ctl.get();// 凡是需要再次使用ctl做判断时,都会再次调用ctl.get()获取最新值 } if (isRunning(c) && workQueue.offer(command)) { // 如果corePoolSize < wc < maximumPoolSize && 运行中,则入队列;如果队列满,则可能入队失败 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 如果线程池不是running状态,应该拒绝添加新任务,出队列;双重检测,回滚入队列;删除队列元素失败的场景可能是:刚好有一个线程执行了该任务。 reject(command); else if (workerCountOf(recheck) == 0) // 如果线程池中无活动的线程,则增加一个线程,可能不断从队列中获取任务并执行。 addWorker(null, false); // 添加一个线程 } else if (!addWorker(command, false)) // 如果队列满了,则创建新线程运行任务, reject(command); // 如果失败,则说明线程池shutdown或者饱和了 }
具体流程图如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 外层循环,负责判断线程池状态 int c = ctl.get(); int rs = runStateOf(c); <span style="white-space:pre"> </span> // 线程池状态值的大小为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED // 有如下几种场景是可以直接拒绝增加新线程的:rs至少是SHUTDOWN状态,以下3个条件任意一个是false(1)rs==SHUTDOWN,false的情况是:线程池已经超过了SHUTDOWn状态,可能是STOP,TIDYING,TERMINATED其中之一,即线程已经终止了(2)firstTask==null,隐含rs==SHUTDOWN,false:firstTask!=null,场景是线程池已经shutdown,还要添加新的任务,拒绝(3)!workQueue.isEmpty(),隐含rs==SHUTDOWN && firstTask==null,false:wq为空,当firstTask为空是为了创建一个没有任务的线程,从wq中获取任务,如果wq为空,就没有添加新线程的必要了。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 内层循环,负责workerCount + 1 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 如果wc > 线程池最大上限CAPACITY,或者wc > corePoolSize(maximumPoolSize) return false; if (compareAndIncrementWorkerCount(c)) // CAS操作,使得wc + 1,成功则跳出retry循环,标示累加成功 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 如果之前的CAS操作失败,需要从新获取线程池状态,如果与之前不一致,跳出内循环,继续去外层循环判断 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; // 线程是否启动成功 boolean workerAdded = false; // 线程是否增加成功 Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果线程池在运行,或者线程池已经shutdown且firstTask==null(可能是wq中有未执行完成的任务,攒国家没有初始化的worker去执行队列中的任务) if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 如果添加成功,则启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 如果启动失败 addWorkerFailed(w); } return workerStarted; }
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 此处将state状态设置为0,允许中断 boolean completedAbruptly = true; // true:异常退出,false:正常退出 try { while (task != null || (task = getTask()) != null) { // 如果firstTask == null,则从队列中获取。 w.lock();// 不是为了防止并发,而是为了在shutdown状态下,不能终止正在运行的worker(worker是不可重入锁) if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();// 任务执行 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // 完成任务 + 1 w.unlock(); // 解锁 } } completedAbruptly = false;// 正常执行 } finally { processWorkerExit(w, completedAbruptly); // 处理worker退出 } }
private Runnable getTask() { boolean timedOut = false; // 调用最新的poll()是否超时? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // (1)如果线程池状态为>=stop(2)线程池状态为SHUTDOWN并且队列为空 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 默认allowCoreThreadTimeOut=false,所以当workerCount > corePoolSize时 // 这个分支包括4种情况:(1)wc > maximumPoolSize && wc > 1 (2) wc > maximumPoolSize && workQueue.isEmpty() // (3)timed && timedOut && wc > 1 (4) timed && timedOut && workQueue.isEmpty() if ((wc > maximumPoolSize || (timed && timedOut)) // wc > && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果workerCount > corePoolSize,则超时获取任务,否则阻塞获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 如果r == null肯定是超时获取的任务 return r; timedOut = true; // 只要超时获取那一直都是这个状态 } catch (InterruptedException retry) { timedOut = false; // 如果在阻塞过程中,被interrrupt,重置timedOut为false } } }
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 如果true说明worker是异常退出,如果是false标示worker是正常退出并且worker没有可执行的task,不用-1,因为在getTask里面已经-1 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 统计 completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate();// 在对线程池有负效益的操作时,都需要尝试终止线程池。 int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 如果状态是running和shutdown if (!completedAbruptly) { // 如果线程是异常退出,则直接addWorker, int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // allowCoreThreadTimeOut默认是false,即min默认为corePoolSize if (min == 0 && ! workQueue.isEmpty()) // allowCoreThreadTimeOut=false标示核心线程即使空闲也要保持存活,如果=true,则标示使用keepAliveTime标示存活时间 min = 1; // 如果队列不空,那么至少也要保持一个线程存在 if (workerCountOf(c) >= min) // 如果线程池中线程数量>=min,则直接返回,否则需要addWorker return; // replacement not needed } addWorker(null, false); } }
// 终止线程池,之前提交的任务会执行完,但是新提交的任务会拒绝,重复调用无影响,该方法不会等待线程池真正的终止。 public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN);// 设置线程状态,该方法只允许设置stop和shutdown,tidying和terminated状态在tryTerminate()方法中设置,内部是利用CAS+循环设置线程状态 interruptIdleWorkers(); // 中断所有的空闲worker onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); // 尝试终止线程池 }
/ onlyOne如果为true,最多interrupt一个worker;空闲线程:等待任务的线程,即中断在等待任务的线程(没有上锁),中断唤醒继续循环,会判断线程池状态退出获取task private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { // 运行中的worker,是占用锁的,而worker是非重入锁,即运行中的worker不能中断。 try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
// 以下场景线程池变为terminated,(1)shutdown && wc == 0 && wq.isEmpty()(2)stop状态 // 这个方法必须在任何可能导致线程池终止的情况下被调用,如减少worker数量等。 final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 3种情况不需要中断线程池(1)running(2)状态是tidying或terminated(3)SHUTDOWN并且队列不空 return; if (workerCountOf(c) != 0) { // 只有SHUTDOWN并且队列空或者stop状态能到这里 interruptIdleWorkers(ONLY_ONE); // 中断一个正在等待任务的空闲worker return; } // 如果SHUTDOWN队列空,运行的worker也没有了,则可以terminated了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // tidying状态 try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0));// terminated状态 termination.signalAll();//真正的终止了线程池,唤醒因为调用了awaitTermination()方法而阻塞的线程。 } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
// 粗鲁的终止线程池,并返回等待执行的任务列表 public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); // 中断所有线程,包括运行的。 tasks = drainQueue(); // 返回等待处理的任务 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
// 中断所有worker private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // state是否大于零,即worker是否已经开始运行并且还未interrupt try { t.interrupt(); } catch (SecurityException ignore) { } } }
// 等待线程池终止 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; // 终止返回true if (nanos <= 0) // 超时返回false return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
exec.shutdown(); try{ while(!exec.awaitTermination(500, TimeUnit.MILLISECONDS)) { LOGGER.debug("Waiting for terminate"); } } catch (InterruptedException e) { //中断处理 }
标签:大于 rate 业务 无效 持久化存储 指定 可重入锁 ons 延迟
原文地址:http://www.cnblogs.com/lujiango/p/7581119.html