标签:class code java http tar com
JDK的CompletionService提供了一种将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 要执行的任务,使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO:执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。举个例子:现在要向服务器发送HTTP请求,服务端对于每个请求都需要做很多额外操作,很消耗时间,则可以在接收每个请求之后,将其提交到CompletionService异步处理,等执行完毕之后,再返回给客户端。
package com.yf.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates {@link CompletionService}: producers submit simulated HTTP
 * requests, and a single consumer thread takes the finished responses in
 * completion order and prints their ids.
 *
 * <p>Call {@link #shutdown()} once all requests have been submitted so that the
 * worker pools and the consumer thread are released and the JVM can exit.
 */
public class CompletionServiceTest {

    /** Pool on which {@link HTTPExecutor} runs the simulated request handling. */
    private final ExecutorService threadPool = Executors.newCachedThreadPool();

    /** Pool backing the completion service; kept in a field so it can be shut down. */
    private final ExecutorService completionPool = Executors.newCachedThreadPool();

    private final CompletionService<Response> completionService =
            new ExecutorCompletionService<Response>(completionPool);

    /** Thread that takes completed futures and prints the response ids. */
    private final Thread consumer;

    /**
     * Creates the demo and starts the consumer thread. All fields the consumer
     * reads are assigned before the thread is started.
     */
    public CompletionServiceTest() {
        consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        // Blocks while no submitted task has completed yet.
                        report(completionService.take());
                    } catch (InterruptedException e) {
                        // Interruption is the stop signal sent by shutdown():
                        // print whatever already finished, then exit the loop.
                        drainCompleted();
                        return;
                    }
                }
            }
        });
        consumer.start();
    }

    /**
     * Prints the id of a completed response, or the failure message if the
     * task threw.
     *
     * @param done a future obtained from the completion service
     * @throws InterruptedException if interrupted while waiting on the future
     */
    private void report(Future<Response> done) throws InterruptedException {
        try {
            Response resp = done.get();
            System.out.println(resp.getId());
        } catch (ExecutionException e) {
            System.out.println("Exception happened:" + e.getMessage());
        }
    }

    /**
     * Prints every result that is already queued without blocking, then
     * restores the interrupt status of the calling thread.
     */
    private void drainCompleted() {
        try {
            Future<Response> done;
            while ((done = completionService.poll()) != null) {
                report(done);
            }
        } catch (InterruptedException e) {
            // Fall through: the interrupt status is restored below.
        } finally {
            Thread.currentThread().interrupt();
        }
    }

    /** A simulated HTTP request. */
    // Kept as an inner (non-static) class so existing `t.new Request()` callers still compile.
    class Request {
        private int rid;
        private String body;

        public int getRid() {
            return rid;
        }

        public void setRid(int rid) {
            this.rid = rid;
        }

        public String getBody() {
            return body;
        }

        public void setBody(String body) {
            this.body = body;
        }
    }

    /** A simulated HTTP response. */
    class Response {
        private int id;
        private String body;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getBody() {
            return body;
        }

        public void setBody(String body) {
            this.body = body;
        }
    }

    /** Simulates slow server-side handling of a request on {@code threadPool}. */
    class HTTPExecutor {
        /**
         * Submits the handling of {@code request} to the worker pool.
         *
         * @param request the request to handle
         * @return a future that yields a response carrying the request's id
         */
        public Future<Response> execute(final Request request) {
            return threadPool.submit(new Callable<Response>() {
                @Override
                public Response call() throws Exception {
                    Response response = new Response();
                    // sleep is static; call it on the class, not on an instance.
                    Thread.sleep(3000);
                    response.setId(request.getRid());
                    response.setBody("response");
                    return response;
                }
            });
        }
    }

    /**
     * Hands a request to the completion service for asynchronous handling; the
     * consumer thread prints the response id once it is ready.
     *
     * @param request the request to handle
     * @throws java.util.concurrent.RejectedExecutionException if called after
     *         {@link #shutdown()}
     */
    public void submitHTTP(final Request request) {
        completionService.submit(new Callable<Response>() {
            @Override
            public Response call() throws Exception {
                return new HTTPExecutor().execute(request).get();
            }
        });
    }

    /**
     * Stops accepting requests, waits for the submitted ones to finish and be
     * printed, then stops the worker pools and the consumer thread.
     *
     * @throws InterruptedException if the caller is interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        // The completion pool must stop first: its tasks still submit work to threadPool.
        completionPool.shutdown();
        try {
            // Effectively waits without a time limit for the submitted tasks.
            completionPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } finally {
            threadPool.shutdown();
            consumer.interrupt();
        }
        consumer.join();
    }

    /**
     * Sends 10 simulated HTTP requests, then shuts the demo down once every
     * response has been printed.
     */
    public static void main(String[] args) throws InterruptedException {
        CompletionServiceTest t = new CompletionServiceTest();
        for (int i = 0; i < 10; i++) {
            Request request = t.new Request();
            request.setRid(i);
            request.setBody("request");
            t.submitHTTP(request);
        }
        t.shutdown();
    }
}
关键代码如下:
/**
 * Creates a completion service that runs tasks on the given executor and
 * stores completed futures in a newly created {@code LinkedBlockingQueue}.
 *
 * @param executor the executor to use
 * @throws NullPointerException if {@code executor} is null
 */
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    // Keep a typed reference only when the executor is an AbstractExecutorService;
    // otherwise aes stays null.
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

/**
 * Creates a completion service that runs tasks on the given executor and
 * uses the caller-supplied queue as its completion queue.
 *
 * @param executor the executor to use
 * @param completionQueue the queue to use as the completion queue
 * @throws NullPointerException if {@code executor} or {@code completionQueue} is null
 */
public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    // Same rule as the one-argument constructor: typed reference or null.
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

/**
 * Wraps the task in a {@code RunnableFuture}, hands it to the executor inside
 * a {@code QueueingFuture}, and returns the {@code RunnableFuture} to the caller.
 *
 * @param task the task to submit
 * @return the future created for the task
 * @throws NullPointerException if {@code task} is null
 */
public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    // NOTE(review): newTaskFor and QueueingFuture are defined outside this excerpt;
    // presumably QueueingFuture puts the finished future on completionQueue -- confirm
    // against the JDK source.
    executor.execute(new QueueingFuture(f));
    return f;
}
浅析Java CompletionService,布布扣,bubuko.com
标签:class code java http tar com
原文地址:http://blog.csdn.net/yangfei001/article/details/30312719