码迷,mamicode.com
首页 > 其他好文 > 详细

并行流处理数据

时间:2020-01-01 18:49:31      阅读:82      评论:0      收藏:0      [点我收藏+]

标签:compute   turn   参数   多个   final   最好   需要   reduce   range   

一个接受数字n作为参数,并返回从1到n的所有数字之和。

    public static int intSum(int n) {
        return Stream.iterate(1, i -> i + 1)
                .limit(n)
                .reduce(0, Integer::sum);
    }

 

这是一个简单的顺序流,如果n极大,那么显然单线程是有问题的,所以引入了并行概念。

    public static int parallelIntSum(int n) {
        return Stream.iterate(1, i -> i + 1)
                .limit(n)
                .parallel()
                .reduce(0, Integer::sum);
    }

图示

技术图片

 

 

 

我们可以混合使用sequence和parallel吗?NO! 举一个错误的例子。

    public static int complexSum(int n) {
        return IntStream.iterate(0, i -> i + 1)
                .parallel()
                .filter(i -> i % 2 == 0)
                .sequential()
                .map(x -> x * 2)
                .parallel()
                .reduce(0, Integer::sum);
    }

其实这个例子只会执行最后一个parallel。

 

这种并发的资源如何分配呢?

并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().available- Processors()得到的。 

但是你可以通过java.util.concurrent.ForkJoinPool.common. parallelism来改变线程大小,如所: 

   System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12"); 

这是一个全局设置,因此它将代码中所有的并行流。让ForkJoinPool的大小等于处理器数量是个不错的默认值, 非你有很好的理由,则我们建你不要修改它。

 

实际上前面举得并行例子真的快吗?不,因为有两个问题导致它很慢:

  iterate实际生成的是boxed对象,进行运算需要拆箱

  我们很难把iterate分成多个块来并行

 

使用更加针对的方法

    public static int parallelIntSum(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .reduce(0, Integer::sum);
    }

其实这个不需要刻意记忆,就是我们日常的静态编程,我们对流也进行一次类型化就好了,实际上我觉得这个工作可以交给底层处理,但是可能是为了区分动态编程还是把控制交给使用者。

 

实际上,并行化是有开销的,首先是流的分割,再就是子流的线程分配,再就是数据的合并...最好是进行一定的测试确保性能,不建议直接使用。

 

对于并行流的使用要确保使用不能改变共享状态。

    public static class Accumulator {
        private long total = 0;
        public void add(long value) {
            total += value;
        }
    }
    public static long sideEffectParallelSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
        return accumulator.total;
    }

这是完全错误的,因为total是所有操作Accumulator线程的共享属性,不是原子操作。

 

虽然顺序流到并行流现在看起来很简单,但是要确保是否有必要。比如需要考虑的一个因素,背后的数据结构支持流拆分的性能如何

技术图片

 

 

fork/join框架


import static lambdasinaction.chap7.ParallelStreamsHarness.FORK_JOIN_POOL;

public
class ForkJoinSumCalculator extends RecursiveTask<Long> { public static final long THRESHOLD = 10_000; private final long[] numbers; private final int start; private final int end; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { return computeSequentially(); } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2); leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end); Long rightResult = rightTask.compute(); Long leftResult = leftTask.join(); return leftResult + rightResult; } private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); return FORK_JOIN_POOL.invoke(task); } }

这是自行实现的一个框架,实际上来讲这个Pool我们是单例的,因为不适合使用者随意更改。这其实就是一个分治的线程级别的实现。

 技术图片

 

 

使用的注意地方,最关键的就是join会阻塞,所以你要保证所有的子任务都ok后再调用join,否则阻塞会导致性能问题。

 

实际上的线程是多个的,但是从算法图解上看,并不是一个理想的并行方案,因此有了工行窃取。

技术图片

 

 end

并行流处理数据

标签:compute   turn   参数   多个   final   最好   需要   reduce   range   

原文地址:https://www.cnblogs.com/CherryTab/p/12129378.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!