JDK8中,提供了并行流和串行流,使用parallel()和sequential()来处理,parallel()为并行流sequential()为串行流,两者可以相互转换,以最后一个为准
LongStream.rangeClosed(0,1000000).sequential().parallel().reduce((x,y)->x+y);
以上代码示例就是并行流和串行流的使用,由于parallel在后,所以是以并行流运算。
其实JDK8的并行流和串行流并不复杂,但是想要了解其历史,就要从单线程、多线程、JDK7的fork/join框架一一说起。
首先说单线程问题,如果是单线程,肯定是没有多线程运行快的(通常情况下如此,如果仅此是1+1的操作,多线程因为线程切换等原因,反而会更慢),况且fork/join使用的是线程窃取模式进行处理的,相比多线程更有优势,可以更好的使用内存。
但是为什么fork/join没有被大量使用呢,主要是因为fork/join写法太繁琐,下面就举例说明fork/join的写法
1、fork/join
首先创建一个ForkJoinDemo对象,对数据进行拆分(fork)运算后汇总(join)
package com.example.jdk8demo; import java.util.concurrent.RecursiveTask; public class ForkJoinDemo extends RecursiveTask<Long> { private long start; private long end; private static final long mm = 100; public ForkJoinDemo(long start,long end){ this.start = start; this.end = end; } @Override protected Long compute() { long length = end - start; if(length <= mm){ long sum = 0; for (long i=start;i<= end;i++){ sum +=i; } return sum; }else{ long mid = (start+end)/2; ForkJoinDemo left = new ForkJoinDemo(start,mid); left.fork(); ForkJoinDemo right = new ForkJoinDemo(mid+1,end); right.fork(); return left.join() + right.join(); } } }
测试方法:
public void test1(long num){ Instant start = Instant.now(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> forkJoinTask = new ForkJoinDemo(0,num); long sum = forkJoinPool.invoke(forkJoinTask); Instant end = Instant.now(); log.info("fork/join运行时间【{}】,运行结果【{}】",Duration.between(start,end).toMillis(),sum); }
可以发现使用fork/join的写法非常麻烦
JDK8提供的串行流和并行流的操作就非常方便
2、串行流
public void test3(long num){ Instant start = Instant.now(); OptionalLong optionalLong = LongStream.rangeClosed(0,num).sequential().reduce((x,y)->x+y); Instant end = Instant.now(); log.info("串行流运行时间【{}】,运行结果【{}】",Duration.between(start,end).toMillis(),optionalLong.getAsLong()); }
3、并行流
public void test4(long num){ Instant start = Instant.now(); OptionalLong optionalLong = LongStream.rangeClosed(0,num).parallel().reduce((x,y)->x+y); Instant end = Instant.now(); log.info("并行流运行时间【{}】,运行结果【{}】",Duration.between(start,end).toMillis(),optionalLong.getAsLong()); }
4、为了演示执行时间,再添加一个单线程测试
public void test2(long num){ long sum = 0; Instant start = Instant.now(); for(int i=0;i<=num;i++){ sum+=i; } Instant end = Instant.now(); log.info("单线程for循环运行时间【{}】,运行结果【{}】",Duration.between(start,end).toMillis(),sum); }
5、测试
由于都是数据的累加操作,因此多线程由于线程切换等原因,会造成比单线程执行慢的假象,为了排除这一假象,直接累加到20亿的运行时间作为参考
@Test public void test(){ long num = 50*10000*10000L; test1(num); test3(num); test4(num); test2(num); }
运行结果:
单线程运行了一分钟,还没有出结果
fork/join运行时间3899毫秒,串行流2081毫秒,并行流1532毫秒,可见性能提升还是非常明显的。