标签:
Executor框架分离了任务的创建和执行。JAVA SE5的java.util.concurrent包中的执行器(Executor)管理Thread对象,从而简化了并发编程。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程 池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等.
1.Executor接口
public interface Executor{
void execute(Runnable task);
};
Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法。它没有直接的实现类,有一个重要的子接口ExecutorService.执行已提交的Runnable任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():
Executor executor = anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ... 内存一致性效果:线程中将Runnable
对象提交到 Executor之前的操作 happen-before其执行开始(可能在另一个线程中)。
2.ExecutorService
public interface ExecutorService extends Executor { void shutdown();//关闭方法,调用后执行之前提交的任务,不再接受新的任务 List<Runnable> shutdownNow();//从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表 boolean isShutdown();// 判断执行器是否已经关闭 boolean isTerminated();//关闭后所有任务是否都已完成 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task);//提交一个Callable任务 <T> Future<T> submit(Runnable task, T result);//提交一个Runable任务,result要返回的结果 Future<?> submit(Runnable task);//提交一个任务 /** *执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; /** *执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; /** *执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。 */ <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; /** *执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果. */ <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
ExecutorService接口继承自Executor接口,定义了终止、提交任务、跟踪任务返回结果等方法。
ExecutorService涉及到Runnable、Callable、Future接口,这些接口的具体内容如下。
1 // 实现Runnable接口的类将被Thread执行,表示一个基本的任务
2 public interface Runnable {
3 // run方法就是它所有的内容,就是实际执行的任务
4 public abstract void run();
5 }
6 // Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
7 public interface Callable<V> {
8 // 相对于run方法的带有返回值的call方法
9 V call() throws Exception;
10 }
3.Future:代表异步任务执行的结果
public interface Future<V> { //尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。 boolean cancel(boolean mayInterruptIfRunning); // 返回代表的任务是否在完成之前被取消了 boolean isCancelled(); //如果任务已经完成,返回true boolean isDone(); //获取异步任务的执行结果(如果任务没执行完将等待 V get() throws InterruptedException, ExecutionException; //获取异步任务的执行结果(有最常等待时间的限制) 26 * 27 * timeout表示等待的时间,unit是它时间单位 28 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
4.ScheduledExecutorService
//可以安排指定时间或周期性的执行任务的ExecutorService public interface ScheduledExecutorService extends ExecutorService { //在指定延迟后执行一个任务,只执行一次 public ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit); //与上面的方法相同,只是接受的是Callable任务 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); //创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit // 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); // 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务, // 时间单位都是unit.每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), // initialDelay + 2 * (任务运行时间+delay)... public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
5.AbstractExecutorService
//提供ExecutorService的默认实现 public abstract class AbstractExecutorService implements ExecutorService { //为指定的Runnable和value构造一个FutureTask, value表示默认被返回的Future protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } //为指定的Callable创建一个FutureTask protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } // 提交Runnable任务 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 通过newTaskFor方法构造RunnableFuture,默认的返回值是null RunnableFuture<Object> ftask = newTaskFor(task, null); // 调用具体实现的execute方法 execute(ftask); return ftask; } // 提交Runnable任务 public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); //通过newTaskFor方法构造RunnableFuture,默认的返回值是result RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } //提交Callable任务 public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } // doInvokeAny的具体实现(核心内容),其它几个方法都是重载方法,都对这个方法进行调用 // tasks是被执行的任务集,timed标志是否定时的,nanos表示定时的情况下执行任务的限制时间 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed,long nanos) throws InterruptedException, ExecutionException, TimeoutException { // tasks空判断 if (tasks==null) throw new NullPointerException(); // 任务数量 int ntasks=tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); // 创建对应数量的Future返回集 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs =new ExecutorCompletionService<T>(this); try { // 执行异常 ExecutionException ee = null; // System.nanoTime()根据系统计时器当回当前的纳秒值 long lastTime = (timed)? System.nanoTime(): 0; // 获取任务集的遍历器 Iterator<? extends Callable<T>> it = tasks.iterator(); // 向执行器ExecutorCompletionService提交一个任务,并将结果加入futures中 futures.add(ecs.submit(it.next0)); // 修改任务计数器 --ntasks; // 活跃任务计数器 int active =1; for (;;) { // 获取并移除代表已完成任务的Future,如果不存在,返回null Future<T> f = ecs.poll(); if (f == null) { // 没有任务完成,且任务集中还有未提交的任务 if (ntasks > 0) { // 剩余任务计数器减 --ntasks; // 提交任务并添加结果 futures.add(ecs.submit(it.next())); // 活跃任务计数器加 ++active; } // 没有剩余任务,且没有活跃任务(所有任务可能都会取消),跳过这一次循环 else if (active==0) break; else if (timed) { // 获取并移除代表已完成任务的Future,如果不存在,会等待nanos指定的纳秒数 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); // 计算剩余可用时间 long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } else // 获取并移除表示下一个已完成任务的未来,等待,如果目前不存在。 // 执行到这一步说明已经没有任务任务可以提交,只能等待某一个任务的返回 f = ecs.take(); } // f不为空说明有一个任务完成了 if (f != null) { // 已完成一个任务,所以活跃任务计数减 --active; try { // 返回该任务的结果 return f.get(); } catch (InterruptedException ie) { throw ie; } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } // 如果没有成功返回结果则抛出异常 if (ee == null) ee = new ExecutionException(); throw ee; } finally { // 无论执行中发生异常还是顺利结束,都将取消剩余未执行的任务 for (Future<T> f : futures) f.cancel(true); } } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { // 非定时任务的doInvokeAny调用 return doInvokeAny(tasks, false,0); } catch (TimeoutException cannotHappen) { assert false; return null; } } // 定时任务的invokeAny调用,timeout表示超时时间,unit表示时间单位 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } // 无超时设置的invokeAll方法 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { // 空任务判断 if (tasks == null) throw new NullPointerException(); // 创建大小为任务数量的结果集 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); // 是否完成所有任务的标记 boolean done = false; try { // 遍历并执行任务 for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } // 遍历结果集 for (Future<T> f : futures) { // 如果某个任务没完成,通过f调用get()方法 if (!f.isDone()) { try { // get方法等待计算完成,然后获取结果(会等待)。所以调用get后任务就会完成计算,否则会等待 f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } // 标志所有任务执行完成 done = true; // 返回结果 return futures; } finally { // 假如没有完成所有任务(可能是发生异常等情况),将任务取消 if (!done) for (Future<T> f : futures) f.cancel(true); } } // 超时设置的invokeAll方法 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { // 需要执行的任务集为空或时间单位为空,抛出异常 if (tasks == null || unit == null) throw new NullPointerException(); // 将超时时间转为纳秒单位 long nanos = unit.toNanos(timeout); // 创建任务结果集 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); // 是否全部完成的标志 boolean done = false; try { // 遍历tasks,将任务转为RunnableFuture for (Callable<T> t : tasks) futures.add(newTaskFor(t)); // 记录当前时间(单位是纳秒) long lastTime = System.nanoTime(); // 获取迭代器 Iterator<Future<T>> it = futures.iterator(); // 遍历 while (it.hasNext()) { // 执行任务 execute((Runnable)(it.next())); // 记录当前时间 long now = System.nanoTime(); // 计算剩余可用时间 nanos -= now - lastTime; // 更新上一次执行时间 lastTime = now; // 超时,返回保存任务状态的结果集 if (nanos <= 0) return futures; } for (Future<T> f : futures) { // 如果有任务没完成 if (!f.isDone()) { // 时间已经用完,返回保存任务状态的结果集 if (nanos <= 0) return futures; try { // 获取计算结果,最多等待给定的时间nanos,单位是纳秒 f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } // 计算可用时间 long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } } // 修改是否全部完成的标记 done = true; // 返回结果集 return futures; } finally { // 假如没有完成所有任务(可能是时间已经用完、发生异常等情况),将任务取消 if (!done) for (Future<T> f : futures) f.cancel(true); } } }
标签:
原文地址:http://www.cnblogs.com/wxgblogs/p/5435646.html