本文参考自CSDN作者 YourBatman 的ForkJoinPool线程池的使用以及原理和知乎作者 欣然 的文章高并发之Fork/Join框架使用及注意事项。
- ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
- ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。
1. 实例
计算从1到100000的累计和:
package ecnu.cn;
import java.util.concurrent.*;
public class SumTask extends RecursiveTask<Long> {
private int start, end;
public static void main(String[] args) {
SumTask sumTask = new SumTask(1, 100000);
ForkJoinPool pool = new ForkJoinPool();
Future<Long> result = pool.submit(sumTask);
try {
System.out.println(result.get());;
} catch (Exception e) {
e.printStackTrace();
}
}
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
Long sum = 0L;
if (end - start < 10) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
int middle = (start + end) / 2;
SumTask taskLeft = new SumTask(start, middle);
SumTask taskRight = new SumTask(middle + 1, end);
// 执行子任务
taskLeft.fork();
taskRight.fork();
// 等待任务执行结束
Long leftResult = taskLeft.join();
Long rightResult = taskRight.join();
sum = leftResult + rightResult;
}
return sum;
}
}
2. fork和join、submit和invoke、RecursiveTask和RecursiveAction
(1) 每个线程执行时
- fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
- join():等待该任务的处理线程处理完毕,获得返回值。
也可以将上述代码中的
taskLeft.fork();
taskRight.fork();
改为:
invokeAll(leftTask, rightTask);
两者的区别在于,对于Fork/Join模式,假如Pool里面线程数量是固定的,那么调用子任务的fork方法相当于A先分工给B,然后A当监工不干活,B去完成A交代的任务。所以上面的模式相当于浪费了一个线程。那么如果使用invokeAll相当于A分工给B后,A和B都去完成工作。这样可以更好的利用线程池,缩短执行的时间。 所以使用invokeAll更优。
(2) ForkJoinPool提交时
- submit():异步执行,只有在Future调用get的时候会阻塞。
- invoke():同步执行,调用之后需要等待任务完成,才能执行后面的代码。
(3) 类在继承时
- RecursiveTask:适用于有返回值的情况。
- RecursiveAction:适用于无返回值的情况。
3. 原理
在线程创建时,每个fork()并不都会生成一个新的线程,而每个join()也不一定会造成线程阻塞,而是采用了一种更加复杂的算法——工作切取(work stealing),其流程如下:
- ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
- 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
- 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
- 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
- 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。