forkjoin分而治之的介绍和使用

Fork-Join

 

java下多线程的开发可以我们自己启用多线程,线程池,还可以使用forkjoin,forkjoin可以让我们不去了解诸如Thread,Runnable等相关的知识,只要遵循forkjoin的开发模式,就可以写出很好的多线程并发程序,

分治策略是:对于一个规模为n的问题,若该问题可以容易地解决(比如说规模n较小)则直接解决,否则将其分解为k个规模较小的子问题,这些子问题互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计策略叫做分治法。

归并排序

归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。

若将两个有序表合并成一个有序表,称为2-路归并,与之对应的还有多路归并。

对于给定的一组数据,利用递归与分治技术将数据序列划分成为越来越小的半子表,在对半子表排序后,再用递归方法将排好序的半子表合并成为越来越大的有序序列。

为了提升性能,有时我们在半子表的个数小于某个数(比如15)的情况下,对半子表的排序采用其他排序算法,比如插入排序。

归并排序(降序)示例

forkjoin分而治之的介绍和使用

先讲数组划分为左右两个子表:

forkjoin分而治之的介绍和使用

 forkjoin分而治之的介绍和使用

 

然后继续左右两个子表拆分:

forkjoin分而治之的介绍和使用

对最后的拆分的子表,两两进行排序

 forkjoin分而治之的介绍和使用

对有序的子表进行排序和比较合并

forkjoin分而治之的介绍和使用

对合并后的子表继续比较合并

forkjoin分而治之的介绍和使用

 

 Fork-Join原理

forkjoin分而治之的介绍和使用

 

工作密取

即当前线程的Task已经全被执行完毕,则自动取到其他线程的Task池中取出Task继续执行。

ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。

forkjoin分而治之的介绍和使用

 

Fork/Join使用

要使用fork/join框架,需要先创建forkjoin任务,它提供任务中执行fork和join的操作机制,一般不直接继承ForkjoinTask,而是继承其子类。其子类一共有两个:

1. RecursiveAction,用于没有返回结果的任务

2. RecursiveTask,用于有返回值的任务

task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:

1. invoke()是同步执行,调用之后需要等待任务完成,才能执行后面的代码;

2. submit()是异步执行。

join()和get()方法当任务完成的时候返回计算结果。

使用图解如下:

forkjoin分而治之的介绍和使用

 

在compute()方法中,首先要判断任务是否足够小,如果小就执行任务(自己代码要做的事),否则就拆分任务,拆分时必须拆分成两个每个子任务使用fork()方法调用又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果

简单示例如下:

import java.util.concurrent.*;

/**
 * 使用fork/join计算0-1000的和
 * 使用fork/join继承类可以为RecursiveTask<T>或RecursiveAction
 *      RecursiveTask<T> 重新抽象方法compute,返回类型为<T>,
 *      RecursiveAction 返回类型为void
 */
public class MyTask extends RecursiveTask<Integer> {

    // 拆分任务的刻度(达到10则拆分任务)
    private final int MAX_SIZE = 10;

    // 任务开始
    private int start;
    // 任务终止
    private int end;

    //构造方法,赋值开始和结束值
    public MyTask(int start, int end) {
        this.start = start;
        this.end = end;
    }


    /**
     * 必须要重新的方法,在方法里面通过new <RecursiveTask>的对象并调用fork()方法来进行拆分。
     *      然后合并拆分子任务的值,最后返回
     * @return
     */
    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start <MAX_SIZE){
            for (int i = start ;i < end;i++){
                sum += i;
            }
        }else {
            int middle = (start + end)/2; // 中间值
            MyTask left = new MyTask(start , middle); // 拆分左任务
            MyTask right = new MyTask(middle+1,end); // 拆分右任务
            left.fork(); // 执行左任务
            right.fork(); // 执行右任务

            sum = left.join() + right.join(); // 计算左右相加
        }
        return sum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(); // 创建fork/join线程池
        Future<Integer> future = pool.submit(new MyTask(0,1000)); // 创建一个任务并交给线程池执行
        System.out.println(future.get()); // 获取结果,阻塞方法
        pool.shutdown();//关闭fork/join线程池
    }
}

上一篇:python os 模块简介


下一篇:mysql join和left join区别