码迷,mamicode.com
首页 > 其他好文 > 详细

ForkJoin有参无返回值、有参有返回值实例

时间:2017-10-26 21:04:13      阅读:319      评论:0      收藏:0      [点我收藏+]

标签:protect   eal   set   join   cancel   xtend   null   mina   ota   

介绍:

  a . Fork/Join为JKD1.7引入,适用于对大量数据进行拆分成多个小任务进行计算的框架,最后把所有小任务的结果汇总合并得到最终的结果

  b . 相关类

public abstract class RecursiveTask<V> extends ForkJoinTask<V>;
public abstract class RecursiveAction extends ForkJoinTask<Void>;

  c . 其中RecursiveTask在执行有返回值的任务时使用,RecursiveAction在执行没有返回值的任务时使用

实例代码:

  ForkJoin有参无返回值        (继承AbstractService类)

  参数:map

public class UpdatePlayersTotalTimeTask extends RecursiveAction {

    private static final int THRESHOLD_NUM = 30;
    private Map<String,String> players;
    private PlayerTotalTimeService playerTotalTimeService;
    private MongoDao mongoDao;

    public UpdatePlayersTotalTimeTask(Map<String,String> players, MongoDao mongoDao) {
        this.players = players;
        this.mongoDao = mongoDao;
    }

    @Override
    protected void compute() {
        boolean canCompute = players.size() <= THRESHOLD_NUM;
        if (canCompute) {
            playerTotalTimeService = new PlayerTotalTimeService();
            playerTotalTimeService.updatePlayersTotalTime(players, mongoDao);
        } else {
            int middle = players.size() / 2;

            int i = 0;
            Map<String, String> leftMap = new HashMap<>();
            Map<String, String> rightMap = new HashMap<>();

            for (Map.Entry<String, String> entry : players.entrySet()) {
                if (i < middle) {
                    leftMap.put(entry.getKey(), entry.getValue());
                } else {
                    rightMap.put(entry.getKey(), entry.getValue());
                }
                i++;
            }

            UpdatePlayersTotalTimeTask leftTask = new UpdatePlayersTotalTimeTask(leftMap, mongoDao);
            UpdatePlayersTotalTimeTask rightTask = new UpdatePlayersTotalTimeTask(rightMap, mongoDao);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();
        }
    }

  //调用测试
public static void main(String[] args) throws InterruptedException { // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool ForkJoinPool forkjoinPool = new ForkJoinPool(); Map<String, String> map = new HashMap<>();
     map.put("key1","value1"); MongoDao mongoDao
= new MongoDaoImpl(); //生成一个计算任务 UpdatePlayersTotalTimeTask task = new UpdatePlayersTotalTimeTask(map, mongoDao); // 提交可分解的PrintTask任务 forkjoinPool.excute(task); forkjoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束 // 关闭线程池 forkjoinPool.shutdown(); } }

  ForkJoin有参有返回值        (继承RecursiveTask<T>类)

  参数:set<String>   返回值:map<String,String>

 1 public class CalcRoomPlayersTotalTimeTask extends RecursiveTask<Map<String,String>> {
 2     private static final int THRESHOLD_NUM = 15;
 3     private Set<String> roomSet;
 4     private PlayerTotalTimeService playerTotalTimeService;
 5     private MongoDao mongoDao;
 6 
 7     public CalcRoomPlayersTotalTimeTask(Set<String> roomSet, MongoDao mongoDao) {
 8         this.roomSet = roomSet;
 9         this.mongoDao = mongoDao;
10     }
11 
12     @Override
13     protected Map<String,String> compute() {
14 
15         playerTotalTimeService = new PlayerTotalTimeService();
16         //如果任务足够小就计算任务
17         boolean canCompute = roomSet.size() <= THRESHOLD_NUM;
18         if (canCompute) {
19 
20             Map<String,String> playersResult = new HashMap<>();
21 
22             playersResult =  playerTotalTimeService.calcRoomPlayersTotalTime(roomSet, mongoDao);
23 
24             return playersResult;
25         } else {
26             // 如果任务大于阈值,就分裂成两个子任务计算
27             long middle = roomSet.size() / 2;
28             Set<String> leftSet = new HashSet<>();
29             Set<String> rightSet = new HashSet<>();
30 
31             long i = 0;
32             for (String room : roomSet) {
33                 if (i < middle) {
34                     leftSet.add(room);
35                 } else {
36                     rightSet.add(room);
37                 }
38                 i++;
39             }
40 
41             CalcRoomPlayersTotalTimeTask leftTask = new CalcRoomPlayersTotalTimeTask(leftSet,mongoDao);
42             CalcRoomPlayersTotalTimeTask rightTask = new CalcRoomPlayersTotalTimeTask(rightSet,mongoDao);
43 
44             // 执行子任务
45             invokeAll(leftTask,rightTask);
46 
47             HashMap<String,String> result = new HashMap<>();
48 
49             Map<String,String> leftResult = leftTask.join();
50             Map<String,String> rightResult = rightTask.join();
51 
52             result.putAll(leftResult);
53 
54             for (Map.Entry<String, String> entry : rightResult.entrySet()) {
55                 boolean contains = result.containsKey(entry.getKey());
56                 if(contains){
57                     String playerTotalTimeStr =  entry.getValue();
58                     Long playerTotalTimeLong = playerTotalTimeService.timeStringToLong(result.get(entry.getKey()))  + playerTotalTimeService.timeStringToLong(playerTotalTimeStr);
59                     playerTotalTimeStr = playerTotalTimeService.timeLongToString(playerTotalTimeLong);
60                     result.put(entry.getKey(),playerTotalTimeStr);
61                 }else {
62                     result.put(entry.getKey(),entry.getValue());
63                 }
64             }
65 
66             return result;
67         }
68     }
69 
70     public static void main(String[] args) throws InterruptedException {
71         // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
72         ForkJoinPool forkjoinPool = new ForkJoinPool();
73 
74         Set<String> roomSet = new HashSet<>();
75         Map<String,String> map = new HashMap<String,String>();
76         MongoDao mongoDao = new MongoDaoImpl();
77         //生成一个计算任务
78         CalcRoomPlayersTotalTimeTask task = new CalcRoomPlayersTotalTimeTask(roomSet, mongoDao);
79 
80         // 提交可分解的PrintTask任务
81         forkjoinPool.execute(task);
82         map = forkjoinPool.invoke(task);
83         forkjoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
84         // 关闭线程池
85         forkjoinPool.shutdown();
86     }
87 }    

 

 说明:

  a .在有大量计算任务时,此框架方法可进行并行计算效率高,以上示例,可以根据具体的业务需求更改属性及相关方法用于匹配自己的业务逻辑

       b .JDK1.8后由于加入Stream流的操作,集合框架可以使用Collection<E> default Stream<E> parallelStream()的方法转换成并行流进行计算,此时效果与Fork/Join任务同效

       c .ForkJoinPool中的多种方法

1 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);//等待获取结果
2 public void execute(ForkJoinTask<?> task);//异步执行
3 public <T> T invoke(ForkJoinTask<T> task);//执行,获取Future

      d .ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。getException方             法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

if(task.isCompletedAbnormally()) {
    System.out.println(task.getException());
}

 

注:部分内容引自https://segmentfault.com/a/1190000010209196

ForkJoin有参无返回值、有参有返回值实例

标签:protect   eal   set   join   cancel   xtend   null   mina   ota   

原文地址:http://www.cnblogs.com/milicool/p/7736048.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!