1 Fork-Join概念
从JDK7提供的另一种并行框架,它是一种分治编程:将任务分解,分解完了,挨个治理,最后把结果合并。适用于整体任务量不好确定的场合(最小任务可确定)。
1.1 Fork-Join主要类
- ForkJoinPool:任务池
- RecursiveAction、RecursiveTask:定义具体的任务,RecursiveTask的compute方法有返回值,RecursiveAction的compute方法无返回值,可根据任务需要选择哪一种。
2 Fork-Join用法
使用Fork-Join框架计算1~10000000的和:
SumTest:
package com.antique.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class SumTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1 创建执行线程池
ForkJoinPool pool = new ForkJoinPool();
//ForkJoinPool pool = new ForkJoinPool(4);
// 2 创建任务
SumTask task = new SumTask(1, 10000000);
// 3 提交任务
ForkJoinTask<Long> result = pool.submit(task);
// 4 等待结果
do {
System.out.println("Main: Active 线程数量: " + pool.getActiveThreadCount());
System.out.println("Main: 并行度: " + pool.getParallelism());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
System.out.println(result.get().toString());
}
}
SumTask:
package com.antique.forkjoin;
import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask<Long> {
private int start;
private int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
public static final int THREAD_HOLD = 5;
@Override
protected Long compute() {
Long sum = 0L;
// 如果任务足够小,就直接执行
boolean canCompute = (end - start) <= THREAD_HOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 任务大于阈值 分裂为两个任务
int middle = (start + end)/2;
SumTask subTask1 = new SumTask(start, middle);
SumTask subTask2 = new SumTask(middle+1, end);
// 提交两个子任务
invokeAll(subTask1, subTask2);
// 等待两个子任务结束 并拿到结果
Long sum1 = subTask1.join();
Long sum2 = subTask2.join();
sum = sum1 + sum2;
}
return sum;
}
}
运行结果:
这里只提交了一个任务,但是在程序运行的过程中,这个大的任务会不断的被分解,通常都是分解成两个,或者是更多个。然后每个子任务运行结束以后,把每个子任务的结果收集回来,再向上呈现出最终的结果。