这篇博客一起来研究下使用并行流。借组多核处理器并行执行代码可以显著提高性能,但是并行编程可能十分复杂且容易出错,流API提供的好处之一是能够轻松可靠的并行执行一些操作。请求并行处理流,首先要获得一个并行流。
获取一个并行流有2个方法:
1,Collection定义的parallelStream()方法
2,对顺序流调用parallel()方法。
一下代码演示如果获取一个并行流:
public static void main(String[] args) throws Exception
{
List<Integer> list = new ArrayList<>(4);
list.add(1);
list.add(2);
list.add(3);
list.add(4);
//直接从集合中获取并行流
Stream<Integer> parallelStream = list.parallelStream();
//先获取一个顺序流,然后在顺序流的基础上获取一个并行流
Stream<Integer> stream = list.stream();
Stream<Integer> parallel = stream.parallel(); //Stream中有一个方法可以判断当前的流是不是并行流,一下代码输出全是true,也就是全部都是并行流
System.out.println(parallelStream.isParallel());
System.out.println(stream.isParallel());
System.out.println(parallel.isParallel());
}
获取并行流,有2点要注意,
1,对于并行流,只有在环境支持的情况下才可以实现并行处理
2,在一个顺序流的基础上调用parallel()方法,原来的顺序流也就变成了并行流了。如果调用该方法的流原来已经就是一个并行流了,那么就直接返回该调用流。
当然我们也可以将一个并行流转换成一个顺序流:在并行流上调用sequential()就可以啦。
public static void main(String[] args) throws Exception
{
List<Double> list = new ArrayList<>(4);
list.add(1.0);
list.add(2.0);
list.add(3.0);
list.add(4.0); //获取一个顺序流
Stream<Double> stream = list.stream();
System.out.println(stream.isParallel());
//顺序流上转换成一个并行流
Stream<Double> parallelStream = stream.parallel();
Stream<Double> unordered = parallelStream.unordered();//将流里面的元素设置无序
System.out.println(parallelStream.isParallel());
//并行流上转换成一个顺序流
Stream<Double> sequential = parallelStream.sequential();
System.out.println(sequential.isParallel());
}
- 处理并行流
获得并行流后,如果环境支持并行处理,那么在该流上发生的操作就可以并行执行。区别于顺序流,并行流的相关操作发生在不同的线程上。一般来说,应用到并行流上的任何操作都必须是无状态的,不干预的,并且具有关联性的。这样子可以确保在并行流上执行操作得到的结果,和在顺序流上执行相同操作得到的结果相同。
上一篇博客中,我们整理到了缩减操作,reduce()方法的第3个方法,就是专门用来指定如何合并并行结果的。
reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)在这个版本中,第三个函数将第二个函数得到的2个值合并起来。
以下代码使用并行流,计算一个集合中元素的积:
public static void main(String[] args) throws Exception
{
List<Integer> list = new ArrayList<>(4);
list.add(1);
list.add(2);
list.add(3);
list.add(4);
//直接从集合中获取并行流,然后执行缩减操作,下面的代码输出24
System.out.println(list.parallelStream().reduce(1, (a, b) -> a * b, (a, b) -> a * b));
}
处理并行流,有2点要注意:
1,在对并行流做缩减操作时,reduce()函数的第2个参数和第3个参数可以是做相同的操作,也可以是不同的操作,在有些情况下,这2个参数做的操作必须是不同的。来看下面这个例子:
public static void main(String[] args) throws Exception
{
List<Double> list = new ArrayList<>(4);
list.add(1.0);
list.add(2.0);
list.add(3.0);
list.add(4.0);
//直接从集合中获取并行流,然后执行缩减操作,下面的代码输出24
//下面的代码输出4.898979485566357,这里是顺序流
System.out.println(list.stream().reduce(1.0, (a, b) -> a * Math.sqrt(b)));
//下面的代码输出1.8612097182041991,这里是并行流,正确
System.out.println(list.parallelStream().reduce(1.0, (a, b) -> a * Math.sqrt(b), (a, b) -> a * b));
//下面的代码输出1.8612097182041991,这里是并行流,错误
System.out.println(list.parallelStream().reduce(1.0, (a, b) -> a * Math.sqrt(b), (a, b) -> a * Math.sqrt(b))); }
上面的代码中,累加器函数将2个元素的平方根相乘,但是合并器则将部分结果相乘,所以这2个函数是不同的,如果累加器函数和合并器函数是同一个函数,这将导致错误,因为当合并2个部分结果的时,相乘的是它们的平方根,而不是部分结果自身。值得注意的是,上面对reduce()方法的调用中,如果将流改成顺序流,操作将肯定得到正确的结果,所以我们在测试的时候也可以取值顺序流操作的结果来作为检验标准。
2,在使用并行操作时,关于流还有一点需要注意就是元素的位置。流可以时候有序的,也可以是无序的。一般来说,如果数据源是有序的,那么流也就是有序的。但是,在使用并行流的时候,有时候允许流是无序的这样子可以死获得性能上的提升。当并行流无序时,流的每个部分都可以被单独操作,而不是与其他部分协调。当操作的顺序不重要时,可以调用unordered()方法来指定无序行为。
其实有些api本身就是有序的或者说无序的,比如forEach()方法不一定保留并行流的顺序,但是在对并行流的每个元素执行操作时希望保留顺序的话,可以使用forEachOrdered()方法。