文章目录
Pre
我们已经看到了新的 Stream 接口可以以声明性方式处理数据集,无需显式实现优化来为数据集的处理加速。到目前为止,最重要的好处是可以对这些集合执行操作流水线,能够自动利用计算机上的多个内核。
在Java 7之前,并行处理数据集合非常麻烦。
- 第一,你得明确地把包含数据的数据结构分成若干子部分。
- 第二,你要给每个子部分分配一个独立的线程。
- 第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起来
Java 7引入了一个叫作分支/合并的框架,让这些操作更稳定、更不易出错 。
Stream 接口可以很轻松的就能对数据集执行并行操作。它允许你声明性地将顺序流变为并行流。 另外我们也要关注流是如何在幕后应用Java 7引入的分支/合并框架的。
同时了解并行流内部是如何工作的很重要,避免因误用而得到意外的(很可能是错的)结果。
什么是并行流
前面我们简要地提到了 Stream 接口可以让你非常方便地处理它的元素:可以通过对收集源调用 parallelStream 方法来把集合转换为并行流。
并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。
这样一来,就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让它们都忙起来。
引入
用一个简单的例子来试验一下这个思想。
假设你需要写一个方法,接受数字n作为参数,并返回从1到给定参数的所有数字的和。一个直接(也许有点土)的方法是生成一个无限大的数字流,把它限制到给定的数目,然后用对两个数字求和的 BinaryOperator 来归约这个流
用更为传统的Java术语来说,这段代码与下面的迭代等价
这似乎是利用并行处理的好机会,特别是n很大的时候。那怎么入手呢?
你要对结果变量进行同步吗?用多少个线程呢?谁负责生成数呢?谁来做加法呢?
其实根本用不着担心,用并行流的话,这问题就简单多了!
将顺序流转化为并行流
你可以把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用 parallel 方法:
在上面的代码中,对流中所有数字求和的归纳过程的执行方式和下图差不多
不同之处在于 Stream 在内部分成了几块。因此可以对不同的块独立并行进行归纳操作,如下图所示
最后,同一个归纳操作会将各个子流的部分归纳结果合并起来,得到整个原始流的归纳结果。
请注意,在现实中,对顺序流调用 parallel 方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 之后进行的所有操作都并行执行。
类似地,你只需要对并行流调用 sequential 方法就可以把它变成顺序流。
请注意,你可能以为把这两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。例如,你可以这样做:
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
但最后一次 parallel 或 sequential 调用会影响整个流水线。在本例中,流水线会并行执行,因为最后调用的是它。
配置并行流使用的线程池
看看流的 parallel 方法,你可能会想,并行流用的线程是?哪儿来的?有多少个?怎么自定义这个过程呢?
并行流内部使用了默认的 ForkJoinPool ,它默认的线 程 数 量 就是 你 的 处 理器 数 量 , 这个 值 是 由 Runtime.getRuntime().availableProcessors()
得到的。
但 是 可 以 通 过 系统属性 java.util.concurrent.ForkJoinPool.common.parallelism
来改变线程?大小,例如
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
这是一个全局参数,因此它将影响代码中所有的并行流。反过来说,目前还无法专门为某个并行流指定这个值。一般而言,让 ForkJoinPool 的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则不建议修改它。
回到刚才的题目,我们说过,在多核处理器上运行并行版本时,会有显著的性能提升。
现在我们已经用三种不同的方式(迭代式、顺序归纳和并行归纳)做完全相同的操作,那看看谁最快吧!
测量流性能
写个方法来测试下
private static long measureSumPerformance(Function<Long, Long> adder, long limit) {
long fastest = Long.MAX_VALUE;
// 运行10次,找最快的一次
for (int i = 0; i < 10; i++) {
long startTimestamp = System.currentTimeMillis();
adder.apply(limit);
long duration = System.currentTimeMillis() - startTimestamp;
if ((duration < fastest)) {
fastest = duration;
}
}
return fastest;
}
这个方法接受一个函数和一个 long 作为参数。它会对传给方法的 long 应用函数10次,记录每次执行的时间,并返回最短的一次执行时间。
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/3/21 11:38
* @mark: show me the code , change the world
*/
public class ParallelTest {
public static void main(String[] args) {
System.out.println(measureSumPerformance(ParallelTest::adderByNormal,10000000) + "ms");
System.out.println(measureSumPerformance(ParallelTest::adderByStream,10000000)+ "ms");
System.out.println(measureSumPerformance(ParallelTest::adderByStreamParallel,10000000)+ "ms");
}
private static long measureSumPerformance(Function<Long, Long> adder, long limit) {
long fastest = Long.MAX_VALUE;
// 运行10次,找最快的一次
for (int i = 0; i < 10; i++) {
long startTimestamp = System.currentTimeMillis();
adder.apply(limit);
long duration = System.currentTimeMillis() - startTimestamp;
if ((duration < fastest)) {
fastest = duration;
}
}
return fastest;
}
public static Long adderByStream(Long limit){
return Stream.iterate(1L,i->i+1)
.limit(limit)
.reduce(0L, Long::sum);
}
public static Long adderByStreamParallel(Long limit){
return Stream.iterate(1L,i->i+1)
.limit(limit)
.parallel()
.reduce(0L, Long::sum);
}
public static Long adderByNormal(Long limit){
Long result = 0L;
for (int i = 0; i <= limit; i++) {
result += i ;
}
return result ;
}
}
结果
看到结果
求和方法的并行版本比顺序版本不但没有提速,还慢了一些? ???
主要由两个问题
- iterate 生成的是装箱的对象,必须拆箱成数字才能求和
- 我们很难把 iterate 分成多个独立块来并行执行
第二个问题更有意思一点,因为我们必须意识到某些流操作比其他操作更容易并行化。具体来说, iterate 很难分拆成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果,如下图所示。
这意味着,在这个iterate 特定情况下归纳进程不是像我们刚才描述的并行计算那样进行的;整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流拆分为小块来并行处理。
把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上
这就说明了并行编程可能很复杂,有时候甚至有点违反直觉。如果用得不对(比如采用了一个不易并行化的操作,如 iterate ),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的 parallel 操作时,了解背后到底发生了什么是很有必要的。
优化:合理利用多核处理器
那到底要怎么利用多核处理器,用流来高效地并行求和呢?
LongStream.rangeClosed 的方法。这个方法与 iterate 相比有两个优点。
LongStream.rangeClosed
直接产生原始类型的 long 数字,没有装箱拆箱的开销。LongStream.rangeClosed
会生成数字范围,很容易拆分为独立的小块。例如,范围1到20可分为1到5、6到10、11到15和16~20
让我们先看一下它用于顺序流时的性能如何,看看拆箱的消耗到底要不要紧:
public static Long adderByLongStreamRangeClosed(Long limit){
return LongStream.rangeClosed(1,limit)
.reduce(0,Long::sum);
}
执行一下
System.out.println(measureSumPerformance(ParallelTest::adderByLongStreamRangeClosed,10000000)+ "ms");
这个数值流比前面那个用 iterate 工厂方法生成数字的顺序执行版本要快得多,因为数值流避免了非针对性流那些没必要的自动装箱和拆箱操作。
由此可见,选择适当的数据结构往往比并行化算法更重要。但要是对这个新版本应用并行流呢?
public static Long adderByLongStreamRangeClosedParallel(Long limit){
return LongStream.rangeClosed(1,limit)
.parallel()
.reduce(0,Long::sum);
}
调用下
System.out.println(measureSumPerformance(ParallelTest::adderByLongStreamRangeClosedParallel,10000000)+ "ms");
这…
终于,我们得到了一个比顺序执行更快的并行归纳,因为这一次归纳操作可以像刚才并行计算的那个流程图那样执行了。这也表明,使用正确的数据结构然后使其并行工作能够保证最佳的性能。
尽管如此,请记住,并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。总而言之,很多情况下不可能或不方便并行化。然而,在使用并行 Stream 加速代码之前,你必须确保用得对;如果结果错了,算得快就毫无意义了。