7. Java8新特性-并行数据处理(parallel)

在JDK7之前,并行处理数据集合非常麻烦。首先需要自己明确的把包含数据的数据结构分成若干个子部分,第二需要给每个子部分分配一个独立的线程;第三需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分合并起来。

Doug Lea 在JDK7中引入了fork/join框架,让这些操作更稳定,更不易出错。

本节主要内容:
1. 用并行流并行处理数据
2. 并行流的性能分析
3. fork/join框架
4. 使用Spliterator分割流

学完本节期望能达到:
1. 熟练使用并行流,来加速业务性能
2. 了解流内部的工作原理,以防止误用的情况
3. 通过Spliterator控制数据块的划分方式

并行流

可以通过对数据源调用parallelStream方法来将源转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样可以自动将工作负荷转到多核中并行处理。

考虑下面一个实现:给定正整数n,计算 1 + 2 + … n的和。
使用stream的实现:

private static long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
}

将上面的顺序流转换为并行流,实现如下:

private static long parallelSum(long n) {
    return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
}

即通过调用方法parallel可将顺序流转换为并行流。

但需要注意的是流仅在终端操作时才开始执行,所以当前流是顺序流还是并行流以最靠近终端操作的流类型为准,示例:

list.stream().parallel().filter(e -> e.age > 20).sequential().map(...).parallel().collect(...);

此种情况并不会按预想的先使用并行流执行过滤,再按顺序流执行映射转换。而是整个流水线操作都按并行流执行。

配置并行流使用的线程池

并行流内部使用了默认的ForkJoinPool, 它默认的线程数量就是处理器的数量(Runtime.getRuntime().availableProcessors())。也可以通过设置系统属性来改变它(System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”))。但它是一个全局设置,会影响所有的并行流,一般而言线程数等于处理器数量是一个合理的数值,不需要修改。

测试流性能

一般而言,同一个功能给我们的感觉是并行流性能会比顺序流性能更好。然而在软件工程中,优化性能的黄金准则是:测量。我们开发了程序,用来测量4种写法的累加,看看性能如何:

@Slf4j
public class SumSample {

    /**
     * 顺序流、并行流性能测试
     * 实现1~1亿整型数字累加
     *
     */
    public static void main(String[] args) {

        CostUtil.cost(() -> log.info("==> for: 1 + ... + 100_000_000, result: {}", forSum(100_000_000)));

        log.info("================================================================================");

        CostUtil.cost(() -> log.info("==> sequential: 1 + ... + 100_000_000, result: {}", sequentialSum(100_000_000)));

        log.info("================================================================================");

        CostUtil.cost(() -> log.info("==> parallel: 1 + ... + 100_000_000, result: {}", parallelSum(100_000_000)));

        log.info("================================================================================");

        CostUtil.cost(() -> log.info("==> longParallel: 1 + ... + 100_000_000, result: {}", longParallelSum(100_000_000)));


    }

    /**
     * 内部迭代方式实现累加
     */
    private static long forSum(long n) {
        long result = 0;
        for (int i = 1; i <= n; i ++) {
            result += i;
        }

        return result;
    }

    /**
     * 顺序流实现累加
     */
    private static long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
    }

    /**
     * 并行流实现累加
     */
    private static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
    }

    /**
     * long原生流范围实现累加
     */
    private static long longParallelSum(long n) {
        return LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum);
    }
}
// result:
2022-01-18 10:53:59.035 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> for: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:53:59.039 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 58
2022-01-18 10:53:59.039 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-================================================================================
2022-01-18 10:54:00.459 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> sequential: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:54:00.459 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 1420
2022-01-18 10:54:00.459 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-================================================================================
2022-01-18 10:54:04.627 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> parallel: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:54:04.628 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 4167
2022-01-18 10:54:04.628 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-================================================================================
2022-01-18 10:54:04.688 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> longParallel: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:54:04.688 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 60

使用四种方法实现1~1亿个数的累加,这是在i7 2.4GHz 6core/12threads CPU的执行结果。让人很意外,并非是并行流性能最好,反而是最差的,最朴实的for循环单线程性能最佳。

原因:

  1. iterate生成的是装箱对象,必须拆箱成数字才能求和。 这一点好理解因为iterate生成的流元素是Long类型,进行累加计算下一个流元素需要先拆箱,计算完再装箱。
  2. iterate很难分成多个独立的块来并行执行。原因是应用这个函数都要依赖前一次应用的结果,即本质上iterate需要顺序执行。虽然标记了流是并行流,但并不意味着一定能并行执行,反而增加了额外开销,影响了性能。

