标签:and one man final this schedule inter 内容 exception
线程池简单来说就是一组线程的集合,通过线程池可以达到线程复用的目的,从而避免频繁创建线程和销毁过程中的开销。在应用上,线程可用在后端相关服务上。最常见的比如说数据库服务器,web服务器上。例如web服务器,可能会接收到很多很多断短时的http请求,如果我们为每一个http请求创建一个处理线程,那么势必会消耗大量服务器资源,严重时可能将服务器资源耗尽。当然我们也可以管理并复用自己已创建的线程,来限制资源的消耗,但是那势必会使业务逻辑变的复杂。JDK已经给我们提供了丰富的线程池工具类。使用这些工具类可以给我们带来极大的方便。简单了解其原理,对一个开发来说还是很有必要的。
如上图:最顶层是Executor接口,这个接口中只声明了一个execute方法。ExecutorService在其父类接口的基础上,又声明了submit、shutdown、invokerAll、invokeAny等方法。至于ScheduledExecutorService接口则声明了一些和定时任务相关的方法。比如schedule和scheduleAtFixedRate。线程池的核心实现类是在ThreadPoolExecutor,我们通过Executors调用newFixedThreadPool、newSingleThreadExector和newCachedThreadPool等创建的线程均是ThreadPoolExecutor类型。
线程池的核心实现类是ThreadPoolExecutor。这个类包含了几个重要属性,可以通过构造器设置参数值,构造方法如下:
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler)
构造方法的参数即线程池的核心参数,下面是每个参数的具体含义:
参数 | 说明 |
corePoolSize
|
核心线程数,当线程池中当线程数小于该值时会优先创建新线程处理新任务 |
maximumPoolSize
|
线程池所能创建当最大线程数 |
keepAliveTime
|
空闲线程的存活时间 |
workQueue |
任务队列,用于存放未执行的任务 |
threadFactory
|
线程工厂,可以设置比如线程名称等内容 |
handler |
拒绝策略。当线程池和任务队列满时,采用拒绝策略处理新来的任务,默认时AbortPolicy,即直接抛出异常 |
线程创建的数量主要和corePoolSize、maximumPoolSize两个参数有关,而线程的创建时机主要和corePoolSize、workQueue两个参数有关。下面列举4个线程的创建规则(线程池中无空闲线程)
序号 | 条件 | 动作 |
1 | 线程数量<corePoolSize | 创建线程 |
2 | 线程数量≥corePoolSize,且workQueue未满 | 任务放入队列,等待被执行 |
3 | 线程数量≥corePoolSize,且workQueue已满,线程数量<maximumPoolSize | 创建新线程,执行任务 |
4 | 线程数量≥maximumPoolSize,且workQueue已满 | 执行拒绝策略 |
因为系统的资源是有限的,所以需要适时的对空闲线程资源进行回收。这里的空闲线程包括两个部分,一部分是超出corePoolSize的空闲线程,一部分是corePoolSize内的核心线程,回收的时间由keepLiveTime确定。此外回收线程的前提是allowCoreThreadTimeOut属性被设置为true, public void allowCoreThreadTimeOut(boolean)
方法可以设置属性值。
JDK提供了同步队列、有界队列、无界队列、优先级队列来缓存任务,四种队列具体的实现类如下:
实现类 | 类型 | 说明 |
ArrayBlockingQueue | 有界队列 | 是基于数组的阻塞队列,按照FIFO原则对元素进行排序 |
SynchronousQueue | 同步队列 | 该队列比较特殊,它不存储元素,每次插入操作必须等待另一线程调用移除操作,否则插入操作就会一直阻塞 |
LinkedBlockingQueue | 无界队列 | 基于链表的阻塞队列,按照FIFO原则对元素进行排序 |
PriorityBlockingQueue | 优先级队列 | 具有优先级的阻塞队列 |
实现类 | 说明 |
AbortPolicy | 丢弃新任务,并抛出RejectExecutionException |
DiscardPolicy | 直接丢弃任务,不做任何操作 |
DiscardOldestPolicy | 丢弃队列队首的任务,执行新任务 |
CallerRunsPolicy | 由调用者执行新任务 |
线程池默认执行AbortPolicy策略,我们可以通过方法public void setRejectedExecutionHandler(RejectedExecutionHandler)
修改线程池决绝策略。
在线程池的实现上,线程的创建是通过线程工厂接口ThreadFactory接口的实现类来完成的。默认使用的是Exectors.defaultThreadFactory()方法返回的线程工厂实现类。 同上我们也可以调用方法setThreadFactory(ThreadFactory)方法来设置。
线程的复用是线程池的关键所在,这要求一个线程在执行完任务之后不能立即退出。对应到具体的实现上,就是一个线程执行完任务之后,会去任务队列获取新的任务,如果任务队列中没有任务,且线程池没有设置keepLiveTime,则这个线程就会一直阻塞下去,以此来达到线程复用的目的。
下面是线程创建和复用的的部分代码:
1 +----ThreadPoolExecutor.Worker.java 2 Worker(Runnable firstTask) { 3 setState(-1); 4 this.firstTask = firstTask; 5 // 调用线程工厂创建线程 6 this.thread = getThreadFactory().newThread(this); 7 } 8 9 // Worker 实现了 Runnable 接口 10 public void run() { 11 runWorker(this); 12 } 13 14 +----ThreadPoolExecutor.java 15 final void runWorker(Worker w) { 16 Thread wt = Thread.currentThread(); 17 Runnable task = w.firstTask; 18 w.firstTask = null; 19 w.unlock(); 20 boolean completedAbruptly = true; 21 try { 22 // 循环从任务队列中获取新任务 23 while (task != null || (task = getTask()) != null) { 24 w.lock(); 25 // If pool is stopping, ensure thread is interrupted; 26 // if not, ensure thread is not interrupted. This 27 // requires a recheck in second case to deal with 28 // shutdownNow race while clearing interrupt 29 if ((runStateAtLeast(ctl.get(), STOP) || 30 (Thread.interrupted() && 31 runStateAtLeast(ctl.get(), STOP))) && 32 !wt.isInterrupted()) 33 wt.interrupt(); 34 try { 35 beforeExecute(wt, task); 36 Throwable thrown = null; 37 try { 38 // 执行新任务 39 task.run(); 40 } catch (RuntimeException x) { 41 thrown = x; throw x; 42 } catch (Error x) { 43 thrown = x; throw x; 44 } catch (Throwable x) { 45 thrown = x; throw new Error(x); 46 } finally { 47 afterExecute(task, thrown); 48 } 49 } finally { 50 task = null; 51 w.completedTasks++; 52 w.unlock(); 53 } 54 } 55 completedAbruptly = false; 56 } finally { 57 // 线程退出后,进行后续处理 58 processWorkerExit(w, completedAbruptly); 59 } 60 }
通常情况下我们可以使用submit()方法来提交任务。提交的任务可能立即被执行,也可能被放入任务队列,也可能会被拒绝执行,处理的过程如下:
从上图可以看出,当一个新的任务被提交时,线程池的处理步骤如下:
1、判断当前核心线程是否已被全部使用,如果没有,则创建一个线程执行任务。否则,执行步骤2。
2、判断任务队列是否已满,如果没有,将任务放入队列缓存任务。否则,执行步骤3。
3、判断当前线程数是否超过最大线程数,如果没有,创建线程执行任务。否则,执行步骤4。
4、执行拒绝策略。
代码实现如下:
1 +---- AbstractExecutorService.java 2 public Future<?> submit(Runnable task) { 3 if (task == null) throw new NullPointerException(); 4 // 创建任务 5 RunnableFuture<Void> ftask = newTaskFor(task, null); 6 // 提交任务 7 execute(ftask); 8 return ftask; 9 } 10 11 +---- ThreadPoolExecutor.java 12 public void execute(Runnable command) { 13 if (command == null) 14 throw new NullPointerException(); 15 16 int c = ctl.get(); 17 // 如果工作线程数量 < 核心线程数,则创建新线程 18 if (workerCountOf(c) < corePoolSize) { 19 // 添加工作者对象 20 if (addWorker(command, true)) 21 return; 22 c = ctl.get(); 23 } 24 25 // 缓存任务,如果队列已满,则 offer 方法返回 false。否则,offer 返回 true 26 if (isRunning(c) && workQueue.offer(command)) { 27 int recheck = ctl.get(); 28 if (! isRunning(recheck) && remove(command)) 29 reject(command); 30 else if (workerCountOf(recheck) == 0) 31 addWorker(null, false); 32 }
33
// 添加工作者对象,并在 addWorker 方法中检测线程数是否小于最大线程数 35 else if (!addWorker(command, false)) 36 // 线程数 >= 最大线程数,使用拒绝策略处理任务 37 reject(command); 38 } 39 40 private boolean addWorker(Runnable firstTask, boolean core) { 41 retry: 42 for (;;) { 43 int c = ctl.get(); 44 int rs = runStateOf(c); 45 46 // Check if queue empty only if necessary. 47 if (rs >= SHUTDOWN && 48 ! (rs == SHUTDOWN && 49 firstTask == null && 50 ! workQueue.isEmpty())) 51 return false; 52 53 for (;;) { 54 int wc = workerCountOf(c); 55 // 检测工作线程数与核心线程数或最大线程数的关系 56 if (wc >= CAPACITY || 57 wc >= (core ? corePoolSize : maximumPoolSize)) 58 return false; 59 if (compareAndIncrementWorkerCount(c)) 60 break retry; 61 c = ctl.get(); // Re-read ctl 62 if (runStateOf(c) != rs) 63 continue retry; 64 // else CAS failed due to workerCount change; retry inner loop 65 } 66 } 67 68 boolean workerStarted = false; 69 boolean workerAdded = false; 70 Worker w = null; 71 try { 72 // 创建工作者对象,细节参考上一节所贴代码 73 w = new Worker(firstTask); 74 final Thread t = w.thread; 75 if (t != null) { 76 final ReentrantLock mainLock = this.mainLock; 77 mainLock.lock(); 78 try { 79 int rs = runStateOf(ctl.get()); 80 if (rs < SHUTDOWN || 81 (rs == SHUTDOWN && firstTask == null)) { 82 if (t.isAlive()) // precheck that t is startable 83 throw new IllegalThreadStateException(); 84 // 将 worker 对象添加到 workers 集合中 85 workers.add(w); 86 int s = workers.size(); 87 // 更新 largestPoolSize 属性 88 if (s > largestPoolSize) 89 largestPoolSize = s; 90 workerAdded = true; 91 } 92 } finally { 93 mainLock.unlock(); 94 } 95 if (workerAdded) { 96 // 开始执行任务 97 t.start(); 98 workerStarted = true; 99 } 100 } 101 } finally { 102 if (! workerStarted) 103 addWorkerFailed(w); 104 } 105 return workerStarted; 106 }
关闭线程池的方法有两种,第一种是调用shutDown()方法,第二种是调用shutDownNow()方法。两种方法的区别在于前者,会将线程池的状态设置为SHUTDOWN,并且中断空闲的线程。后者会将线程池的状态设置为STOP,并且尝试中断所有的线程。中断线程使用的是Thread.interrupt()方法,未响应的线程是无法中断的,最后shutdown()会将未执行的任务全部返回。线程池被关闭后就不能再提交新的任务了,新提交的任务会使用拒绝策略处理。
一般情况下我们会使用Exectors工具类来创建线程池。通过该类我们可以构建以下线程:
静态构造方法 | 说明 |
newFixedThreadPool(int threads) | 构建固定线程数的线程池,默认情况下,空闲线程不会被回收 |
newCachedThreadPool() | 创建线程数不固定的线程池,线程数随着任务量而变动,空闲线程超过60秒将被回收 |
newSingleThreadPool() | 创建线程数为1的线程池 |
newScheduledThreadPool(int corePoolSize) | 线程数为corePoolSize的线程池,可执行定时任务的线程池 |
标签:and one man final this schedule inter 内容 exception
原文地址:https://www.cnblogs.com/shuaixiaobing/p/11756300.html