标签:poll 怎么 super 完成后 blocking 源码 对象 不可用 原理
当我们需要同时处理一批任务时,并需要在任务完成时,可以获得任务的结果时,我们该怎么办呢。
public class CompletionServiceDemo implements Callable<Integer> { Random r = new Random(); public static void main(String[] args) throws ExecutionException, InterruptedException { new CompletionServiceDemo().process(); } public void process() throws InterruptedException, ExecutionException { Executor executor = Executors.newFixedThreadPool(2); CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor); int taskCount = 5; for (int i = 0; i < taskCount; i++) { completionService.submit(this); } int sum = 0; for (int i = 0; i < taskCount; i++) { Integer res = completionService.take().get(); sum += res; } System.out.println(sum); } @Override public Integer call() throws Exception { int i = r.nextInt(500); Thread.sleep(i); return i; } }
ExecutorCompletionService封装了Executor和BlockingQueue,。首先提交Callable任务到executor,然后将任务封装成QueueingFuture,它是FutureTask子类,然后重写done方法,即在线程执行完将Future对象置入BlockingQueue中。done方法是个回调方法,当FutureTask执行完后会设置result对象,然后就会回调done方法。take和poll方法委托给了 BlockingQueue,它会在结果不可用时阻塞。这样哪个任务先执行完,就能先获得哪个任务对应的结果了
public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor; private final BlockingQueue<Future<V>> completionQueue; ... private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; } ... public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } .... public Future<V> take() throws InterruptedException { return completionQueue.take(); } ... }
标签:poll 怎么 super 完成后 blocking 源码 对象 不可用 原理
原文地址:https://www.cnblogs.com/hello---word/p/11105297.html