标签:分析 比较 fixed back 存在 新建 next adf 活着
线程池源码也是面试经常被提问到的点,我会将全局源码做一分析,然后告诉你面试考啥,怎么答。
简洁的答两点就行。
降低系统资源消耗。
提高线程可控性。
1.创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
2.(JDK8新增)会根据所需的并发数来动态创建和关闭线程。能够合理的使用CPU进行对任务进行并发操作,所以适合使用在很耗时的任务。
注意返回的是ForkJoinPool对象。
1 public static ExecutorService newWorkStealingPool(int parallelism) { 2 return new ForkJoinPool 3 (parallelism, 4 ForkJoinPool.defaultForkJoinWorkerThreadFactory, 5 null, true); 6 }
什么是ForkJoinPool:
1 public ForkJoinPool(int parallelism, 2 ForkJoinWorkerThreadFactory factory, 3 UncaughtExceptionHandler handler, 4 boolean asyncMode) { 5 this(checkParallelism(parallelism), 6 checkFactory(factory), 7 handler, 8 asyncMode ? FIFO_QUEUE : LIFO_QUEUE, 9 "ForkJoinPool-" + nextPoolId() + "-worker-"); 10 checkPermission(); 11 }
使用一个无限队列来保存需要执行的任务,可以传入线程的数量;不传入,则默认使用当前计算机中可用的cpu数量;使用分治法来解决问题,使用fork()和join()来进行调用。
3.创建一个可缓存的线程池,可灵活回收空闲线程,若无可回收,则新建线程。
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
4.创建一个单线程的线程池。
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
5.创建一个定长线程池,支持定时及周期性任务执行。
1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 2 return new ScheduledThreadPoolExecutor(corePoolSize); 3 }
Executor结构:
一个运行新任务的简单接口
扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法
对ExecutorService接口的抽象类实现。不是我们分析的重点。
Java线程池的核心实现。
1 // AtomicInteger是原子类 ctlOf()返回值为RUNNING; 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 3 // 高3位表示线程状态 4 private static final int COUNT_BITS = Integer.SIZE - 3; 5 // 低29位表示workerCount容量 6 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 7 8 // runState is stored in the high-order bits 9 // 能接收任务且能处理阻塞队列中的任务 10 private static final int RUNNING = -1 << COUNT_BITS; 11 // 不能接收新任务,但可以处理队列中的任务。 12 private static final int SHUTDOWN = 0 << COUNT_BITS; 13 // 不接收新任务,不处理队列任务。 14 private static final int STOP = 1 << COUNT_BITS; 15 // 所有任务都终止 16 private static final int TIDYING = 2 << COUNT_BITS; 17 // 什么都不做 18 private static final int TERMINATED = 3 << COUNT_BITS; 19 20 // 存放任务的阻塞队列 21 private final BlockingQueue<Runnable> workQueue;
值的注意的是状态值越大线程越不活跃。
1 public ThreadPoolExecutor(int corePoolSize,//线程池初始启动时线程的数量 2 int maximumPoolSize,//最大线程数量 3 long keepAliveTime,//空闲线程多久关闭? 4 TimeUnit unit,// 计时单位 5 BlockingQueue<Runnable> workQueue,//放任务的阻塞队列 6 ThreadFactory threadFactory,//线程工厂 7 RejectedExecutionHandler handler// 拒绝策略) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.acc = System.getSecurityManager() == null ? 16 null : 17 AccessController.getContext(); 18 this.corePoolSize = corePoolSize; 19 this.maximumPoolSize = maximumPoolSize; 20 this.workQueue = workQueue; 21 this.keepAliveTime = unit.toNanos(keepAliveTime); 22 this.threadFactory = threadFactory; 23 this.handler = handler; 24 }
在向线程池提交任务时,会通过两个方法:execute和submit。
本文着重讲解execute方法。submit方法放在下次和Future、Callable一起分析。
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 // clt记录着runState和workerCount 5 int c = ctl.get(); 6 //workerCountOf方法取出低29位的值,表示当前活动的线程数 7 //然后拿线程数和 核心线程数做比较 8 if (workerCountOf(c) < corePoolSize) { 9 // 如果活动线程数<核心线程数 10 // 添加到 11 //addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断 12 if (addWorker(command, true)) 13 // 如果成功则返回 14 return; 15 // 如果失败则重新获取 runState和 workerCount 16 c = ctl.get(); 17 } 18 // 如果当前线程池是运行状态并且任务添加到队列成功 19 if (isRunning(c) && workQueue.offer(command)) { 20 // 重新获取 runState和 workerCount 21 int recheck = ctl.get(); 22 // 如果不是运行状态并且 23 if (! isRunning(recheck) && remove(command)) 24 reject(command); 25 else if (workerCountOf(recheck) == 0) 26 //第一个参数为null,表示在线程池中创建一个线程,但不去启动 27 // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize 28 addWorker(null, false); 29 } 30 //再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize 31 else if (!addWorker(command, false)) 32 //如果失败则拒绝该任务 33 reject(command); 34 }
总结一下它的工作流程:
当workerCount < corePoolSize
,创建线程执行任务。
当workerCount >= corePoolSize
&&阻塞队列workQueue
未满,把新的任务放入阻塞队列。
当workQueue
已满,并且workerCount >= corePoolSize
,并且workerCount < maximumPoolSize
,创建线程执行任务。
当workQueue已满,workerCount >= maximumPoolSize
,采取拒绝策略,默认拒绝策略是直接抛异常。
通过上面的execute方法可以看到,最主要的逻辑还是在addWorker方法中实现的,那我们就看下这个方法:
主要工作是在线程池中创建一个新的线程并执行
参数定义:
firstTask
the task the new thread should run first (or null if none). (指定新增线程执行的第一个任务或者不执行任务)
core
if true use corePoolSize as bound, else maximumPoolSize.(core如果为true则使用corePoolSize绑定,否则为maximumPoolSize。 (此处使用布尔指示符而不是值,以确保在检查其他状态后读取新值)。)
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 5 int c = ctl.get(); 6 // 获取运行状态 7 int rs = runStateOf(c); 8 9 // Check if queue empty only if necessary. 10 // 如果状态值 >= SHUTDOWN (不接新任务&不处理队列任务) 11 // 并且 如果 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空) 12 if (rs >= SHUTDOWN && 13 ! (rs == SHUTDOWN && 14 firstTask == null && 15 ! workQueue.isEmpty())) 16 // 返回false 17 return false; 18 19 for (;;) { 20 //获取线程数wc 21 int wc = workerCountOf(c); 22 // 如果wc大与容量 || core如果为true表示根据corePoolSize来比较,否则为maximumPoolSize 23 if (wc >= CAPACITY || 24 wc >= (core ? corePoolSize : maximumPoolSize)) 25 return false; 26 // 增加workerCount(原子操作) 27 if (compareAndIncrementWorkerCount(c)) 28 // 如果增加成功,则跳出 29 break retry; 30 // wc增加失败,则再次获取runState 31 c = ctl.get(); // Re-read ctl 32 // 如果当前的运行状态不等于rs,说明状态已被改变,返回重新执行 33 if (runStateOf(c) != rs) 34 continue retry; 35 // else CAS failed due to workerCount change; retry inner loop 36 } 37 } 38 39 boolean workerStarted = false; 40 boolean workerAdded = false; 41 Worker w = null; 42 try { 43 // 根据firstTask来创建Worker对象 44 w = new Worker(firstTask); 45 // 根据worker创建一个线程 46 final Thread t = w.thread; 47 if (t != null) { 48 // new一个锁 49 final ReentrantLock mainLock = this.mainLock; 50 // 加锁 51 mainLock.lock(); 52 try { 53 // Recheck while holding lock. 54 // Back out on ThreadFactory failure or if 55 // shut down before lock acquired. 56 // 获取runState 57 int rs = runStateOf(ctl.get()); 58 // 如果rs小于SHUTDOWN(处于运行)或者(rs=SHUTDOWN && firstTask == null) 59 // firstTask == null证明只新建线程而不执行任务 60 if (rs < SHUTDOWN || 61 (rs == SHUTDOWN && firstTask == null)) { 62 // 如果t活着就抛异常 63 if (t.isAlive()) // precheck that t is startable 64 throw new IllegalThreadStateException(); 65 // 否则加入worker(HashSet) 66 //workers包含池中的所有工作线程。仅在持有mainLock时访问。 67 workers.add(w); 68 // 获取工作线程数量 69 int s = workers.size(); 70 //largestPoolSize记录着线程池中出现过的最大线程数量 71 if (s > largestPoolSize) 72 // 如果 s比它还要大,则将s赋值给它 73 largestPoolSize = s; 74 // worker的添加工作状态改为true 75 workerAdded = true; 76 } 77 } finally { 78 mainLock.unlock(); 79 } 80 // 如果worker的添加工作完成 81 if (workerAdded) { 82 // 启动线程 83 t.start(); 84 // 修改线程启动状态 85 workerStarted = true; 86 } 87 } 88 } finally { 89 if (! workerStarted) 90 addWorkerFailed(w); 91 } 92 // 返回线启动状态 93 return workerStarted;
因为workers是HashSet类型的,不能保证线程安全。
那w = new Worker(firstTask);
如何理解呢
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable
可以看到它继承了AQS并发框架还实现了Runnable。证明它还是一个线程任务类。那我们调用t.start()事实上就是调用了该类重写的run方法。
tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。
线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。
1 public void run() { 2 runWorker(this); 3 }
run方法又调用了runWorker方法:
1 final void runWorker(Worker w) { 2 // 拿到当前线程 3 Thread wt = Thread.currentThread(); 4 // 拿到当前任务 5 Runnable task = w.firstTask; 6 // 将Worker.firstTask置空 并且释放锁 7 w.firstTask = null; 8 w.unlock(); // allow interrupts 9 boolean completedAbruptly = true; 10 try { 11 // 如果task或者getTask不为空,则一直循环 12 while (task != null || (task = getTask()) != null) { 13 // 加锁 14 w.lock(); 15 // If pool is stopping, ensure thread is interrupted; 16 // if not, ensure thread is not interrupted. This 17 // requires a recheck in second case to deal with 18 // shutdownNow race while clearing interrupt 19 // return ctl.get() >= stop 20 // 如果线程池状态>=STOP 或者 (线程中断且线程池状态>=STOP)且当前线程没有中断 21 // 其实就是保证两点: 22 // 1. 线程池没有停止 23 // 2. 保证线程没有中断 24 if ((runStateAtLeast(ctl.get(), STOP) || 25 (Thread.interrupted() && 26 runStateAtLeast(ctl.get(), STOP))) && 27 !wt.isInterrupted()) 28 // 中断当前线程 29 wt.interrupt(); 30 try { 31 // 空方法 32 beforeExecute(wt, task); 33 Throwable thrown = null; 34 try { 35 // 执行run方法(Runable对象) 36 task.run(); 37 } catch (RuntimeException x) { 38 thrown = x; throw x; 39 } catch (Error x) { 40 thrown = x; throw x; 41 } catch (Throwable x) { 42 thrown = x; throw new Error(x); 43 } finally { 44 afterExecute(task, thrown); 45 } 46 } finally { 47 // 执行完后, 将task置空, 完成任务++, 释放锁 48 task = null; 49 w.completedTasks++; 50 w.unlock(); 51 } 52 } 53 completedAbruptly = false; 54 } finally { 55 // 退出工作 56 processWorkerExit(w, completedAbruptly); 57 }
总结一下runWorker方法的执行过程:
while循环中,不断地通过getTask()方法从workerQueue中获取任务
如果线程池正在停止,则中断线程。否则调用3.
调用task.run()执行任务;
如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);
这个流程图非常经典:
除此之外,ThreadPoolExector
还提供了tryAcquire
、tryRelease
、shutdown
、shutdownNow
、tryTerminate
、等涉及的一系列线程状态更改的方法有兴趣可以自己研究。大体思路是一样的,这里不做介绍。
tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。
线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。
shutdown方法与getTask方法存在竞态条件.(这里不做深入,建议自己深入研究,对它比较熟悉的面试官一般会问)
创建线程池的五个方法。
线程池的五个状态
execute执行过程。
runWorker执行过程。(把两个流程图记下,理解后说个大该就行。)
比较深入的问题就是我在文中插入的问题。
…期望大家能在评论区补充。
声明:图片来源于网络,侵删。
标签:分析 比较 fixed back 存在 新建 next adf 活着
原文地址:https://www.cnblogs.com/javazhiyin/p/10605511.html