文章目录
概述
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。
它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool )中的工作线程。
CPU密集型 vs IO密集型
通常来讲,任务可以划分为计算密集型和IO密集型
计算密集型任务
特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。
这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。
IO密集型
涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。
对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少 。
简单示例
来看个最简单的求和
public class ForkJoinTest {
private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
public static void main(String[] args) {
System.out.println("result=> " + calc());
System.out.println("result=> " + calcByStream());
}
private static int calc() {
int result = 0;
for (int i = 0; i < data.length; i++) {
result += data[i];
}
return result;
}
private static Long calcByStream() {
return LongStream.rangeClosed(0,10).reduce(0, Long::sum);
}
}
Fork/Join常用的类
- ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制 。
通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
-
RecursiveAction:用于没有返回结果的任务。 比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者CPU执行。 我们可以通过继承来实现一个RecursiveAction
-
RecursiveTask :用于有返回结果的任务。 可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并
-
CountedCompleter: 在任务完成执行后会触发执行一个自定义的钩子函数
-
ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
RecursiveTask 实现 并行计算
要把任务提交到这个池,必须创建 RecursiveTask<R>
的一个子类,其中 R 是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是 RecursiveAction 类型 。
要定义 RecursiveTask, 只需实现它唯一的抽象方法compute
protected abstract R compute();
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。
if (任务足够小或不可分) {
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}
一般来说并没有确?的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你
事实上,这只不过是著名的分治算法的并行版本而已。
现在编写一个方法来并行对前n个自然数求和就很简单了。你只需把想要的数字数组传给ForkJoinSumCalculator 的构造函数:
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
这里用了一个 LongStream 来生成包含前n个自然数的数组,然后创建一个 ForkJoinTask( RecursiveTask 的父类),并把数组传递给 ForkJoinSumCalculator 的公共构造函数。
最后,创建了一个新的 ForkJoinPool ,并把任务传给它的调用方法 。在ForkJoinPool 中执行时,最后一个方法返回的值就是 ForkJoinSumCalculator 类定义的任务结果。
在实际应用时,使用多个 ForkJoinPool 是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM能够使用的所有处理器。更确切地说,该构造函数将使用 Runtime.availableProcessors 的返回值来决定线程池使用的线程数。请注意 availableProcessors 方法虽然看起来是处理器,但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。
当把 ForkJoinSumCalculator 任务传给 ForkJoinPool 时,这个任务就由池中的一个线程执行,这个线程会调用任务的 compute 方法。
该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的 ForkJoinSumCalculator ,而它们也由ForkJoinPool 安排执行。
因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。
这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。
package com.artisan.java8;
import java.util.concurrent.RecursiveTask;
public class AccumulatorRecursiveTask extends RecursiveTask<Integer> {
private final int start;
private final int end;
private final int[] data;
private final int LIMIT = 3;
public AccumulatorRecursiveTask(int start, int end, int[] data) {
this.start = start;
this.end = end;
this.data = data;
}
@Override
protected Integer compute() {
if ((end - start) <= LIMIT) {
int result = 0;
for (int i = start; i < end; i++) {
result += data[i];
}
return result;
}
int mid = (start + end) / 2;
AccumulatorRecursiveTask left = new AccumulatorRecursiveTask(start, mid, data);
AccumulatorRecursiveTask right = new AccumulatorRecursiveTask(mid, end, data);
left.fork();
Integer rightResult = right.compute();
Integer leftResult = left.join();
return rightResult + leftResult;
}
}
RecursiveAction
package com.artisan.java8;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;
public class AccumulatorRecursiveAction extends RecursiveAction {
private final int start;
private final int end;
private final int[] data;
private final int LIMIT = 3;
public AccumulatorRecursiveAction(int start, int end, int[] data) {
this.start = start;
this.end = end;
this.data = data;
}
@Override
protected void compute() {
if ((end - start) <= LIMIT) {
for (int i = start; i < end; i++) {
AccumulatorHelper.accumulate(data[i]);
}
} else {
int mid = (start + end) / 2;
AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data);
AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid, end, data);
left.fork();
right.fork();
left.join();
right.join();
}
}
static class AccumulatorHelper {
private static final AtomicInteger result = new AtomicInteger(0);
static void accumulate(int value) {
result.getAndAdd(value);
}
public static int getResult() {
return result.get();
}
static void rest() {
result.set(0);
}
}
}
Fork/Join执行流程
最佳实践
虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用
- 对一个任务调用 join 方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。
- 不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。相反,你应该始终直接调用 compute 或 fork 方法,只有顺序代码才应该用 invoke 来启动并行计算。