Java并发框架Fork-Join

1 Fork-Join概念

  从JDK7提供的另一种并行框架,它是一种分治编程:将任务分解,分解完了,挨个治理,最后把结果合并。适用于整体任务量不好确定的场合(最小任务可确定)。

1.1 Fork-Join主要类

  • ForkJoinPool:任务池
  • RecursiveAction、RecursiveTask:定义具体的任务,RecursiveTask的compute方法有返回值,RecursiveAction的compute方法无返回值,可根据任务需要选择哪一种。

2 Fork-Join用法

使用Fork-Join框架计算1~10000000的和:

SumTest:

package com.antique.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class SumTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1 创建执行线程池
        ForkJoinPool pool = new ForkJoinPool();
        //ForkJoinPool pool = new ForkJoinPool(4);

        // 2 创建任务
        SumTask task = new SumTask(1, 10000000);

        // 3 提交任务
        ForkJoinTask<Long> result = pool.submit(task);

        // 4 等待结果
        do {
            System.out.println("Main: Active 线程数量: " + pool.getActiveThreadCount());
            System.out.println("Main: 并行度: " + pool.getParallelism());
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while (!task.isDone());

        System.out.println(result.get().toString());
    }
}

SumTask:

package com.antique.forkjoin;

import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private int start;
    private int end;

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    public static final int THREAD_HOLD = 5;

    @Override
    protected Long compute() {
        Long sum = 0L;

        // 如果任务足够小,就直接执行
        boolean canCompute = (end - start) <= THREAD_HOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 任务大于阈值 分裂为两个任务
            int middle = (start + end)/2;
            SumTask subTask1 = new SumTask(start, middle);
            SumTask subTask2 = new SumTask(middle+1, end);

            // 提交两个子任务
            invokeAll(subTask1, subTask2);

            // 等待两个子任务结束 并拿到结果
            Long sum1 = subTask1.join();
            Long sum2 = subTask2.join();

            sum = sum1 + sum2;
        }
        return sum;
    }
}

运行结果:

Java并发框架Fork-Join

  这里只提交了一个任务,但是在程序运行的过程中,这个大的任务会不断的被分解,通常都是分解成两个,或者是更多个。然后每个子任务运行结束以后,把每个子任务的结果收集回来,再向上呈现出最终的结果。

上一篇:poj 2462 Period of an Infinite Binary Expansion


下一篇:实验六 进程基础