6 任务执行
大多数并发应用程序都是围绕"任务执行(Task Execution)"来构造的。理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或边界效应。当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。所以应该选择清晰的任务边界以及明确的任务执行策略。
A: 找出清晰的任务边界
大多数服务器应用程序都提供了一种自然的任务边界选择方式:以独立的客户请求为边界。
B:
6.1 在线程中执行任务
6.1.1 串行地执行任务
一个接一个地执行,性能很糟糕。
6.1.2 显式地为任务创建线程
A:任务处理过程从主线程中分离出来,使得主循环能够更快地重新等待下一个到来的连接。
这使得程序在完成前面的请求之前可以接受新的请求,从而提高响应性。
B:任务可以并行处理,从而能同时服务多个请求。如果有多个处理器,或者任务由于某种
原因被阻塞,例如等待I/O完成、获取锁或者资源可用性等,程序的吞吐量将得到提高。
C:任务处理代码必须是线程安全的,因为当有多个任务时会并发地调用这段代码。
6.1.3无限制创建线程的不足
开销高,资源耗尽,稳定性差。
6.2 Executor框架
每当看到下面这种形式的代码时: new Thread(runnable).start()。并且你希望获得一种更灵活的执行策略时,请考虑使用Executor来代替Thread.
Executo:执行的任务有4个生命周期阶段:创建、提交、开始和完成。由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。
6.2.1 Executor的生命周期及方法
为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。在程序清单6-7中给出了ExecutorService中的生命周期管理方法。
Shutdown() 方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成—包括那些还未开始执行的任务。
shutdownNow() 方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
在ExecutorService关闭后提交的任务将执行饱和拒绝策略。等所有任务都完成后,ExecutorService将转人终止状态。
awaitTermination()等待ExecutorService到达终止状态。通常在调用awaitTermination之后会立即调用shutdown产生同步地关闭ExecutorService的效果。
isTerminated()轮询ExecutorService是否已经终止。
6.2.2 延迟任务与周期任务
Timer支持基于绝对时间而不是相对时间的调度机制,因此任务的执行对系统时钟变化很敏感,而ScheduLedThreadPoolExecutor只支持基于相对时间的调度。因此应该考虑使用Scheduled.ThreadPoolExecutor来代替它。JDK 5 以后很少用Timer。如果要构建自己的调度服务,那么可以使用DelayQueue,它实现了BlockingQueue, 为ScheduledThreadPoolExecutor提供调度功能。DelayQueue管理着一组Delayed对象。每个Delayed对象都有一个相应的延迟时间:在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作。从DelayQueue中返回的对象将根据它们的延迟时间进行排序。
6.2.3 携带结果的任务Callable与Future
Executor框架使用Runnable作为其基本的任务表示形式。但它不能返回一个值或抛出一个受检查的异常。许多任务实际上都是存在延迟的计算,对于这些任务,Callable是一种更好的抽象。它认为主人口点(即call)将返回一个值,并可能抛出一个异常。在Execute:中包含了一些辅助方法能将其他类型的任务封装为一个Callable。要使用Callable来表示无返回值的任务,可使用Callable<Void>。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在"完成"状态上。
get() 方法:
get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception。如果任务没有完成,那么get将阻塞并直到任务。完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。任务被取消,那么get将抛出CancellationException。
getCause()
如果get抛出了ExecutionException,可以通过getCause()来获得被封装的初始异常。
boolean cancel(Boolean mayInterruptlfRunning):试图取消该Future里关联的Callable任务
mayInterruptlfRunning参数为true表示任务线程可以在运行过程中中断。
V get(long timeout, TimeUnit unit):
返回Callable任务里的call方法的返回值,该方法让程序最多阻塞timeout和unit指定的时间。 如果经过指定时间后Callable任务依然没有返回值,将会抛出TimeoutException。
boolean isCancelled():如果在Callable任务正常完成前被取消,则返回true。
boolean isDone():如果Callable任务已经完成,则返回true
构造方法摘要 | ||
FutureTask(Callable<V> callable) | ||
FutureTask(Runnable runnable, V result) | ||
方法摘要 | ||
boolean | cancel(boolean mayInterruptIfRunning) | |
protected void | done() | |
get() | ||
get(long timeout, TimeUnit unit) | ||
boolean | isCancelled() | |
boolean | isDone() | |
void | run() | |
protected boolean | runAndReset() | |
protected void | ||
protected void | setException(Throwable t) |
可以通过许多种方法创建一个Future来描述任务。ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获得任务的执行结果或者取消任务。还可以显式地为某个指定的Runnable或Callable实例化一个FutureTasko(由于FutureTask实现了Runnable,因此可以将它提交给Executor来执行,或者直接调用它的run方法。) 从Java 6开始,ExecutorService实现可以改写AbstractExecutorService中的nea=rTaskFor方法,从而根据已提交的Runnable或Callable来控制Future的实例化过程。在默认实现中仅创建了一个新的FutureTask,如程序清单6-12所示。
在将Runnable或Callable提交到Executor的过程中,包含了一个安全发布过程(请参见3.5节),即将Runnable或Callable从提交线程发布到最终执行任务的线程。类似地,在设置Future结果的过程中也包含了一个安全发布,即将这个结果从计算它的线程发布到任何通过get获得它的线程。
用例:
package com.del;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class TestCallable {
public static void main(String[] args) {
//1.创建Callable实现类的实例
ThreadDemo td = new ThreadDemo();
//2.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
FutureTask<Integer> result = new FutureTask<Integer>(td);
//3.FutureTask实现了Runnable, Future<V>,接口,Thread也实现了Runnable接口,所以 FutureTask可以入参
new Thread(result).start(); //一般是将callable任务提交到线程池
//4.接收线程运算后的结果
try {
//调用get()方法将导致线程阻塞,必须等到子线程结束才得到返回值
Integer sum = result.get();
System.out.println("call方法的返回值:"+sum);
} catch (Exception e) {
e.printStackTrace();
}
}
}
class ThreadDemo implements Callable<Integer> {
/**
* 重写call()方法
*/
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100000; i++) {
sum += i;
}
return sum;
}
}
我们还希望下载一张图片就显示一张图片(生产者—消费者模式?见6.3.2)
6.3 找出最优的并行性
6.3.1在异构任务并行化中存在的局限
FutureRenderer使用了两个任务,其中一个负责渲染文本,另一个负责下载图像。如果渲
染文本的速度远远高于下载图像的速度(可能性很大),那么程序的最终性能与串行执行时的性能差别不大,而代码却变得更复杂了。
只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分
配到多个任务中带来的真正性能提升。
6.3.2 CompletionService:Executor与BlockingQueue
如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务(CompletionService) .
CompletionService将Executor和BlockingQueue的功能融合在一起。你p.1一以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。 ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
方法摘要 | |
poll() | |
poll(long timeout, TimeUnit unit) | |
submit(Callable<V> task) | |
submit(Runnable task, V result) | |
take() |
ExecutorCompletionService的实现非常简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当计算完成时,调用Future-Task中的done方法。当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue中,如程序清单6-14所示。take和 poll方法委托给了BlockingQueue,这些方法会在得出结果之前阻塞。
多个ExecutorCompletionService可以共享一个Executor。
6.3.3 限时任务的设置
.限时Future.get的一种典型应用。
.生成的页面中包括响应用户请,求的内容以及从广告服务器上获得的广告。它将获取广告的任务提交给一个Executor,然后计算剩余的文本页面内容,最后等待广告信息,直到超出指定的时间。(传递给get的timeout参数的计算方法是,将指定时限减去当前时间。这可能会得到负数,但java.util.concurrent中所有与时限相关的方法都将负数视为零,因此不需要额外的代码来处理这种情况。)如果get超时,那么将取消广告获取任务,并转而使用默认的广告信息。(Future.cancel的参数为true表示任务线程可以在运行过程中中断。)