通过上面的比较需要意识到:并行编程比较复杂,有时候甚至违反直觉。如果用的不对(如本例,采用了一个不易并行化的操作iterate),甚至会让性能更差。所以了解parallel方法背后的执行细节非常必要。

LongStream.rangeClosed 代替 iterate

仅高效求和的示例,可用LongStream.rangeClosed高效替代iterate实现并行计算。它的优点是:

  1. LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的开销
  2. LongStream.rangeClosed会生成数字范围,很容易拆分为独立的小块。

通过示例演示它的并行执行性能比同样是并行流的iterate版本要快了70倍。可见它有效利用了并行。

为什么并行流还是比for慢?

上面的执行结果可以看出LongStream.rangeClosed的性能还是比for略慢一点,原因是:

并行化是有代价的,并行过程中需要对流做递归划分,把流的归纳操作分配到不同的线程,最后合并。且多个核心之间移动数据的代价也很大。

正确使用并行流

使用并行流加速性能需要确保用对,如果计算结果是错误的,再快也没意义。
误用并行流而产生错误的首要原因是使用的算法改变了某些共享状态。 如下面示例:

class Accumulator {
    public long total = 0;
    public void add(long value) { total += value; }
}

public static long sideEffectSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
        return accumulator.total;
    }

//result:
2022-01-18 11:40:16.943 [main] INFO  win.elegentjs.java8.stream.parallel.SumSample-==> sideEffectSum: 1 + ... + 100_000_000, result: 1037016191509285
2022-01-18 11:40:16.944 [main] INFO  win.elegentjs.util.CostUtil-==> cost time: 40

从上面示例看出虽然很快,但结果是错误的。 原因是total += value非原子操作,出现了竞态条件。如果使用同步来修复,就失去了并行的意义。 所以写并行流时一定要考虑多个线程是否会修改共享对象的可变状态。

高效使用并行流

一些高效使用并行流的建议:

  1. 如果有疑问,进行测试。并行流并不总是更快,且有时候跟直觉不一致。 使用适当的基准进行测试来检查其性能。
  2. 留意自动拆装箱。 频繁的自动拆装箱非常损耗性能。此种情况时尽量使用原始数据流来应对: IntStream, LongStream, DoubleStream。
  3. 有些操作天生并行流的性能就比顺序流差,如依赖元素顺序的操作:limit(), findFirst()等。
  4. 需要考虑流操作流水线的总计算成本。 设N为元素的总数,Q是一个元素通过流水线的大致处理成本,则N * Q 是对总成本的粗略估计。 Q值越高意味着使用并行流时的性能更好的可能性更大。 (使用for循环计算1 … N比并行流块原因就是Q太小,虽然N已经够大了)
  5. 对于较小的数据量,选择并行流几乎从来都不是最优的。因为并行本身开销就大,如果元素不多无法覆盖并行本身的开销。
  6. 需要考虑背后的数据结构是否易于分解。如用range工厂方法创建的原始流可以快速分解。 后面可以自定义Spliterator来完全掌控分解过程。
  7. 还要考虑终端操作中合并步骤的代价大小,如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价会超过通过并行得到的性能提升。

一些常见的数据源的可分解性汇总:
7. Java8新特性-并行数据处理(parallel)

Fork/Join框架

想要正确的使用并行流,了解它背后的实现原理至关重要。 并行流背后就是采用的Fork/Join框架。
// TODO: 待补充

Spliterator

// TODO: 待补充

小结

  1. 内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。
  2. 虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的
    行为和性能有时是违反直觉的,因此一定要测量,确保你并没有把程序拖得更慢。
  3. 像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,
    或处理单个元素特别耗时的时候。
  4. 从性能角度来看,使用正确的数据结构,如尽可能利用原始流而不是一般化的流,几乎
    总是比尝试并行化某些操作更为重要。
  5. 分支/合并框架让你得以用递归方式将可以并行的任务拆分成更小的任务,在不同的线程
    上执行,然后将各个子任务的结果合并起来生成整体结果。
  6. Spliterator定义了并行流如何拆分它要遍历的数据。
上一篇:C# Parellel.For 和 Parallel.ForEach


下一篇:常用的JVM参数选项