标签:normal command protect 入参 工具 node 符号位 adl loading
package com.lf.threaddemo; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test implements Runnable { @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } static ExecutorService service = Executors.newFixedThreadPool(3); public static void main(String[] args) { for (int i = 0; i < 100; i++) { service.execute(new Test()); } service.shutdown(); } }
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public ThreadPoolExecutor(int corePoolSize, //核心线程数量 int maximumPoolSize, //最大线程数 long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间 TimeUnit unit, //存活时间单位 BlockingQueue<Runnable> workQueue, //保存执行任务的队列 ThreadFactory threadFactory,//创建新线程使用的工厂 RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {//1.当前池中线程比核心数少,新建一个线程执行任务 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//2.核心池已满,但任务队列未满,添加到队列中 int recheck = ctl.get(); //任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了 if (! isRunning(recheck) && remove(command)) reject(command);//如果线程池处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务 else if (workerCountOf(recheck) == 0)//如果之前的线程已被销毁完,新建一个线程 addWorker(null, false); } else if (!addWorker(command, false)) //3.核心池已满,队列已满,试着创建一个新线程 reject(command); //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务 }
-1 的二进制计算方法 原码是 1000…001 . 高位 1 表示符号位。 然后对原码取反,高位不变得到 1111…110 然后对反码进行+1 ,也就是补码操作, 最后得到 1111…1111
retry: //goto 语句,避免死循环 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. 如果线程处于非运行状态,并且 rs 不等于 SHUTDOWN 且 firstTask 不等于空且且 workQueue 为空,直接返回 false(表示不可添加 work 状态) 1. 线程池已经 shutdown 后,还要添加新的任务,拒绝 2. (第二个判断)SHUTDOWN 状态不接受新任务,但仍然会执行已经加入任务队列的任 务,所以当进入 SHUTDOWN 状态,而传进来的任务为空,并且任务队列不为空的时候,是允许添加 新线程的,如果把这个条件取反,就表示不允许添加 worker if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //自旋 int wc = workerCountOf(c);//获得 Worker 工作线程数 //如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不能再添加 worker。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))//通过 cas 来增加工作线程数,如果 cas 失败,则直接重试 break retry; c = ctl.get(); // Re-read ctl //再次获取 ctl 的值 if (runStateOf(c) != rs) //这里如果不想等,说明线程的状态发生了变化,继续重试 continue retry; // else CAS failed due to workerCount change; retry inner loop } } //上面这段代码主要是对 worker 数量做原子+1 操作,下面的逻辑才是正式构建一个 worker boolean workerStarted = false; //工作线程是否启动的标识 boolean workerAdded = false; //工作线程是否已经添加成功的标识 Worker w = null; try { w = new Worker(firstTask); //构建一个 Worker,这个 worker 是什么呢?我们可以看到构造方法里面传入了一个 Runnable 对象 final Thread t = w.thread; //从 worker 对象中取出线程 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //这里有个重入锁,避免并发问题 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); //只有当前线程池是正在运行状态,[或是 SHUTDOWN 且 firstTask 为空],才能添加到 workers 集合中 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //任务刚封装到 work 里面,还没 start,你封装的线程就是 alive,几个意思?肯定是要抛异常出去的 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //将新创建的 Worker 添加到 workers 集合中 int s = workers.size(); //如果集合中的工作线程数大于最大线程数,这个最大线程数表示线程池曾经出现过的最大线程数 if (s > largestPoolSize) largestPoolSize = s; //更新线程池出现过的最大线程数 workerAdded = true;//表示工作线程创建成功了 } } finally { mainLock.unlock(); //释放锁 } if (workerAdded) {//如果 worker 添加成功 t.start();//启动线程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); //如果添加失败,就需要做一件事,就是递减实际工作线程数(还记得我们最开始的时候增加了工作线程数吗) } return workerStarted;//返回结果 }
5. private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; //注意了,这才是真正执行 task 的线程,从构造函数可知是由ThreadFactury 创建的 /** Initial task to run. Possibly null. */ Runnable firstTask; //这就是需要执行的 task /** Per-thread task counter */ volatile long completedTasks; //完成的任务数,用于线程池统计 Worker(Runnable firstTask) { setState(-1); //初始状态 -1,防止在调用 runWorker(),也就是真正执行 task前中断 thread。 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //unlock,表示当前 worker 线程允许中断,因为 new Worker 默认的 state=-1,此处是调用 //Worker 类的 tryRelease()方法,将 state 置为 0, 而 interruptIfStarted()中只有 state>=0 才允许调用中断 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //注意这个 while 循环,在这里实现了 [线程复用] // 如果 task 为空,则通过getTask 来获取任务 while (task != null || (task = getTask()) != null) { w.lock(); //上锁,不是为了防止并发执行任务,为了在 shutdown()时不终止正在运行的 worker线程池为 stop 状态时不接受新任务,不执行已经加入任务队列的任务,还中断正在执行的任务 //所以对于 stop 状态以上是要中断线程的 //(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)确保线程中断标志位为 true 且是 stop 状态以上,接着清除了中断标志 //!wt.isInterrupted()则再一次检查保证线程需要设置中断标志位 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);//这里默认是没有实现的,在一些特定的场景中我们可以自己继承 ThreadpoolExecutor 自己重写 Throwable thrown = null; try { task.run(); //执行任务中的 run 方法 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //这里默认默认而也是没有实现 } } finally { //置空任务(这样下次循环开始时,task 依然为 null,需要再通过 getTask()取) + 记录该 Worker 完成任务数量 + 解锁 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); //1.将入参 worker 从数组 workers 里删除掉; //2.根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组workers } }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) {//自旋 int c = ctl.get(); int rs = runStateOf(c); // * 对线程池状态的判断,两种情况会 workerCount-1,并且返回 null //1. 线程池状态为 shutdown,且 workQueue 为空(反映了 shutdown 状态的线程池还是要执行 workQueue 中剩余的任务的) //2. 线程池状态为 stop(shutdownNow()会导致变成 STOP)(此时不用考虑 workQueue的情况) // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null;//返回 null,则当前 worker 线程会退出 } int wc = workerCountOf(c); // timed 变量用于判断是否需要进行超时控制。 // allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时; // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; // 对于超过核心线程数量的这些线程,需要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //1. 线程数量超过 maximumPoolSize 可能是线程池在运行时被调用了 setMaximumPoolSize()被改变了大小,否则已经 addWorker()成功不会超过 maximumPoolSize //2. timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时.其实就是体现了空闲线程的存活时间 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //根据 timed 来判断,如果为 true,则通过阻塞队列 poll 方法进行超时控制,如果在 //keepaliveTime 时间内没有获取到任务,则返回 null. //否则通过 take 方法阻塞式获取队列中的任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null)//如果拿到的任务不为空,则直接返回给 worker 进行处理 return r; timedOut = true;//如果 r==null,说明已经超时了,设置 timedOut=true,在下次自旋的时候进行回收 } catch (InterruptedException retry) { timedOut = false;// 如果获取任务时当前线程发生了中断,则设置 timedOut 为false 并返回循环重试 } }}
if (isRunning(c) && workQueue.offer(command)) {//2.核心池已满,但任务队列未满,添加到队列中 int recheck = ctl.get(); //任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了 if (! isRunning(recheck) && remove(command)) reject(command);//如果线程池处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务 else if (workerCountOf(recheck) == 0)//如果之前的线程已被销毁完,新建一个线程 addWorker(null, false); } else if (!addWorker(command, false)) //3.核心池已满,队列已满,试着创建一个新线程 reject(command); //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
package com.lf.threaddemo; import java.util.Date; import java.util.concurrent.*; public class Demo1 extends ThreadPoolExecutor { // 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间 private ConcurrentHashMap<String, Date> startTimes; public Demo1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.startTimes = new ConcurrentHashMap<>(); } @Override public void shutdown() { System.out.println("已经执行的任务数: " + this.getCompletedTaskCount() + "," + "当前活动线程数:" + this.getActiveCount() + ",当前排队线程 数:" + this.getQueue().size()); System.out.println(); super.shutdown(); } //任务开始之前记录任务开始时间 @Override protected void beforeExecute(Thread t, Runnable r) { startTimes.put(String.valueOf(r.hashCode()), new Date()); super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { Date startDate = startTimes.remove(String.valueOf(r.hashCode())); Date finishDate = new Date(); long diff = finishDate.getTime() - startDate.getTime(); // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、 // 已完成任务数量、任务总数、队列里缓存的任务数量、 // 池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止 System.out.print("任务耗时:" + diff + "\n"); System.out.print("初始线程数:" + this.getPoolSize() + "\n"); System.out.print("核心线程数:" + this.getCorePoolSize() + "\n"); System.out.print("正在执行的任务数量:" + this.getActiveCount() + "\n"); System.out.print("已经执行的任务 数:" + this.getCompletedTaskCount() + "\n"); System.out.print("任务总数:" + this.getTaskCount() + "\n"); System.out.print("最大允许的线程数:" + this.getMaximumPoolSize() + "\n"); System.out.print("线程空闲时 间:" + this.getKeepAliveTime(TimeUnit.MILLISECONDS) + "\n"); System.out.println(); super.afterExecute(r, t); } public static ExecutorService newCachedThreadPool() { return new Demo1(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } }
测试
package com.lf.threaddemo; import java.util.concurrent.ExecutorService; public class ThreadPoolTest implements Runnable { private static ExecutorService es = Demo1.newCachedThreadPool(); @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { for (int i = 0; i < 100; i++) { es.execute(new ThreadPoolTest()); } es.shutdown(); } }
package com.lf.threaddemo; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class CallableDemo implements Callable<String> { @Override public String call() throws Exception { //Thread.sleep(3000);//阻塞案例演示 return "hello world"; } public static void main(String[] args) throws ExecutionException, InterruptedException { CallableDemo callableDemo = new CallableDemo(); FutureTask futureTask = new FutureTask(callableDemo); new Thread(futureTask).start(); System.out.println(futureTask.get()); } }
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); // 当前的 Future 是否被取消,返回 true 表示已取消 boolean isCancelled(); // 当前 Future 是否已结束。包括运行完成、抛出异常以及取消,都表示当前 Future 已结束 boolean isDone(); // 获取 Future 的结果值。如果当前 Future 还没有结束,那么当前线程就等待, // 直到 Future 运行结束,那么会唤醒等待结果值的线程的。 V get() throws InterruptedException, ExecutionException; // 获取 Future 的结果值。与 get()相比较多了允许设置超时时间 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
state 的含义 表示 FutureTask 当前的状态,分为七种状态 private static final int NEW = 0; // NEW 新建状态,表示这个 FutureTask 还没有开始运行 // COMPLETING 完成状态, 表示 FutureTask 任务已经计算完毕了 // 但是还有一些后续操作,例如唤醒等待线程操作,还没有完成。 private static final int COMPLETING = 1; // FutureTask 任务完结,正常完成,没有发生异常 private static final int NORMAL = 2; // FutureTask 任务完结,因为发生异常。 private static final int EXCEPTIONAL = 3; // FutureTask 任务完结,因为取消任务 private static final int CANCELLED = 4; // FutureTask 任务完结,也是取消任务,不过发起了中断运行任务线程的中断请求 private static final int INTERRUPTING = 5; // FutureTask 任务完结,也是取消任务,已经完成了中断运行任务线程的中断请求 private static final int INTERRUPTED = 6;
run 方法
public void run() { // 如果状态 state 不是 NEW,或者设置 runner 值失败 // 表示有别的线程在此之前调用 run 方法,并成功设置了 runner 值 // 保证了只有一个线程可以运行 try 代码块中的代码。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) {/ 只有 c 不为 null 且状态 state 为 NEW 的情况 V result; boolean ran; try { result = c.call(); //调用 callable 的 call 方法,并获得返回结果 ran = true;//运行成功 } catch (Throwable ex) { result = null; ran = false; setException(ex); //设置异常结果, } if (ran) set(result);//设置结果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 节点是否已添加 for (;;) { // 如果当前线程中断标志位是 true, // 那么从列表中移除节点 q,并抛出 InterruptedException 异 常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 当状态大于 COMPLETING 时,表示 FutureTask 任务已结束。 if (q != null) q.thread = null; // 将节点 q 线程设置为 null,因为线程没有阻塞等待 return s; }// 表示还有一些后序操作没有完成,那么当前线程让出执行权 else if (s == COMPLETING) // cannot time out yet Thread.yield(); //表示状态是 NEW,那么就需要将当前线程阻塞等待。 // 就是将它插入等待线程链表中, else if (q == null) q = new WaitNode(); else if (!queued) // 使用 CAS 函数将新节点添加到链表中,如果添加失败,那么 queued 为 false, // 下次循环时,会继续添加,知道成功。 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) {// timed 为 true 表示需要设置超时 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); // 让当前线程等待 nanos 时间 } else LockSupport.park(this); } }
被阻塞的线程,会等到 run 方法执行结束之后被唤醒
private V report(int s) throws ExecutionException { Object x = outcome;//表示 call 的返回值 if (s == NORMAL) // 表示正常完结状态,所以返回结果值 return (V)x; // 大于或等于 CANCELLED,都表示手动取消 FutureTask 任务, // 所以抛出 CancellationException 异常 if (s >= CANCELLED) throw new CancellationException(); // 否则就是运行过程中,发生了异常,这里就抛出这个异常 throw new ExecutionException((Throwable)x); }
public class CallableDemo implements Callable<String> { @Override public String call() throws Exception { //Thread.sleep(3000);//阻塞案例演示 return "hello world"; } public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es=Executors.newFixedThreadPool(1); CallableDemo callableDemo=new CallableDemo(); Future future=es.submit(callableDemo); System.out.println(future.get()); } }
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
标签:normal command protect 入参 工具 node 符号位 adl loading
原文地址:https://www.cnblogs.com/flgb/p/13062798.html