标签:executor void image 16px 用户 jdk 两种 call() 移除
讲线程池之前我们先看一下线程的实现方式: Thread,Runnable,Callable
// 实现Runnable接口的类将被Thread执行,表示一个基本的任务
public interface Runnable {
// run方法就是它所有的内容,就是实际执行的任务
public abstract void run()?
}
//Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容 public interface Callable<V> { // 相对于run方法的带有返回值的call方法 V call() throws Exception? }
在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。
(1)降低资源的消耗。通过重用存在的线程,减少线程创建,消亡的开销,提高性能。
(2)提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
(3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
ThreadPoolExecutor执行示意图 :
ThreadPoolExecutor执行execute方法分下面4种情况:
(1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
(2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
(3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要取全局锁)。
(4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
a. ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
b. LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
c. SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
d. priorityBlockingQuene:具有优先级的无界阻塞队列;
a. AbortPolicy:直接抛出异常,默认策略;
b. CallerRunsPolicy:用调用者所在的线程来执行任务;
c. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
d. DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
public class PolicySample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
3,
5,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
Future<String> future = null;
List<Future<String>> list = new ArrayList<Future<String>>();
for (int i=0;i<20;i++){
list.add(pool.submit(new CallTask()));
}
if(!list.isEmpty()){
for (Future<String> f : list){
System.out.println(f.get());
}
}
}
}
(1)RUNNING
a. 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
b. 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
(2)SHUTDOWN
a. 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
b. 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
(3)STOP
a. 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
b. 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
(4)TIDYING
a. 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
b. 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
(5)TERMINATED
a. 状态说明:线程池彻底终止,就变成TERMINATED状态。
b. 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
进入TERMINATED的条件如下:
I. 线程池不是RUNNING状态;
II. 线程池状态不是TIDYING状态或TERMINATED状态;
III. 如果线程池状态是SHUTDOWN并且workerQueue为空;
IV. workerCount为0;
V. 设置TIDYING状态成功。
Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。
下图为它的继承与实现
从图中可以看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))?
private static final int COUNT_BITS = Integer.SIZE 3?
private static final int CAPACITY = (1 << COUNT_BITS) 1?
ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
public void execute(Runnable command) { if (command == null) throw new NullPointerException()? //clt记录着runState和workerCount; 高3位为状态,低29位是线程数 int c = ctl.get()? /* 1.workerCountOf方法取出低29位的值,表示当前活动的线程数; * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中; * 并把任务添加到该线程中。 */ if (workerCountOf(c) < corePoolSize) { /* addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断; * 如果为true,根据corePoolSize来判断; * 如果为false,则根据maximumPoolSize来判断 */ if (addWorker(command, true)) return? // 如果添加失败,则重新获取ctl值 c = ctl.get()? } //2. 如果当前线程池是运行状态并且任务添加到队列中 if (isRunning(c) && workQueue.offer(command)) { // 重新获取ctl值 int recheck = ctl.get()? // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了, // 这时需要移除该command,执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回 if (!isRunning(recheck) && remove(command)) reject(command)? //获取线程池中的有效线程数,如果数量是0,则执行addWorker方法 else if (workerCountOf(recheck) == 0) /*这里传入的参数表示: * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动; * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为 maximumPoolSize,添加线程时根据maximumPoolSize来判断。 */ addWorker(null, false)? } /* * 如果执行到这里,有两种情况: * 1. 线程池已经不是RUNNING状态; * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且 workQueue已满。 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; * 如果失败则拒绝该任务 */ else if (!addWorker(command, false)) reject(command)? }
标签:executor void image 16px 用户 jdk 两种 call() 移除
原文地址:https://www.cnblogs.com/yufeng218/p/13174604.html