声明:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 译者:郑玉婷
自定义在 Fork/Join 框架中运行的任务
执行者框架分开了任务的创建和运行。这样,你只要实现 Runnable 对象来使用 Executor 对象。你可以发送 Runnable 任务给执行者,然后它会创建,管理,并终结必要的线程来执行这些任务。
Java 7 在 Fork/Join 框架中提供了特殊的执行者。这个框架是设计用来解决那些可以使用 divide 和 conquer 技术分成更小点的任务的问题。在一个任务内,你要检查你要解决的问题的大小,如果它比设定的大小还大,你就把问题分成2个或多个任务,再使用框架来执行这些任务。
如果问题的大小比设定的大小要小,你可以在任务中直接解决问题,可选择返回结果。Fork/Join 框架实现 work-stealing 算法来提高这类问题的整体表现。
Fork/Join 框架的主要类是 ForkJoinPool 类。它内部有以下2个元素:
- 一个等待执行的任务queue
- 一个执行任务的线程池
默认情况,被 ForkJoinPool类执行的任务是 ForkJoinTask 类的对象。你也可以发送 Runnable 和 Callable 对象给 ForkJoinPool 类,但是他们就不能获得所以 Fork/Join 框架的好处。通常情况,你将发送ForkJoinTask 类的这2个子类中的一个给 ForkJoinPool 对象:
- RecursiveAction: 如果你的任务没有返回结果
- RecursiveTask: 如果你的任务返回结果
在这个指南,你将学习如何为 Fork/Join 框架实现你自己的任务,实现一个任务扩展ForkJoinTask类。你将要实现的任务是计量运行时间并写入操控台,这样你可以控制它的进展(evolution)。你也可以实现你自己的 Fork/Join 任务来写日志信息,为了获得在这个任务中使用的资源,或者来 post-process 任务的结果。
怎么做呢…
按照这些步骤来实现下面的例子::
002 |
public abstract class MyWorkerTask extends ForkJoinTask<Void> {
|
008 |
public MyWorkerTask(String name) {
|
014 |
public Void getRawResult() {
|
020 |
protected void setRawResult(Void value) {
|
027 |
protected boolean exec() {
|
028 |
Date startDate= new Date();
|
030 |
Date finishDate= new Date();
|
031 |
long diff=finishDate.getTime()-startDate.getTime();
|
032 |
System.out.printf( "MyWorkerTask: %s : %d Milliseconds to complete.\n" ,name,diff);
|
037 |
public String getName(){
|
042 |
protected abstract void compute();
|
045 |
public class Task extends MyWorkerTask {
|
051 |
public Task(String name, int array[], int start, int end){
|
059 |
protected void compute() {
|
061 |
int mid=(end+start)/ 2 ;
|
062 |
Task task1= new Task( this .getName()+ "1" ,array,start,mid);
|
063 |
Task task2= new Task( this .getName()+ "2" ,array,mid,end);
|
064 |
invokeAll(task1,task2);
|
068 |
for ( int i=start; i<end; i++) {
|
075 |
} catch (InterruptedException e) {
|
083 |
public static void main(String[] args) throws Exception {
|
086 |
int array[]= new int [ 10000 ];
|
089 |
ForkJoinPool pool= new ForkJoinPool();
|
092 |
Task task= new Task( "Task" ,array, 0 ,array.length);
|
101 |
System.out.printf( "Main: End of the program.\n" );
|
它是怎么工作的…
在这个指南,你实现了扩展 ForkJoinTask 类的 MyWorkerTask 类。这是你的基本类,它可以在 ForkJoinPool 执行者中执行,并且可以获得这个执行者的所以好处,如 work-stealing 算法。这个类等同于 RecursiveAction 和 RecursiveTask 类。
当你扩展 ForkJoinTask 类,你必须实现以下3个方法:
- setRawResult(): 此方法是用来设立任务的结果。由于你的任务不返回任何结果,所以留白即可。
- getRawResult(): 此方法是用来返回任务的结果。由于你的任务不返回任何结果,此方法会返回null值。
- exec(): 此方法实现了任务的算法。在这个例子,你把算法委托给抽象方法 compute() (如 RecursiveAction 类和 RecursiveTask 类),且在exec()方法你计算了方法的运行时间,并写入到操控台。
最后,在例子的主类,你创建了一个有10,000个元素的array,一个 ForkJoinPool 执行者,和一个处理整个array的 Task 对象。运行程序,你可以发现不同的任务运行后都在操控台写入他们的运行时间。