线程同步工具(四)在同一个点同步任务

声明:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 译者:郑玉婷

在同一个点同步任务

Java 并发 API 提供了可以允许2个或多个线程在在一个确定点的同步应用。它是 CyclicBarrier 类。此类与在此章节的等待多个并发事件完成指南中的 CountDownLatch 类相似,但是它有一些特殊性让它成为更强大的类。

CyclicBarrier 类有一个整数初始值,此值表示将在同一点同步的线程数量。当其中一个线程到达确定点,它会调用await() 方法来等待其他线程。当线程调用这个方法,CyclicBarrier阻塞线程进入休眠直到其他线程到达。当最后一个线程调用CyclicBarrier 类的await() 方法,它唤醒所有等待的线程并继续执行它们的任务。

CyclicBarrier 类有个有趣的优势是,你可以传递一个外加的 Runnable 对象作为初始参数,并且当全部线程都到达同一个点时,CyclicBarrier类 会把这个对象当做线程来执行。此特点让这个类在使用 divide 和 conquer 编程技术时,可以充分发挥任务的并行性,

在这个指南,你将学习如何使用 CyclicBarrier 类来让一组线程在一个确定点同步。你也将使用 Runnable 对象,它将会在全部线程都到达确定点后被执行。在这个例子里,你将在数字矩阵中查找一个数字。矩阵会被分成多个子集(使用divide 和 conquer 技术),所以每个线程会在一个子集中查找那个数字。一旦全部行程运行结束,会有一个最终任务来统一他们的结果。

准备

指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java项目。

怎么做呢

按照这些步骤来实现下面的例子::

