标签:存在 char unit nts 基于 tar 要求 handle 提高
将应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,并提供一种自然的并行工作结构来提升并发性
理想情况下,能找出清晰的任务边界,各个任务之间是相互独立的,任务不依赖于其他任务的状态、结果或边界效应。
在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。应用程序提供商希望程序支持尽可能多的用户,从而降低每个用户的服务成本,而用户则希望获得尽快的响应,而且当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。
串行地执行任务:
public class Demo { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while(true){ Socket connection = socket.accept(); handleRequest(connection); } } }
在web请求的处理中包含了一组不同的运算与I/O操作。服务器必须处理套接字I/O以读取请求和写回响应,这些操作通常会由于网络拥塞或联通性问题而被阻塞。
创建线程执行:以独立的客户请求为边界
public class Demo { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while(true){ final Socket connection = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(connection); } }; new Thread(task).start(); } } }
但是线程的创建与销毁并不是没有代价的。并且活跃的线程会消耗系统资源,尤其是内存,若可运行的线程数量多于可用处理器数量,那么有些线程将会闲置。大量空闲线程会占用许多内存,给垃圾回收期带来压力,而且大量线程在竞争CPU资源时还将产生其他的性能开销,如果已经有足够多的线程使所以cpu保持忙碌状态,那么再创建更多的线程反而会降低性能。
任务是一组逻辑工作单元,而线程则是使任务异步执行的机制,在java类库中,任务执行的主要抽象不是Thread,而是Executor
public interface Executor{ void excute(Runnable command); }
虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息搜集、应用程序管理机制和性能检测等机制。
基于Executor的Web服务器
/** * 在TaskExecutionWebServer中,通过Executor,将请求任务的提交与任务的实际执行解耦开来, * 并且只需采用另一种不同的Executor实现,就可以改变服务器的行为。 */ public class TaskExecutionWebServer { private static final int NTHREADS = 100; private static final ExecutorService exec = Executors.newFixedThreadPool(100); public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while(true){ final Socket connection = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(connection); } }; exec.execute(task); } } }
我们也可以很容易地改变Executor的行为,如为每一个请求都创建一个新线程:
public class TaskExecutionWebServer implements Executor{ public void excute(Runnable r){ new Thread(r).start(); }; }
也可以类似于单线程的行为,以同步的方式执行每个任务:
public class TaskExecutionWebServer implements Executor{ public void excute(Runnable r){ r.run(); }; }
线程池
线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务,通过重用现有的线程而不是创建新线程。
类库提供了灵活的线程池以及一些有用的默认配置,可以通过调用Executors中的静态方法来创建一个线程池:
通过使用Executor,可以实现各种调优、管理、监视、记录日志、错误报告和其他功能,如果不使用任务执行框架,那么要增加这些功能是非常困难的。
Executor的生命周期
JVM只有在所有非守护线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么JVM将无法结束。
ExecutorService的生命周期有3种状态:运行、关闭和已经终止。
ExecutorService在初始创建时处于运行状态。
shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。
shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的额任务,并且不再启动队列中尚未开始执行的任务。
在ExecutorService关闭后提交的任务将由拒绝执行处理器来处理,它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException。
等所有任务都完成后,ExecutorService转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。
public interface ExecutorService extends Executor{ void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException; //... }
public class LifecycleWebServer{ private final ExecutorService exec = Executors.newFixedThreadPool(100); public void start() throws IOException{ ServerSocket socket = new ServerSocket(80); while(!exec.isShutdown()){ try{ final Socket conn = socket.accept(); exec.execute(new Runnable(){ public void run(){ handleRequest(conn); } }); }catch(RejectedExecutionException e){ if(!exec.isShutdown()){ log("task submission rejected",e); } } } } public void stop(){ exec.shutdown(); } void handleRequest(Socket connection){ Request req = readRequest(connection); if(isShutdownRequest(req)){ stop(); }else{ dispatchRequest(req); } } }
延迟任务与周期任务
Timer类负责管理延迟或周期任务,然而存在一些缺陷:
Timer在执行所有定时任务时只会创建一个线程。若某个任务的执行时间过长,那么将破坏其他TimerTask的定时准确性。
Timer线程并不捕获异常,当TimerTask抛出未检查的异常时将终止定时线程,Timer也不会恢复线程,因此已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度。
Timer支持基于绝对时间而不是相对时间的调度机制,因此任务的执行对系统时钟变化很敏感,而ScheduledThreadPoolExecutor只支持基于相对时间的制度。
/** * 最简单的方法是对HTML文档进行串行处理,遇到文本标签时,绘制到缓存中,遇到图像引用时,先通过网络获取它,再绘制到缓存中。 * 这很容易实现,程序只需将输入中的每个元素处理一次,但这种方法用户体验很差,他们必须等待很长时间,直到显示所有文本。 * * 但另一种串行方法会更好一些,它先绘制文本元素,同时为图像预留出矩形占位空间。在处理完文本后,程序再开始下载图像,并将它们绘制到相应的占位空间中。 * 图像下载过程的大部分时间都是在等待I/O操作执行完成,在这期间CPU几乎不做任何工作。 * 因此,这种串行方式没有充分的利用CPU,使得用户在看到最终页面之前要等待很长时间。 * 通过将问题分解为多个独立的任务并发执行,能够获得更高的CPU利用率和相应灵敏度。 */ public class SingleThreadRender { void renderpage(CharSequence source){ renderText(source); List<ImageData> imageData = new ArrayList<ImageData>(); for(ImageData imageInfo : scanForImageInfo(source)){ imageData.add(imageInfo.downloadImage()); } for(ImageData data : imageData){ renderImage(data); } } }
Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。
Runnable与Callable描述的都是抽象的任务。这些任务通常是有范围的,即都有一个明确的起始点,且最终会结束。
Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。
由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在Executor中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当他们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在完成状态上。
public interface Callable<V> { V call() throws Exception; } public interface Future<V>{ boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException,ExecutionException,CancellationException; V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,CancellationException,TimeoutException; }
ExecutorService中所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获得任务的执行结果或者取消任务。
还可以显示地为某个指定的Runnable或Callable实例化一个FutureTask(由于FutureTask实现了Runnable,因此塔可以提交给Executor来执行,或者调用它的run方法)。
Java6开始,ExecutorService实现可以改写AbstractExecutorService中的newTaskFor方法,从而根据已提交的Runnable或Callable来控制Future的实例化过程。
在默认实现中仅创建了一个新的FutureTask:
public class ThreadPoolExecutor { protected <T> RunnableFuture<T> newTaskFor(Callable<T> task){ return new FutureTask<T>(task); } }
在将Runnable或Callable提交到Executor的过程中,包含了一个安全发布过程,即将Runnable或Callable从提交线程发布到最终执行任务的线程。类似的,在设置Future结果的过程中,也需要将这个结果从计算它的线程发布到任何通过get获得它的线程。
/** * 将渲染过程分解为两个任务,一个是渲染所有的文本(cpu),另一个是下载所有的图像(IO)。 * FutureRenderer使得渲染文本任务与下载图像数据的任务并发执行,当所有图像下载完,会显示到页面上。 * future.get()的调用处理了两个可能的问题:任务遇到一个Exception,或者调用get的线程在获得结果之前被中断。 * * FutureRenderer使用了两个任务,其中一个负责渲染文本,另一个负责下载图像。 * 但如果渲染文本的速度远远高于下载图像的速度,那么程序的最终性能与串行执行时的性能差别不大。 * 只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出程序的工作负载分配到多个任务中带来的真正性能提升。 */ public class FutureRenderer { private final ExecutorService executor = Executors.newFixedThreadPool(1); void renderPage(CharSequence source){ final List<ImageInfo> imageInfos = scanForImageInfo(source); Callable<List<ImageData>> task = new Callable<List<ImageData>>(){ public List<ImageData> call(){ List<ImageData> result = new ArrayList<ImageData>(); for(ImageInfo imageInfo : imageInfos){ result.add(imageInfo.downloadImage); } return result; } } Future<List<ImageData>> future = executor.submit(task); renderText(source); try{ List<ImageData> imageData = future.get(); for(ImageData data : imageData){ renderImage(data); } }catch(InterruptedException e){ //重新设置线程的中断状态 Thread.currentThread().interrupt(); //由于不需要结果,因此取消任务 future.cancel(); }catch(ExecutionException e){ throw launderThrowable(e.getCause()); } } }
如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复调用get方法,同时将timeout指定为0,从而通过轮询来判断任务是否完成,这显然会很繁琐。
CompletionService将Executor和BlockingQueue的功能融合在一起。可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法获得已完成的结果,而这些结果会在完成时封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
利用BlockingQueue的阻塞性来获取任务结果Future
private class QueueingFuture<V> extends FutureTask<V> { QueueingFuture(Callable<V> c){ super(c); } QueueingFuture(Runnable t,V r){ super(t,r); } protected void done(){ completionQueue.add(this); } }
/** * ExecutorCompletionService的实现很简单,在构造函数中创建一个BlockingQueue来保存计算完成的结果。 * 当计算完成时,调用Future-Task中的done方法。 * 当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法, * 并将结果放入BlockingQueue中,将take和poll方法委托给了BlockingQueue,这些方法会在得出结果之前阻塞。 * * CompletionService从两个方面提高了页面渲染器性能:缩短总运行时间以及提高响应性。 * 为每一个image的下载都创建一个独立任务,并在线程池中执行它们,将串行地下载转换为并行下载过程,这将减少下载所有image的总时间。 * 另外在image完成下载后可以立刻显示出来。使用户获得一个更加动态和更加响应性的用户界面。 */ public class Renderer { private final ExecutorService executor; public Renderer(ExecutorService executor){ this.executor = executor; } public renderPage(CharSequence source){ List<ImageInfo> info = scanForImageInfo(source); CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor); for(final ImageInfo imageInfo : info){ completionService.submit(new Callable<ImageData>(){ public ImageData call(){ return imageInfo.downloadImage(); } }); renderText(source); try{ for(int t=0,n=info.size();t<n;t++){ Future<ImageData> f = completionService.take(); ImageData imageData = f.get(); renderImage(imageData); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); }catch(ExecutionException e){ throw launderThrowable(e.getCause()); } } } }
在获取不同公司报价的过程中,可能会调用web服务、访问数据库、执行一个EDI失误或其他机制。在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的消息。对于没有及时响应的服务提供者,页面可以忽略它们。
可以创建n个任务,将其提交到一个线程池,保留n个Future,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但还有一个更简单的方法--invokeAll
invokeAll方法的参数为一组任务,并返回一组Future。这两个集合有着相同的结构。
invokeAll按照任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而使调用者能将各个Future与其表示的Callable关联起来。
当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定限时(未完成的任务会取消),invokeAll将返回。客户端可以调用get或者isCancelled来判断究竟是何种情况。
private class QuoteTask implements Callable<TravelQuote>{ private final TravelCompany company; private final TravelInfo travelInfo; //... public TravelQuote call() throws Exception{ return company.solicitQuote(travelInfo); } } public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies,Comparator<TravelQuote> ranking,long time,TimeUnit unit) throws InterruptedException{ List<QuoteTask> tasks = new ArrayList<QuoteTask>(); for(TravelCompany company : companies){ tasks.add(new QuoteTask(company,travelInfo)); } List<Future<TravelQuote>> futures = exec.invokeAll(tasks,time,unit); List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size()); Iterator<QuoteTask> taskIte = tasks.iterator(); for(Future<TravelQuote> f : futures){ QuoteTask task = taskIte.next(); try{ quotes.add(f.get()); }catch(ExecutionException e){ quotes.add(task.getFailureQuote(e.getCause())); }catch(CancellationException e){ quotes.add(task.getTimeoutQuote(e)); } } Collections.sort(quotes, ranking); return quotes; }
#笔记内容来自《 java并发编程实战》
标签:存在 char unit nts 基于 tar 要求 handle 提高
原文地址:https://www.cnblogs.com/shanhm1991/p/9899171.html