Fork-Join
java下多线程的开发可以我们自己启用多线程,线程池,还可以使用forkjoin,forkjoin可以让我们不去了解诸如Thread,Runnable等相关的知识,只要遵循forkjoin的开发模式,就可以写出很好的多线程并发程序,
分治策略是:对于一个规模为n的问题,若该问题可以容易地解决(比如说规模n较小)则直接解决,否则将其分解为k个规模较小的子问题,这些子问题互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计策略叫做分治法。
归并排序
归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。
若将两个有序表合并成一个有序表,称为2-路归并,与之对应的还有多路归并。
对于给定的一组数据,利用递归与分治技术将数据序列划分成为越来越小的半子表,在对半子表排序后,再用递归方法将排好序的半子表合并成为越来越大的有序序列。
为了提升性能,有时我们在半子表的个数小于某个数(比如15)的情况下,对半子表的排序采用其他排序算法,比如插入排序。
归并排序(降序)示例
先讲数组划分为左右两个子表:
,
然后继续左右两个子表拆分:
对最后的拆分的子表,两两进行排序
对有序的子表进行排序和比较合并
对合并后的子表继续比较合并
Fork-Join原理
工作密取
即当前线程的Task已经全被执行完毕,则自动取到其他线程的Task池中取出Task继续执行。
ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。
Fork/Join使用
要使用fork/join框架,需要先创建forkjoin任务,它提供任务中执行fork和join的操作机制,一般不直接继承ForkjoinTask,而是继承其子类。其子类一共有两个:
1. RecursiveAction,用于没有返回结果的任务
2. RecursiveTask,用于有返回值的任务
task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:
1. invoke()是同步执行,调用之后需要等待任务完成,才能执行后面的代码;
2. submit()是异步执行。
join()和get()方法当任务完成的时候返回计算结果。
使用图解如下:
在compute()方法中,首先要判断任务是否足够小,如果小就执行任务(自己代码要做的事),否则就拆分任务,拆分时必须拆分成两个每个子任务使用fork()方法调用又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果
简单示例如下:
import java.util.concurrent.*;
/**
* 使用fork/join计算0-1000的和
* 使用fork/join继承类可以为RecursiveTask<T>或RecursiveAction
* RecursiveTask<T> 重新抽象方法compute,返回类型为<T>,
* RecursiveAction 返回类型为void
*/
public class MyTask extends RecursiveTask<Integer> {
// 拆分任务的刻度(达到10则拆分任务)
private final int MAX_SIZE = 10;
// 任务开始
private int start;
// 任务终止
private int end;
//构造方法,赋值开始和结束值
public MyTask(int start, int end) {
this.start = start;
this.end = end;
}
/**
* 必须要重新的方法,在方法里面通过new <RecursiveTask>的对象并调用fork()方法来进行拆分。
* 然后合并拆分子任务的值,最后返回
* @return
*/
@Override
protected Integer compute() {
int sum = 0;
if (end - start <MAX_SIZE){
for (int i = start ;i < end;i++){
sum += i;
}
}else {
int middle = (start + end)/2; // 中间值
MyTask left = new MyTask(start , middle); // 拆分左任务
MyTask right = new MyTask(middle+1,end); // 拆分右任务
left.fork(); // 执行左任务
right.fork(); // 执行右任务
sum = left.join() + right.join(); // 计算左右相加
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool(); // 创建fork/join线程池
Future<Integer> future = pool.submit(new MyTask(0,1000)); // 创建一个任务并交给线程池执行
System.out.println(future.get()); // 获取结果,阻塞方法
pool.shutdown();//关闭fork/join线程池
}
}