通常我们使用JAVA来开发一个简单的并发应用时,会创建一些Runnable对象,然后创建对应的Thread对象来执行他们,但是,如果需要开发一个程序需要运行大量并发任务的时候,这个方法显然不合适。Java提供了执行器框架(Executor Framework)来解决这些问题。
Executor Framework机制分离了任务的创建和执行。通过执行器,仅需要实现Runnable接口的对象,然后把这个对象发送给执行器即可。执行器通过创建所需要的线程来负责这些Runnable对象的创建、实例化以及运行。执行器使用了线程池来提高应用程序的性能。当发送一个任务执行器时,执行器会尝试使用线程池中的线程来执行这个任务,避免了不断地创建和销毁线程而导致系统性能下降。
执行器框架另一个重要的优势是Callable接口。这个接口的主方法是call(),可以返回结果。当发送一个Callable对象给执行器时,将获得一个实现了Future接口的对象。可以使用这个对象来控制Callable对象的状态和结果。
1、创建线程执行器。
使用执行器框架(Executor Framework)的第一步是创建ThreadPoolExecutor对象。可以使用ThreadPoolExecutor类提供的四个构造器或者使用Executor工厂类来创建ThreadPoolExecutor对象。一旦有了执行器,就可以将Runnable或者Callable对象发送给它去执行了。下面将用实例来演示Java创建线程执行器。
package org.concurrency.executorframework; import java.util.Date; import java.util.concurrent.TimeUnit; /** * @author Administrator * 定义一个任务类,实现Runnable接口 * 只是定义,不执行 */ public class Task implements Runnable { private Date initDate;//存储任务创建时间 private String name;//存储任务的名称 public Task() { } public Task(String name) { initDate = new Date(); this.name = name; } @Override public void run() { // TODO Auto-generated method stub System.out.printf("%s: Task %s Created on: %s\n",Thread.currentThread().getName(),name,initDate); System.out.printf("%s: Task %s Started on: %s\n",Thread.currentThread().getName(),name,initDate); try { Long duration = (long)(Math.random()*10); System.out.printf("%s: Task %s: Doing a task during %d seconds\n",Thread.currentThread().getName(),name,duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.printf("%s: Task %s: Finished on: %s\n",Thread.currentThread().getName(),name,new Date()); } } package org.concurrency.executorframework; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; /** * @author Administrator * 它将执行通过执行器接收到的每一个任务。 */ public class Server { private ThreadPoolExecutor executor; public Server() { /*线程执行器的创建有两个方式: * 一个是直接使用ThreadPoolExecutor的构造器来实现 * 一个是通过Executors工厂类来构造执行器和其他相关对象。 * 但是由于TheadPoolExecutor构造器在使用上的复杂性,推荐使用Executors工厂类类创建。 * 这里使用了Executors工厂类的newCacheThreadPoolExecutor()方法来创建一个缓存线程池 * 返回一个ExecutorService对象,因此被强制转换成ThreadPoolExecutor类型。 * 使用线程池的优点是减少新建线程所花费的时间。此类缓存池的缺点是,如果发送过多任务给执行器,系统的复合会过载。 * 当且仅当线程的数量是合理的,或者线程只会运行很短的时间时,适合采用缓存线程池类。 * */ executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); } /** * 创建了执行器之后,就可以使用执行器的execute()方法来发送Runnable或者Callable类型的任务。 * 这里的Task是实现了Runnable接口的对象。 * 这里也有一些执行器相关的日志信息: * getPoolSize():返回执行器线程池中实际的线程数 * getActiveCount():返回执行器中正在执行任务的线程数 * getCompleteTaskCount():返回执行器中已经完成的任务数 * */ public void executeTask(Task task){ System.out.printf("Server: A new task hs arrived\n"); executor.execute(task); System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize()); System.out.printf("Server: Active Count: %d\n",executor.getActiveCount()); System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount()); } /** * 执行器以及ThreadPoolExecutor类一个重要的特性是,通常需要显示地区结束,如果不这样做,那么执行器将继续执行。 * 为了完成执行器的执行,可以使用ThreadPoolExecutor类的shutdown()方法。当执行器执行完所有待运行的任务,它将结束执行。 * 如果再shutdown()方法之后,有新的任务发送给执行器,那么会报出RejectExecutionException异常。 * */ public void endServer(){ executor.shutdown(); } } package org.concurrency.executorframework; /** * @author Administrator * main主程序,循环创建Task */ public class Task_Main { public static void main(String[] args) { // TODO Auto-generated method stub Server server = new Server(); for(int i = 0;i < 100;i++){ Task task = new Task("Task"+i); server.executeTask(task); } server.endServer(); } }
ThreadPoolExecutor类提供了其他结束执行器的方法:
shutdownNow():这个方法会立即关闭执行器。执行器将不再执行那些正在等待执行的任务。这个方法将返回等待执行的任务列表。调用时,正在执行的任务将继续执行,但这个方法不等待这个任务的完成。
isTerminated():如果调用了shutdown()或shutdownNow()方法,并且执行器完成了关闭过程,那么这个方法将返回true。
isShutdown():如果调用了shutdown()方法,则返回true。
awaitTermination(long timeout,TimeUnit unit):这个方法将阻塞所调用的线程,知道执行器完成任务或者达到所指定的timeout值。
2、创建固定大小的线程执行器
当使用Executors类的newCachedThreadPool()方法创建的ThreadPoolExecutor时,执行器运行过程中将碰到线程数量问题。如果线程池中没有空闲的线程可用,那么执行器将为接收到的每一个任务创建一个新的线程,当发送大量的任务给执行器并且任务需要持续较长的时间时,系统将会超负荷,应用程序也将随之不佳。
为了避免这个问题,Executors工厂类提供了一个方法来床架一个固定大小的线程执行器。这个执行器有一个线程数的最大值,如果发送超过这个最大值的任务给执行器,执行器将不会创建额外的线程,剩下的任务将被阻塞直到执行器有空闲的线程可用。这个特性可以保证执行器不会给应用程序带来性能不佳的问题。
可以对上述示例进行修改
public Server() { executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); } public void executeTask(Task task){ System.out.printf("Server: A new task hs arrived\n"); executor.execute(task); System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize()); System.out.printf("Server: Active Count: %d\n",executor.getActiveCount()); System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount()); System.out.printf("Server: Task Count: %d\n",executor.getTaskCount()); }
在这个示例中使用了Executors工厂类的newFixedThreadPool()方法来创建执行器。这个方法创建了具有线程数量最大值的执行器。如果发送超过线程数的任务给执行器,剩余的任务将被阻塞知道线程池里有空闲的线程来处理他们。
3、在执行器中执行任务并返回结果
执行器框架(Executor Framework)的优势之一是,可以运行并发任务并返回结果。Callable:这个接口声明了call()方法。可以在这个方法里实现任务的具体逻辑操作。Callable接口是一个泛型接口,这意味着必须声明call()方法返回的数据类型。Future:这个接口声明了一些方法来获取由Callable对象产生的结果,并管理它们的状态。
package org.concurrency.executorframework.callable; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class FactorialCalculator implements Callable<Integer> { private Integer number;//存储任务即将用来计算的数字 public FactorialCalculator(Integer number) { this.number = number; } @Override public Integer call() throws Exception { // TODO Auto-generated method stub int result = 1; if(number ==0 || number ==1){ result = 1; }else{ for(int i =2;i<number;i++){ result *= i; TimeUnit.MILLISECONDS.sleep(20); } } System.out.printf("%s: %d\n",Thread.currentThread().getName(),result); return result; } public static void main(String[] args) { /*通过Executors工厂类的newFixedThreadPool()方法创建ThreadPoolExecutor执行器来运行任务。这里最多创建2个线程*/ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); List<Future<Integer>> resultList = new ArrayList<Future<Integer>>(); Random random = new Random(); for(int i=0;i<10;i++){ int number = random.nextInt(10); FactorialCalculator calculator = new FactorialCalculator(number); Future<Integer> result = executor.submit(calculator); resultList.add(result); } do{ System.out.printf("Main: Number of Completed Tasks:%d\n",executor.getCompletedTaskCount()); for(int i=0;i<resultList.size();i++){ Future<Integer> result = resultList.get(i); System.out.printf("Main: Task %d: %s\n",i,result.isDone()); try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }while(executor.getCompletedTaskCount() < resultList.size()); System.out.printf("Main: Results\n"); for(int i =0;i<resultList.size();i++){ Future<Integer> result = resultList.get(i); Integer number = null; try { number = result.get(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.printf("Main: Task %d\n",i,number); } executor.shutdown(); } }
在本节中我们学习了如何使用Callable接口来启动并发任务并返回结果。我们编写了FactorialCaculator类,它实现了带有泛型参数Integer类型的Callable接口。因此,这个Integer类型将作为调用call()方法时返回的类型。
我们通过submit()方法发送一个Callable对象给执行去执行,这个submit()方法接收Callable对象作为参数,并返回Future对象。Future对象可以用于以下两个目的。
控制任务状态:可以取消任务或者检查任务是否已经完成。为了达到这个目的,可使用isDone()方法来检查任务是否已经完成。
公国call()方法获取返回结果。为了达到这个目的,可以使用get()方法。这个方法一直等待直到Callable对象的call()方法执行完成并返回结果。如果get()方法在等待结果时中断了,则会抛出异常。如果call()方法抛出异常,那个get()也会抛出异常。
4、运行多个任务并处理第一个结果
并发编程中比较常见的一个问题是,当采用多个并发任务解决一个问题时,往往只关系这些任务的第一个结果。例如允许两种验证机制,只要有一种验证机制成功,那么就验证通过。这主要是用到了ThreadPoolExecutor类的invokeAny()方法。
5、运行多个任务并处理所有结果。
执行器框架(Executor Framework)允许执行并发任务而不需要去考虑线程创建和执行。它还提供了可以用来控制在执行器中执行任务的状态和获取任务结果的Future类。
6、在执行器中周期性执行任务。
执行器框架提供了ThreadPoolExecutor类,通过线程池来执行并发任务从而避免了执行所有线程的创建操作。当一个任务给执行器后,根据执行器的配置,它将尽快地执行这个任务。当任务执行结束后,这个任务就会从执行器中删除;如果想再次执行这个任务,则需要再次发送这个任务到执行器。
但是执行器框架提供了ScheduledThreadPoolExecutor类来执行周期性的任务。通过Executors工厂类的newScheduledThreadPoolExecutor()方法创建ScheduledThreadPoolExecutor执行器对象。这个方法接收一个表示线程中的线程数类作参数。一旦有了可以执行周期性的执行器,就可以发送任务给这个执行器。使用scheduledAtFixedRate()方法发送任务。scheduledAtFixedRate()方法返回一个ScheduledFuture对象,ScheduledFuture接口则扩展了Future接口,于是它带有了定时任务的相关操作方法。使用getDelay()方法返回任务到下一次执行时所要等待的剩余时间。我们将通过一个实例来演示周期性执行任务
package org.concurrency.executorframework.scheduled; import java.util.Date; /** * @author Administrator * 创建任务线程 */ public class Task implements Runnable { private String name; public Task(String name) { this.name = name; } // @Override // public String call() throws Exception { // // TODO Auto-generated method stub // System.out.printf("%s: Starting at : %s\n",name,new Date()); // return "Hello,world"; // } @Override public void run() { // TODO Auto-generated method stub System.out.printf("%s: Starting at : %s\n",name,new Date()); // return "Hello,world"; } } package org.concurrency.executorframework.scheduled; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author Administrator * 主线程类 */ public class Main { public static void main(String[] args) { // TODO Auto-generated method stub /*使用scheduledThreadPoolExecutor()方法创建ScheduledExecutorService对象,并转化为ScheduledThreadPoolExecutor * 这个方法接收一个表示线程数量的整数作为参数。 * */ ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1); System.out.printf("Main: Starting at: %s\n",new Date()); Task task = new Task("Task"); /* * 使用scheduledAtFixedRate()方法发送任务。这个方法接收四个参数 * 1.被周期执行的任务 * 2.执行第一次任务执行后的延时时间 * 3.两次执行的时间周期 * 4.第2个和第3个参数的时间单位 * 两次执行之间的周期是指任务咋两次执行开始的时间间隔。 * 这期间可能会存在多个任务实例 * */ ScheduledFuture<?> result = executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS); for(int i = 0;i<10;i++){ System.out.printf("Main: Delay: %d\n",result.getDelay(TimeUnit.MILLISECONDS)); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } executor.shutdown(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.printf("Main: Finished at: %s\n",new Date()); } }
执行结果截图:
7、在执行器中取消任务。如果需要取消已经发送给执行器的任务,则需要使用Future接口的cancle()方法来执行取消操作。
8、在执行器中控制任务的完成。FutureTask类中提供了一个名为done()方法,允许在执行器中的任务执行结束后还可以执行一些代码。例如生成报表,通过邮件发送结果或释放一些系统资源等。我们可以可以覆盖FutureTask类的done()方法来控制任务的完成。
9、在执行器中分离任务的启动和出结果的处理
通常情况下,使用执行器执行并发任务时,将Runnable或Callable任务发送给执行器,并获得Future对象来控制任务。此外,还会碰到如下情形,需要在一个对象里发送任务给执行器,然后唉另一个对象里处理结果。对于这种情况,Java API提供了CompletionService类。
CompletionService类有一个方法用来发送任务给执行器,还有一个方法为下一个已经执行结束的任务获取Future对象。
我们将通过一个实例学习如何使用CompletionService类,在执行器中分离任务的启动与结果的处理。
package org.concurrency.executorframework.callable; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; /** * @author Administrator * */ public class ReportGenerator implements Callable<String> { /*用来表示数据和报告*/ private String sender; private String title; public ReportGenerator(String sender, String title) { super(); this.sender = sender; this.title = title; } @Override public String call() throws Exception { // TODO 让线程休眠一段随机时间 long duration = (long) (Math.random()*10); System.out.printf("%s_%s: ReportGenerator:Generating a report during %d seconds\n",this.sender,this.title,duration); TimeUnit.SECONDS.sleep(duration); String ret = sender+":"+title; return ret; } } package org.concurrency.executorframework.callable; import java.util.concurrent.CompletionService; /** * @author Administrator * 用来模拟请求报告 */ public class ReportRequest implements Runnable { private String name; private CompletionService<String> service; public ReportRequest(String name, CompletionService<String> service) { this.name = name; this.service = service; } @Override public void run() { // TODO 创建了ReportGenerator对象,并使用submit()方法将此对对象发送给CompletionService。 ReportGenerator reportGenerator = new ReportGenerator(name, "Report"); service.submit(reportGenerator); } } package org.concurrency.executorframework.callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * @author Administrator * 这个类将获取ReportGenerator任务的结果 */ public class ReportProcessor implements Runnable { private CompletionService<String> service; private boolean end; public ReportProcessor(CompletionService<String> service) { this.service = service; end = false; } @Override public void run() { // TODO 获取下一个已经完成任务的Future对象;当然这个任务是采用CompletionService来完成 /*当 *完成服务*任务结束,这些任务中的一个任务就执行结束了,完成服务中存储着Future对象,用来空载它在队列中的队形 * 调用poll()方法访问这个队列,查看是否有任务已经完成,如果有就返回队列中的第一个元素,即一个任务执行完成后的Future对象。 * 当poll()方法返回Future对象后,它将从队列中删除这个Future对象。 * */ while(!end){ try { Future<String> result = service.poll(20, TimeUnit.SECONDS); if(result != null){ String report = result.get(); System.out.println("ReportReciver:Report Received:"+ report); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("ReportSender: End"); } } public void setEnd(boolean end) { this.end = end; } } package org.concurrency.executorframework.callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @author Administrator * 线程启动类 */ public class Main { public static void main(String[] args) { // TODO Auto-generated method stub ExecutorService executor = Executors.newCachedThreadPool(); CompletionService<String> service = new ExecutorCompletionService<>(executor); ReportRequest faceRequest = new ReportRequest("Face", service); ReportRequest onlineRequest = new ReportRequest("Online", service); Thread faceThread = new Thread(faceRequest); Thread onlineThread = new Thread(onlineRequest); ReportProcessor processor = new ReportProcessor(service); Thread senderThread = new Thread(processor); System.out.println("Main: Staring the Threads"); faceThread.start(); onlineThread.start(); senderThread.start(); try { System.out.println("Main: Waiting for the reportgenerators."); faceThread.join(); onlineThread.join(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Main: Shutting down the executor."); executor.shutdown(); try { /*调用awaitTerminated()方法等待所有任务执行结束*/ executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } processor.setEnd(true); System.out.println("Main:Ends"); } }
执行结果截图:
10、处理在执行器中被拒绝的任务。
当我们想结束执行器的执行时,调用shutdown()方法来表示执行器应当结束,但是,执行器只有等待正在运行的任务或者等待执行的任务结束后,才能真正结束。如果在此期间发送给一个任务给执行器,这个任务会被拒绝,ThreadPoolExecutor提供了一套机制来处理被拒绝的任务。这些任务实现了RejectExecutionHandler接口。
本文出自 “阿酷博客源” 博客,请务必保留此出处http://aku28907.blog.51cto.com/5668513/1788500
原文地址:http://aku28907.blog.51cto.com/5668513/1788500