001 //1.  我们从实现2个辅助类开始。首先,创建一个类名为 MatrixMock。此类随机生成一个在1-10之间的 数字矩阵,我们将从中查找数字。
002 public class MatrixMock {
003  
004 //2.   声明私有 int matrix,名为 data。
005 private int data[][];
006  
007 //3.   实现类的构造函数。此构造函数将接收矩阵的行数,行的长度,和我们将要查找的数字作为参数。3个参数全部int 类型。
008 public MatrixMock(int size, int length, int number){
009  
010 //4.   初始化构造函数将使用的变量和对象。
011 int counter=0;
012 data=new int[size][length];
013 Random random=new Random();
014  
015 //5.   用随机数字填充矩阵。每生成一个数字就与要查找的数字对比,如果相等,就增加counter值。
016 for (int i=0; i<size; i++) {
017     for (int j=0; j<length; j++){
018         data[i][j]=random.nextInt(10);
019         if (data[i][j]==number){
020             counter++;
021         }
022     }
023 }
024  
025 //6.   最后,在操控台打印一条信息,表示查找的数字在生成的矩阵里的出现次数。此信息是用来检查线程们获得的正确结果的。
026 System.out.printf("Mock: There are %d ocurrences of number in generated data.\n",counter,number); //译者注:把字符串里的number改为%d.
027  
028 //7.    实现 getRow() 方法。此方法接收一个 int为参数,是矩阵的行数。返回行数如果存在,否则返回null。
029 public int[] getRow(int row){
030     if ((row>=0)&&(row<data.length)){
031         return data[row];
032     }
033     return null;
034 }
035  
036 //8.   现在,实现一个类名为 Results。此类会在array内保存被查找的数字在矩阵的每行里出现的次数。
037 public class Results {
038  
039 //9.   声明私有 int array 名为 data。
040 private int data[];
041  
042 //10. 实现类的构造函数。此构造函数接收一个表明array元素量的整数作为参数。
043 public Results(int size){
044     data=new int[size];
045 }
046  
047 //11. 实现 setData() 方法。此方法接收array的某个位置和一个值作为参数,然后把array的那个位置设定为那个值。
048 public void setData(int position, int value){
049     data[position]=value;
050 }
051  
052 //12. 实现 getData() 方法。此方法返回结果 array。
053 public int[] getData(){
054 return data;
055 }
056  
057 //13. 现在你有了辅助类,是时候来实现线程了。首先,实现 Searcher 类。这个类会在随机数字的矩阵中的特定的行里查找数字。创建一个类名为Searcher 并一定实现  Runnable 接口.
058 public class Searcher implements Runnable {
059  
060 //14. 声明2个私有int属性名为 firstRow 和 lastRow。这2个属性是用来确定将要用的子集的行。
061 private int firstRow;
062 private int lastRow;
063  
064 //15. 声明一个私有 MatrixMock 属性,名为 mock。
065 private MatrixMock mock;
066  
067 //16. 声明一个私有 Results 属性,名为 results。
068 private Results results;
069  
070 //17.  声明一个私有 int 属性名为 number,用来储存我们要查找的数字。
071 private int number;
072  
073 //18. 声明一个 CyclicBarrier 对象,名为 barrier。
074 private final CyclicBarrier barrier;
075  
076 //19. 实现类的构造函数,并初始化之前声明的全部属性。
077 public Searcher(int firstRow, int lastRow, NumberMock mock, Results results, int number, CyclicBarrier barrier){
078     this.firstRow=firstRow;
079     this.lastRow=lastRow;
080     this.mock=mock;
081     this.results=results;
082     this.number=number;
083     this.barrier=barrier;
084 }
085  
086 //20. 实现 run() 方法,用来查找数字。它使用内部变量,名为counter,用来储存数字在每行出现的次数。
087 @Override
088 public void run() {
089     int counter;
090  
091 //21. 在操控台打印一条信息表明被分配到这个对象的行。
092 System.out.printf("%s: Processing lines from %d to %d.\n",Thread.currentThread().getName(),firstRow,lastRow);
093  
094 //22. 处理分配给这个线程的全部行。对于每行,记录正在查找的数字出现的次数,并在相对于的 Results 对象中保存此数据。
095 for (int i=firstRow; i<lastRow; i++){
096     int row[]=mock.getRow(i);
097     counter=0;
098     for (int j=0; j<row.length; j++){
099         if (row[j]==number){
100         counter++;
101     }
102 }
103  
104 results.setData(i, counter);
105 }
106  
107 //23. 打印信息到操控台表明此对象已经结束搜索。
108 System.out.printf("%s: Lines processed.\n",Thread. currentThread().getName());
109  
110 //24. 调用 CyclicBarrier 对象的 await() 方法 ,由于可能抛出的异常,要加入处理 InterruptedException and BrokenBarrierException 异常的必需代码。
111 try {
112     barrier.await();
113 catch (InterruptedException e) {
114     e.printStackTrace();
115 catch (BrokenBarrierException e) {
116     e.printStackTrace();
117 }
118  
119 //25. 现在,实现一个类来计算数字在这个矩阵里出现的总数。它使用储存了矩阵中每行里数字出现次数的 Results 对象来进行运算。创建一个类,名为 Grouper 并一定实现 Runnable 接口.
120 public class Grouper implements Runnable {
121  
122 //26. 声明一个私有 Results 属性,名为 results。
123 private Results results;
124  
125 //27.  实现类的构造函数,并初始化 Results 属性。
126 public Grouper(Results results){
127 this.results=results;
128 }
129  
130 //28.实现 run() 方法,用来计算结果array里数字出现次数的总和。
131 @Override
132 public void run() {
133  
134 //29. 声明一个 int 变量并写在操控台写一条信息表明开始处理了。
135 int finalResult=0;
136 System.out.printf("Grouper: Processing results...\n");
137  
138 //30. 使用 results 对象的 getData() 方法来获得每行数字出现的次数。然后,处理array的全部元素,把每个元素的值加给 finalResult 变量。
139 int data[]=results.getData();
140 for (int number:data){
141 finalResult+=number;
142 }
143  
144 //31. 在操控台打印结果。
145 System.out.printf("Grouper: Total result: %d.\n",finalResult);
146  
147 //32. 最后, 实现例子的 main 类,通过创建一个类,名为 Main 并为其添加 main() 方法。
148 public class Main {
149  
150 public static void main(String[] args) {
151  
152 //33. 声明并初始5个常熟来储存应用的参数。
153 final int ROWS=10000;
154 final int NUMBERS=1000;
155 final int SEARCH=5;
156 final int PARTICIPANTS=5;
157 final int LINES_PARTICIPANT=2000;
158  
159 //34. Create a MatrixMock 对象,名为 mock. 它将有 10,000 行,每行1000个元素。现在,你要查找的数字是5。
160 MatrixMock mock=new MatrixMock(ROWS, NUMBERS,SEARCH);
161  
162 //35. 创建 Results 对象,名为 results。它将有 10,000 元素。
163 Results results=new Results(ROWS);
164  
165 //36. 创建 Grouper 对象,名为 grouper。
166 Grouper grouper=new Grouper(results);
167  
168 //37.  创建 CyclicBarrier 对象,名为 barrier。此对象会等待5个线程。当此线程结束后,它会执行前面创建的 Grouper 对象。
169 CyclicBarrier barrier=new CyclicBarrier(PARTICIPANTS,grouper);
170  
171 //38. 创建5个 Searcher 对象,5个执行他们的线程,并开始这5个线程。
172 Searcher searchers[]=new Searcher[PARTICIPANTS];
173 for (int i=0; i<PARTICIPANTS; i++){
174     searchers[i]=new Searcher(i*LINES_PARTICIPANT, (i*LINES_ PARTICIPANT)+LINES_PARTICIPANT, mock, results, 5,barrier);
175     Thread thread=new Thread(searchers[i]);
176     thread.start();
177 }
178 System.out.printf("Main: The main thread has finished.\n");

它是怎么工作的…

以下裁图是例子的运行结果:

线程同步工具(四)在同一个点同步任务

例子中解决的问题比较简单。我们有一个很大的随机的整数矩阵,然后你想知道这矩阵里面某个数字出现的次数。为了更好的执行,我们使用了 divide 和 conquer 技术。我们 divide 矩阵成5个子集,然后在每个子集里使用一个线程来查找数字。这些线程是 Searcher 类的对象。

我们使用 CyclicBarrier 对象来同步5个线程的完成,并执行 Grouper 任务处理个别结果,最后计算最终结果。

如我们之前提到的,CyclicBarrier 类有一个内部计数器控制到达同步点的线程数量。每次线程到达同步点,它调用 await() 方法告知 CyclicBarrier 对象到达同步点了。CyclicBarrier 把线程放入睡眠状态直到全部的线程都到达他们的同步点。

当全部的线程都到达他们的同步点,CyclicBarrier 对象叫醒全部正在 await() 方法中等待的线程们,然后,选择性的,为CyclicBarrier的构造函数 传递的 Runnable 对象(例子里,是 Grouper 对象)创建新的线程执行外加任务。

更多…

CyclicBarrier 类有另一个版本的 await() 方法:

  • await(long time, TimeUnit unit): 线程会一直休眠直到被中断;内部计数器到达0,或者特定的时间过去了。TimeUnit类有多种常量: DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, and SECONDS.

此类也提供了 getNumberWaiting() 方法,返回被 await() 方法阻塞的线程数,还有 getParties() 方法,返回将与CyclicBarrier同步的任务数。

重置 CyclicBarrier 对象
CyclicBarrier 类与CountDownLatch有一些共同点,但是也有一些不同。最主要的不同是,CyclicBarrier对象可以重置到它的初始状态,重新分配新的值给内部计数器,即使它已经被初始过了。

可以使用 CyclicBarrier的reset() 方法来进行重置操作。当这个方法被调用后,全部的正在await() 方法里等待的线程接收到一个 BrokenBarrierException 异常。此异常在例子中已经用打印stack trace处理了,但是在一个更复制的应用,它可以执行一些其他操作,例如重新开始执行或者在中断点恢复操作。

破坏 CyclicBarrier 对象 
CyclicBarrier 对象可能处于一个特殊的状态,称为 broken。当多个线程正在 await() 方法中等待时,其中一个被中断了,此线程会收到 InterruptedException 异常,但是其他正在等待的线程将收到 BrokenBarrierException 异常,并且 CyclicBarrier 会被置于broken 状态中。

CyclicBarrier 类提供了isBroken() 方法,如果对象在 broken 状态,返回true,否则返回false。

参见

第三章,线程同步应用:等待多个并发事件

文章转自 并发编程网-ifeve.com

上一篇:什么是阿里云云服务器ecs?为什么要选择使用阿里云ECS服务器呢?


下一篇:固若金汤 - PostgreSQL pgcrypto加密插件