除了顺序流外,Java 8中也可以对集合对象调用parallelStream方法或者对顺序流调用parallel方法来生成并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样在使用流处理数据规模较大的集合对象时可以充分的利用多核CPU来提高处理效率。不过在一些情况下,并行流未必会比顺序流快,甚至会慢很多,所以了解如何高效的使用并行流也至关重要。此外,我们也可以调用流的sequential方法,将并行流转换为顺序流。
测试并行流的性能
举个例子,对1到1000的整数求和,观察顺序流和并行流的处理时间:
public class StreamExample {
public static void main(String[] args) {
StreamExample.test((n) -> LongStream.rangeClosed(1L, n).reduce(0L, Long::sum), 1000L);
StreamExample.test((n) -> LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum), 1000L);
}
static void test(LongConsumer c, Long n) {
long start = System.currentTimeMillis();
c.accept(n);
long end = System.currentTimeMillis();
System.out.println("处理时间:" + (end - start) + "msc");
}
}
运行结果:
处理时间:9msc
处理时间:484msc
结果和我们预期的不一致,这是因为在处理数据集规模不大的情况下,将流并行化所带来的额外开销比逻辑代码开销还大。我们将数据集扩大:
StreamExample.test((n) -> LongStream.rangeClosed(1L, n).reduce(0L, Long::sum), 1000000000L);
StreamExample.test((n) -> LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum), 1000000000L);
运行结果:
处理时间:2775msc
处理时间:725msc
对于较小的数据量,选择并行流不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
接着对比下使用并行流处理包装类型的求和与原始类型的求和运行时间对比:
StreamExample.test((n) -> Stream.iterate(1L, a -> a + 1L).limit(n).reduce(0L, Long::sum), 1000000000L);
StreamExample.test((n) -> LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum), 1000000000L);
运行结果:
处理时间:21915msc
处理时间:920msc
因为iterate生成的是包装类型的对象,必须拆箱成原始类型才能求和,而且我们很难把iterate分成多个独立块来并行执行。所以可以看到来两者间的运行效率差了将近24倍!
在实际中应避免频繁拆装箱;有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元 素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性 能好,因为它不一定要按顺序来执行。
再看一个例子:
public class StreamExample {
public static void main(String[] args) {
ArrayList<Long> arrayList = Stream.iterate(1L, a -> a + 1L).limit(10000000L).collect(toCollection(ArrayList::new));
LinkedList<Long> linkedList = Stream.iterate(1L, a -> a + 1L).limit(10000000L).collect(toCollection(LinkedList::new));
StreamExample.test(() -> arrayList.parallelStream().reduce(0L, Long::sum));
StreamExample.test(() -> linkedList.parallelStream().reduce(0L, Long::sum));
}
static void test(Runner r) {
long start = System.currentTimeMillis();
r.run();
long end = System.currentTimeMillis();
System.out.println("处理时间:" + (end - start) + "msc");
}
}
@FunctionalInterface
interface Runner {
void run();
}
上面代码对比了使用并行流处理ArrayList和使用并行流处理LinkedList的性能对比,运行结果如下:
处理时间:1258msc
处理时间:7933msc
之所以出现这个结果,是因为ArrayList的拆分效率比LinkedList高得多,前者用不着遍历就可以平均拆分,而后者则必须遍历。
使用并行流要考虑流背后的数据结构是否易于分解。用range方法创建的原始类型流也可以快速分解。
下表列出了流的数据源和可分解性:
数据源 | 可分解性 |
---|---|
ArrayList | 很好 |
LinkedList | 很差 |
IntStream.range | 很好 |
Stream.iterate | 很差 |
HashSet | 好 |
TreeSet | 好 |
总结
总而言之,使用并行流应该考虑以下几点:
-
留意拆装箱成本;
-
流中依赖于元素顺序的操作,在并行流上执行的代价非常大;
-
考虑流的流水线操作总成本,对于较小的数据量,并不适合使用并行流;
-
考虑流背后的数据结构是否易于分解,不易分解的数据结构不适合使用并行流。