码迷,mamicode.com
首页 > 其他好文 > 详细

jdk8之CompletableFuture与CompletionService

时间:2020-01-29 14:23:41      阅读:142      评论:0      收藏:0      [点我收藏+]

标签:src   tar   抛出异常   callable   hand   main   相对   completed   port   

  JDK 8的CompletionService相对于之前版本的Future而言,其优势是能够尽可能快的得到执行完成的任务。例如有4个并发任务要执行,正常情况下通过Future.get()获取,通常只能按照提交的顺序获得结果,如果最后提交的最先完成的话,总执行时间会长很多。而通过CompletionService能够降低总执行时间,如下所示:

package com.hundsun.ta.base.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author zjhua
 * @description
 * @date 2020/1/28 21:07
 */
public class CompletionServiceTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        testFuture();
        testCompletionService();
    }

    //结果的输出和线程的放入顺序 有关(如果前面的没完成,就算后面的哪个完成了也得等到你的牌号才能输出!),so阻塞耗时
    public static void testFuture() throws InterruptedException, ExecutionException {
        long beg = System.currentTimeMillis();
        System.out.println("testFuture()开始执行:" + beg);
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<String>> result = new ArrayList<Future<String>>();
        for (int i = 5; i > 0; i--) {
            Future<String> submit = executor.submit(new Task(i));
            result.add(submit);
        }
        executor.shutdown();
        for (int i = 0; i < 5; i++) {//一个一个等待返回结果
            Thread.sleep(500);
            System.out.println("线程" + i + "执行完成:" + result.get(i).get());
        }
        System.out.println("testFuture()执行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis()-beg));
    }

    //结果的输出和线程的放入顺序 无关(谁完成了谁就先输出!主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序),so很大大缩短等待时间
    private static void testCompletionService() throws InterruptedException, ExecutionException {
        long beg = System.currentTimeMillis();
        System.out.println("testFuture()开始执行:" + beg);
        ExecutorService executor = Executors.newCachedThreadPool();
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);
        for (int i = 5; i > 0; i--) {
            completionService.submit(new Task(i));
        }
        executor.shutdown();
        for (int i = 0; i < 5; i++) {
            // 检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
            Future<String> future = completionService.take(); //这一行没有完成的任务就阻塞
            Thread.sleep(500);
            System.out.println("线程" + i + "执行完成:" + future.get());   // 这一行在这里不会阻塞,引入放入队列中的都是已经完成的任务
        }
        System.out.println("testFuture()执行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis() - beg));
    }

    private static class Task implements Callable<String> {

        private volatile int i;

        public Task(int i) {
            this.i = i;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(i*500);
            return "任务 : " + i;
        }

    }
}
// 执行结果
testFuture()开始执行:1580217876088
线程0执行完成:任务 : 5
线程1执行完成:任务 : 4
线程2执行完成:任务 : 3
线程3执行完成:任务 : 2
线程4执行完成:任务 : 1
testFuture()执行完成:1580217880596,4508
testFuture()开始执行:1580217880596
线程0执行完成:任务 : 1
线程1执行完成:任务 : 2
线程2执行完成:任务 : 3
线程3执行完成:任务 : 4
线程4执行完成:任务 : 5
testFuture()执行完成:1580217883605,3009

使用传统的Future,需要执行4.5秒,使用CompleteService,则只需要3秒。但是如果子线程执行完成后不需要执行其他任务,则意义不是很大。

除了上述场景外,CompleteService还适合于N选1的场景,例如同时从两个渠道查询数据,返回任何一个可用的即可,从Future就实现不了。

CompletionService的定义如下:

技术图片

其实现也比较简单,利用了ThreadPoolExecutor。

技术图片

看完CompleteService,再来看CompleteFuture。它实现了Future接口和CompletionStage接口(他代表某个异步或同步计算的阶段,也就是计算流水线的一个节点,这样多个CompletionStage可以作为和过滤器一样链式执行,一个计算单元完成后出发下一个计算单元),和CompleteService的区别在于CompleteFuture知道当前完成的是谁,并采用编程式回调提高代码可读性,CompleteService只知道哪个最快完成了,具体是谁需要应用自己去关联上下文。同时在编程模式上,很大程度上利用了JDK 8的Lambda表达式,这样一个完整服务的多个步骤就能够和同步的的写法一样自然,不用为了实现异步处理而将逻辑合并为一个超大的方法。在并行处理中,如果每个分片的处理时间相差比较大,例如有些1分钟,有些3分钟,有些10秒钟,这样将每个服务的粒度细分为很多个子步骤,每个服务的子步骤通过CompleteFuture串联起来,整体的完成时间就能够下降,每个分片的处理完成时间也将趋于接近。同时在异常的处理上,CompleteFuture也要友好的多。

技术图片

 

 下面来看一个例子:

static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
    int count = 1;
    @Override
    public Thread newThread(Runnable runnable) {
        return new Thread(runnable, "custom-executor-" + count++);
    }
});
static void thenApplyAsyncWithExecutorExample() {
// 简单的异步执行 CompletableFuture
<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }

异常处理:

static void completeExceptionallyExample() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));  // 模拟抛出异常
assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch(CompletionException ex) { // just for testing
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    assertEquals("message upon cancel", exceptionHandler.join());
}

链式调用:

public void completableFutureApplyAsync() {
 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
 ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
 CompletableFuture<Integer> completableFuture = 
 CompletableFuture
      .supplyAsync(this::findAccountNumber,newFixedThreadPool)//will run on thread obtain from newFixedThreadPool
      .thenApplyAsync(this::calculateBalance,newSingleThreadScheduledExecutor) //will run on thread obtain from newSingleThreadScheduledExecutor
      .thenApplyAsync(this::notifyBalance);//will run on thread obtain from common pool
   Integer balance = completableFuture.join();
    assertEquals(Integer.valueOf(balance), Integer.valueOf(100));
    }

  就实际应用而言,CompletableFuture的作用更加有价值的地方在于其他的一些方法,比如allOf、anyOf、xxxToEither等需要多对一的场景,他们可以大大简化代码。

参考:

https://dzone.com/articles/20-examples-of-using-javas-completablefuture

jdk8之CompletableFuture与CompletionService

标签:src   tar   抛出异常   callable   hand   main   相对   completed   port   

原文地址:https://www.cnblogs.com/zhjh256/p/11829397.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!