声明:本文是《 Java 7 Concurrency Cookbook 》的第七章,作者: Javier Fernández González 译者:许巧辉
实现ThreadFactory接口生成自定义的线程给Fork/Join框架
Fork/Join框架是Java7中最有趣的特征之一。它是Executor和ExecutorService接口的一个实现,允许你执行Callable和Runnable任务而不用管理这些执行线程。
这个执行者面向执行能被拆分成更小部分的任务。主要组件如下:
- 一个特殊任务,实现ForkJoinTask类
- 两种操作,将任务划分成子任务的fork操作和等待这些子任务结束的join操作
- 一个算法,优化池中线程的使用的work-stealing算法。当一个任务正在等待它的子任务(结束)时,它的执行线程将执行其他任务(等待执行的任务)。
ForkJoinPool类是Fork/Join的主要类。在它的内部实现,有如下两种元素:
- 一个存储等待执行任务的列队。
- 一个执行任务的线程池
在这个指南中,你将学习如何实现一个在ForkJoinPool类中使用的自定义的工作者线程,及如何使用一个工厂来使用它。
准备工作…
这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。
如何做…
按以下步骤来实现的这个例子:
1.创建一个继承ForkJoinWorkerThread类的MyWorkerThread类。
1 |
public class MyWorkerThread extends ForkJoinWorkerThread {
|
2.声明和创建一个参数化为Integer类的ThreadLocal属性,名为taskCounter。
1 |
private static ThreadLocal<Integer> taskCounter= new ThreadLocal<Integer>();
|
3.实现这个类的构造器。
1 |
protected MyWorkerThread(ForkJoinPool pool) {
|
4.重写onStart()方法。调用父类的这个方法,写入一条信息到控制台。设置当前线程的taskCounter属性值为0。
2 |
protected void onStart() {
|
4 |
System.out.printf("MyWorkerThread %d: Initializing task |
5.重写onTermination()方法。写入当前线程的taskCounter属性值到控制台。
2 |
protected void onTermination(Throwable exception) {
|
3 |
System.out.printf("MyWorkerThread %d: |
4 |
%d\n",getId(),taskCounter.get()); |
5 |
super .onTermination(exception);
|
6.实现addTask()方法。递增taskCounter属性值。
2 |
int counter=taskCounter.get().intValue();
|
4 |
taskCounter.set(counter); |
7.创建一个实现ForkJoinWorkerThreadFactory接口的MyWorkerThreadFactory类。实现newThread()方法,创建和返回一个MyWorkerThread对象。
2 |
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
|
3 |
return new MyWorkerThread(pool);
|
8.创建MyRecursiveTask类,它继承一个参数化为Integer类的RecursiveTask类。
1 |
public class MyRecursiveTask extends RecursiveTask<Integer> {
|
9.声明一个私有的、int类型的属性array。
10.声明两个私有的、int类型的属性start和end。
1 |
private int start, end;
|
11.实现这个类的构造器,初始化它的属性。
1 |
public MyRecursiveTask( int array[], int start, int end) {
|
12.实现compute()方法,用来合计数组中在start和end位置之间的所有元素。首先,将执行这个任务的线程转换成一个MyWorkerThread对象,然后使用addTask()方法来增长这个线程的任务计数器。
2 |
protected Integer compute() {
|
4 |
MyWorkerThread thread=(MyWorkerThread)Thread.currentThread(); |
13.实现addResults()方法。计算和返回两个任务(接收参数)的结果的总和。
01 |
private Integer addResults(Task task1, Task task2) {
|
04 |
value = task1.get().intValue()+task2.get().intValue(); |
05 |
} catch (InterruptedException e) {
|
08 |
} catch (ExecutionException e) {
|
14.令这个线程睡眠10毫秒,然后返回任务的结果。
2 |
TimeUnit.MILLISECONDS.sleep( 10 );
|
3 |
} catch (InterruptedException e) {
|
15.实现这个例子的主类,通过创建Main类,并实现main()方法。
2 |
public static void main(String[] args) throws Exception {
|
16.创建一个名为factory的MyWorkerThreadFactory对象。
1 |
MyWorkerThreadFactory factory= new MyWorkerThreadFactory();
|
17.创建一个名为pool的ForkJoinPool对象,将前面创建的factory对象作为参数传给它的构造器。
1 |
ForkJoinPool pool= new ForkJoinPool( 4 , factory, null , false );
|
18.创建一个大小为100000的整数数组,将所有元素初始化为值1。
1 |
int array[]= new int [ 100000 ];
|
2 |
for ( int i= 0 ; i<array.length; i++){
|
19.创建一个新的Task对象,用来合计数组中的所有元素。
1 |
MyRecursiveTask task= new MyRecursiveTask(array, 0 ,array.length);
|
20.使用execute()方法,将这个任务提交给池。
21.使用join()方法,等待这个任务的结束。
22.使用shutdown()方法,关闭这个池。
23.使用awaitTermination()方法,等待这个执行者的结束。
1 |
pool.awaitTermination( 1 , TimeUnit.DAYS);
|
24.使用get()方法,将任务的结束写入到控制台。
1 |
System.out.printf( "Main: Result: %d\n" ,task.get());
|
25.写入一条信息到控制台,表明程序的结束。
1 |
System.out.printf( "Main: End of the program\n" );
|
它是如何工作的…
Fork/Join框架使用的线程叫工作者线程。Java包含继承Thread类的ForkJoinWorkerThread类和使用Fork/Join框架实现工作者线程。
在这个指南中,你已实现了继承ForkJoinWorkerThread类的MyWorkerThread类,并重写这个类的两个方法。你的目标是实现每个工作者线程的任务计数器,以至于你可以知道每个工作者线程执行多少个任务。你已经通过一个ThreadLocal属性实现计数器。这样,每个线程都拥有它自己的计数器,对于来你说是透明的。
你已重写ForkJoinWorkerThread类的onStart()方法来实现任务的计数器。当工作者线程开始它的执行时,这个方法将被调用。你也重写了onTermination()方法,将任务计数器的值写入到控制台。当工作者线程结束它的执行时,这个方法将被调用。你也在MyWorkerThread类中实现addTask()方法,用来增加每个线程的任务计数器。
对于ForkJoinPool类,与Java并发API中的所有执行者一样,使用工厂来创建它。所以,如果你想在ForkJoinPool类中使用MyWorkerThread线程,你必须实现自己的线程工厂。对于Fork/Join框架,这个工厂必须实现ForkJoinPool.ForkJoinWorkerThreadFactory类。为此,你已实现MyWorkerThreadFactory类。这个类只有一个用来创建一个新的MyWorkerThread对象的方法。
最后,你只要使用已创建的工厂来初始化ForkJoinPool类。你已在Main类中通过使用ForkJoinPool的构造器实现了。
以下截图显示了这个程序的部分输出:
你可以看出ForkJoinPool对象如何执行4个工作者线程及每个工作者线程执行多少个任务。
不止这些…
考虑一下,当一个线程正常结束或抛出一个Exception异常时,调用的ForkJoinWorkerThread提供的onTermination()方法。这个方法接收一个Throwable对象作为参数。如果这个参数值为null时,表明这个工作者线程正常结束。但是,如果这个参数的值不为null,表明这个线程抛出一个异常。你必须包含必要的代码来处理这种情况。