码迷,mamicode.com
首页 > 编程语言 > 详细

Fork/Join-Java并行计算框架

时间:2017-09-03 00:30:44      阅读:251      评论:0      收藏:0      [点我收藏+]

标签:直接   interrupt   pen   execution   import   public   ret   问题   返回   

Java在JDK7之后加入了并行计算的框架Fork/Join,可以解决我们系统中大数据计算的性能问题。Fork/Join采用的是分治法,Fork是将一个大任务拆分成若干个子任务,子任务分别去计算,而Join是获取到子任务的计算结果,然后合并,这个是递归的过程。子任务被分配到不同的核上执行时,效率最高。伪代码如下:

Result solve(Problem problem) {

    if (problem is small)

        directly solve problem

    else {

        split problem into independent parts

        fork new subtasks to solve each part

        join all subtasks

        compose result from subresults

    }

}

Fork/Join框架的核心类是ForkJoinPool,它能够接收一个ForkJoinTask,并得到计算结果。ForkJoinTask有两个子类,RecursiveTask(有返回值)和RecursiveAction(无返回结果),我们自己定义任务时,只需选择这两个类继承即可。
 
下面来看一个实例:计算一个超大数组所有元素的和。代码如下:

import java.util.Arrays;

import java.util.Random;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.RecursiveTask;

/**

 * @author: shuang.gao  Date: 2015/7/14 Time: 8:16

 */public class SumTask extends RecursiveTask<Integer> {

 

    private static final long serialVersionUID = -6196480027075657316L;

 

    private static final int THRESHOLD = 500000;

 

    private long[] array;

 

    private int low;

 

    private int high;

 

    public SumTask(long[] array, int low, int high) {

        this.array = array;

        this.low = low;

        this.high = high;

    }

 

    @Override

    protected Integer compute() {

        int sum = 0;

        if (high - low <= THRESHOLD) {

            // 小于阈值则直接计算

            for (int i = low; i < high; i++) {

                sum += array[i];

            }

        } else {

            // 1. 一个大任务分割成两个子任务

            int mid = (low + high) >>> 1;

            SumTask left = new SumTask(array, low, mid);

            SumTask right = new SumTask(array, mid + 1, high);

 

            // 2. 分别计算

            left.fork();

            right.fork();

 

            // 3. 合并结果

            sum = left.join() + right.join();

        }

        return sum;

    }

 

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        long[] array = genArray(1000000);

 

        System.out.println(Arrays.toString(array));

 

        // 1. 创建任务

        SumTask sumTask = new SumTask(array, 0, array.length - 1);

 

        long begin = System.currentTimeMillis();

 

        // 2. 创建线程池

        ForkJoinPool forkJoinPool = new ForkJoinPool();

 

        // 3. 提交任务到线程池

        forkJoinPool.submit(sumTask);

 

        // 4. 获取结果

        Integer result = sumTask.get();

 

        long end = System.currentTimeMillis();

 

        System.out.println(String.format("结果 %s 耗时 %sms", result, end - begin));

    }

 

    private static long[] genArray(int size) {

        long[] array = new long[size];

        for (int i = 0; i < size; i++) {

            array[i] = new Random().nextLong();

        }

        return array;

    }

}

Fork/Join-Java并行计算框架

标签:直接   interrupt   pen   execution   import   public   ret   问题   返回   

原文地址:http://www.cnblogs.com/m2492565210/p/7468393.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!