什么是ForkJoin?
ForkJoin 在JDK1.7,并行执行任务!提高效率~。在大数据量速率会更快!
大数据中:MapReduce 核心思想->把大任务拆分为小任务!
ForkJoin 特点: 工作窃取!
实现原理是:双端队列!从上面和下面都可以去拿到任务进行执行!
如何使用ForkJoin?
-
1、通过ForkJoinPool来执行
-
2、计算任务 execute(ForkJoinTask<?> task)
-
3、计算类要去继承ForkJoinTask;
ForkJoin的计算类!
package com.ogj.forkjoin;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Long> {
private long star;
private long end;
//临界值
private long temp=1000000L;
public ForkJoinDemo(long star, long end) {
this.star = star;
this.end = end;
}
/**
* 计算方法
* @return Long
*/
@Override
protected Long compute() {
if((end-star)<temp){
Long sum = 0L;
for (Long i = star; i < end; i++) {
sum+=i;
}
// System.out.println(sum);
return sum;
}else {
//使用forkJoin 分而治之 计算
//计算平均值
long middle = (star+ end)/2;
ForkJoinDemo forkJoinDemoTask1 = new ForkJoinDemo(star, middle);
forkJoinDemoTask1.fork(); //拆分任务,把线程任务压入线程队列
ForkJoinDemo forkJoinDemoTask2 = new ForkJoinDemo(middle, end);
forkJoinDemoTask2.fork(); //拆分任务,把线程任务压入线程队列
long taskSum = forkJoinDemoTask1.join() + forkJoinDemoTask2.join();
return taskSum;
}
}
}
测试类!
package com.ogj.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
test2();
test3();
}
/**
* 普通计算
*/
public static void test1(){
long star = System.currentTimeMillis();
long sum = 0L;
for (long i = 1; i < 20_0000_0000; i++) {
sum+=i;
}
long end = System.currentTimeMillis();
System.out.println("sum="+"时间:"+(end-star));
System.out.println(sum);
}
/**
* 使用ForkJoin
*/
public static void test2() throws ExecutionException, InterruptedException {
long star = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L, 20_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long aLong = submit.get();
System.out.println(aLong);
long end = System.currentTimeMillis();
System.out.println("sum="+"时间:"+(end-star));
}
/**
* 使用Stream 并行流
*/
public static void test3(){
long star = System.currentTimeMillis();
//Stream并行流()
long sum = LongStream.range(0L, 20_0000_0000L).parallel().reduce(0, Long::sum);
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("sum="+"时间:"+(end-star));
}
}
.parallel().reduce(0, Long::sum使用一个并行流去计算整个计算,提高效率。
reduce方法的优点: