【java笔记】java中线程池之ForkJoinPool的原理及使用

本文参考自CSDN作者 YourBatman 的ForkJoinPool线程池的使用以及原理和知乎作者 欣然 的文章高并发之Fork/Join框架使用及注意事项

  • ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
  • ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。

1. 实例

计算从1到100000的累计和:

package ecnu.cn;

import java.util.concurrent.*;

public class SumTask extends RecursiveTask<Long> {
    private int start, end;

    public static void main(String[] args) {
        SumTask sumTask = new SumTask(1, 100000);
        ForkJoinPool pool = new ForkJoinPool();
        Future<Long> result = pool.submit(sumTask);
        try {
            System.out.println(result.get());;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        Long sum = 0L;
        if (end - start < 10) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            int middle = (start + end) / 2;
            SumTask taskLeft = new SumTask(start, middle);
            SumTask taskRight = new SumTask(middle + 1, end);
            // 执行子任务
            taskLeft.fork();
            taskRight.fork();
            // 等待任务执行结束
            Long leftResult = taskLeft.join();
            Long rightResult = taskRight.join();
            sum = leftResult + rightResult;
        }
        return sum;
    }
}

2. fork和join、submit和invoke、RecursiveTask和RecursiveAction

(1) 每个线程执行时

  • fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
  • join():等待该任务的处理线程处理完毕,获得返回值。

也可以将上述代码中的

taskLeft.fork();
taskRight.fork();

改为:

invokeAll(leftTask, rightTask);

两者的区别在于,对于Fork/Join模式,假如Pool里面线程数量是固定的,那么调用子任务的fork方法相当于A先分工给B,然后A当监工不干活,B去完成A交代的任务。所以上面的模式相当于浪费了一个线程。那么如果使用invokeAll相当于A分工给B后,A和B都去完成工作。这样可以更好的利用线程池,缩短执行的时间。 所以使用invokeAll更优。

(2) ForkJoinPool提交时

  • submit():异步执行,只有在Future调用get的时候会阻塞。
  • invoke():同步执行,调用之后需要等待任务完成,才能执行后面的代码。

(3) 类在继承时

  • RecursiveTask:适用于有返回值的情况。
  • RecursiveAction:适用于无返回值的情况。

3. 原理

在线程创建时,每个fork()并不都会生成一个新的线程,而每个join()也不一定会造成线程阻塞,而是采用了一种更加复杂的算法——工作切取(work stealing),其流程如下:
【java笔记】java中线程池之ForkJoinPool的原理及使用

  • ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
  • 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
  • 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
  • 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
  • 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
上一篇:云风 极不和谐的 fork 多线程程序


下一篇:自己封装了一个EF的上下文类.,分享一下,顺便求大神指点