3.4 Synchronizing tasks at a common point
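The Java concurrency API provides a synchronization utility that lets two or more threads synchronize at a determined point: the CyclicBarrier class. It is similar to the CountDownLatch class, but the two differ in some respects.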
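Like CountDownLatch, a CyclicBarrier instance is initialized with an integer. That number is how many threads must synchronize at the determined point. When one of these threads reaches the point, it calls the await() method to wait for the others. The CyclicBarrier blocks the calling thread and puts it to sleep until all the other threads have arrived. When the last thread calls await(), the barrier wakes up all the waiting threads and every thread continues with its work.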
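The await() behaviour described above can be illustrated with a minimal sketch. The class name BarrierAwaitSketch and the method smallestCountSeenAfterBarrier are hypothetical names chosen for this illustration and are not part of the matrix example in this section. Each thread records its arrival, waits at the barrier, and then reads the arrival counter. Since no thread gets past await() until all parties have called it, every thread should see the full count.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of the original example.
public class BarrierAwaitSketch {

    /**
     * Starts `parties` threads. Each one records its arrival, waits at the
     * barrier, and then reads the arrival counter. Returns the smallest
     * counter value that any thread observed after passing the barrier.
     */
    public static int smallestCountSeenAfterBarrier(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger arrived = new AtomicInteger();
        AtomicInteger smallestSeen = new AtomicInteger(Integer.MAX_VALUE);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                arrived.incrementAndGet();   // work done before the common point
                try {
                    barrier.await();         // sleep until all parties have arrived
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Runs only after every thread has incremented the counter.
                smallestSeen.accumulateAndGet(arrived.get(), Math::min);
            });
            threads[i].start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return smallestSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(smallestCountSeenAfterBarrier(4)); // prints 4
    }
}
```

If a thread could slip past the barrier early, it might read a counter value smaller than the number of parties. With the barrier in place the smallest value seen equals the number of parties.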
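One of the most important differences between CyclicBarrier and CountDownLatch is that a CyclicBarrier object can be reset to its initial state, with its internal counter restored to the value it was initialized with.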
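A small sketch of what this reuse looks like in practice, under the assumption that the same group of threads needs to meet several times in a row. The class BarrierReuseSketch and the method roundsCompleted are hypothetical names used only here. The same barrier object is awaited repeatedly. After each group of arrivals it returns to its initial state by itself, so the next round can begin.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of the original example.
public class BarrierReuseSketch {

    /**
     * Lets `parties` threads meet at the same barrier `rounds` times and
     * returns how many rounds were completed.
     */
    public static int roundsCompleted(int parties, int rounds) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger completedRounds = new AtomicInteger();

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        // await() returns the arrival index of this thread;
                        // 0 means it was the last one to arrive in this round.
                        if (barrier.await() == 0) {
                            completedRounds.incrementAndGet();
                        }
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return completedRounds.get();
    }

    public static void main(String[] args) {
        System.out.println(roundsCompleted(3, 4)); // prints 4
    }
}
```

A CountDownLatch could not be used this way, because its count cannot be reset once it reaches zero. CyclicBarrier also offers an explicit reset() method; threads waiting at the barrier when it is called receive a BrokenBarrierException.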
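The following example shows how CyclicBarrier is used. Suppose we have a large matrix filled with random integers from 0 to 9, and we want to count how many times the value 5 occurs. To solve this efficiently, we split the rows among several threads and use the CyclicBarrier class to combine their results once every thread has finished.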
Define the matrix class MatrixMock:
import java.util.Random;

/**
 * This class generates a random matrix of integer numbers between 0 and 9.
 */
public class MatrixMock {

    /**
     * Bi-dimensional array with the random numbers
     */
    private int[][] data;

    /**
     * Constructor of the class. Generates the bi-dimensional array of numbers.
     * While generating the array, it counts how many times the number we are
     * going to look for appears, so we can check that the CyclicBarrier-based
     * search produces the right total.
     * @param size Number of rows of the array
     * @param length Number of columns of the array
     * @param number Number we are going to look for
     */
    public MatrixMock(int size, int length, int number) {
        int counter = 0;
        data = new int[size][length];
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < length; j++) {
                // nextInt(10) returns a value from 0 to 9 inclusive
                data[i][j] = random.nextInt(10);
                if (data[i][j] == number) {
                    counter++;
                }
            }
        }
        System.out.printf("Mock: There are %d ocurrences of number in generated data.\n", counter);
    }

    /**
     * Returns a row of the bi-dimensional array
     * @param row the number of the row to return
     * @return the selected row, or null if the index is out of range
     */
    public int[] getRow(int row) {
        if ((row >= 0) && (row < data.length)) {
            return data[row];
        }
        return null;
    }
}
Define the Results class, which stores the results of the parallel search:
/**
 * This class is used to store the number of occurrences of the number
 * we are looking for in each row of the bi-dimensional array.
 */
public class Results {

    /**
     * Array to store the number of occurrences of the number in each row of the array
     */
    private int[] data;

    /**
     * Constructor of the class. Initializes its attributes
     * @param size Size of the array to store the results
     */
    public Results(int size) {
        data = new int[size];
    }

    /**
     * Sets the value of one position in the array of results
     * @param position Position in the array
     * @param value Value to set in that position
     */
    public void setData(int position, int value) {
        data[position] = value;
    }

    /**
     * Returns the array of results
     * @return the array of results
     */
    public int[] getData() {
        return data;
    }
}
Define the Grouper class, which aggregates the search results:
/**
 * Groups the results of each Searcher. Sums the values stored in the Results object.
 * An object of this class is executed automatically by the CyclicBarrier when
 * all the Searchers finish their job.
 */
public class Grouper implements Runnable {

    /**
     * Results object with the occurrences of the number in each row
     */
    private Results results;

    /**
     * Constructor of the class. Initializes its attributes
     * @param results Results object with the occurrences of the number in each row
     */
    public Grouper(Results results) {
        this.results = results;
    }

    /**
     * Main method of the Grouper. Sums the values stored in the Results object
     */
    @Override
    public void run() {
        int finalResult = 0;
        System.out.printf("Grouper: Processing results...\n");
        int[] data = results.getData();
        for (int number : data) {
            finalResult += number;
        }
        System.out.printf("Grouper: Total result: %d.\n", finalResult);
    }
}
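The Grouper is intended to be handed to the CyclicBarrier constructor as a barrier action, a Runnable that the barrier runs once all parties have arrived and before any of them is released. The following reduced sketch shows that pattern in isolation. BarrierActionSketch and sumAfterBarrier are hypothetical names, and the per-thread values are made up for the illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of the original example.
public class BarrierActionSketch {

    /**
     * Each of `parties` workers writes one partial value into its own slot
     * and waits at the barrier. The barrier action sums the slots once all
     * workers have arrived. Returns that sum, or -1 if the action never ran.
     */
    public static int sumAfterBarrier(int parties) {
        int[] partial = new int[parties];
        AtomicInteger total = new AtomicInteger(-1);

        // The second constructor argument is the barrier action.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            int sum = 0;
            for (int value : partial) {
                sum += value;
            }
            total.set(sum);
        });

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int slot = i;
            workers[i] = new Thread(() -> {
                partial[slot] = slot + 1;   // made-up partial result: 1, 2, 3, ...
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(sumAfterBarrier(5)); // prints 15
    }
}
```

The workers need no extra locking around their slots: each thread writes only its own position, and writes made before await() are visible to the barrier action.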
Define the Searcher class:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Searcher implements Runnable {

    /**
     * First row to search (inclusive)
     */
    private int firstRow;

    /**
     * Last row to search (exclusive)
     */
    private int lastRow;

    /**
     * Bi-dimensional array with the numbers
     */
    private MatrixMock mock;

    /**
     * Array to store the results
     */
    private Results results;

    /**
     * Number to look for
     */
    private int number;

    /**
     * CyclicBarrier to control the execution
     */
    private final CyclicBarrier barrier;

    /**
     * Constructor of the class. Initializes its attributes
     * @param firstRow First row to search
     * @param lastRow Last row to search
     * @param mock Object with the array of numbers
     * @param results Array to store the results
     * @param number Number to look for
     * @param barrier CyclicBarrier to control the execution
     */
    public Searcher(int firstRow, int lastRow, MatrixMock mock, Results results, int number, CyclicBarrier barrier) {
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.mock = mock;
        this.results = results;
        this.number = number;
        this.barrier = barrier;
    }

    /**
     * Main method of the searcher. Looks for the number in a subset of rows.
     * For each row, saves the number of occurrences of the number in the
     * array of results
     */
    @Override
    public void run() {
        int counter;
        System.out.printf("%s: Processing lines from %d to %d.\n", Thread.currentThread().getName(), firstRow, lastRow);
        for (int i = firstRow; i < lastRow; i++) {
            int[] row = mock.getRow(i);
            counter = 0;
            for (int j = 0; j < row.length; j++) {
                if (row[j] == number) {
                    counter++;
                }
            }
            results.setData(i, counter);
        }
        System.out.printf("%s: Lines processed.\n", Thread.currentThread().getName());
        try {
            // Wait here until all the other Searchers have finished
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    /**
     * Main method of the example
     * @param args
     */
    public static void main(String[] args) {
        /*
         * Initializes the bi-dimensional array of data:
         * 10000 rows, 1000 numbers in each row, looking for number 5
         */
        final int ROWS = 10000;
        final int NUMBERS = 1000;
        final int SEARCH = 5;
        final int PARTICIPANTS = 5;
        final int LINES_PARTICIPANT = 2000;
        MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);

        // Initializes the object for the results
        Results results = new Results(ROWS);

        // Creates a Grouper object
        Grouper grouper = new Grouper(results);

        // Creates the CyclicBarrier object. It has 5 participants and, when
        // they finish, the CyclicBarrier will execute the grouper object
        CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);

        // Creates, initializes and starts 5 Searcher objects
        Searcher[] searchers = new Searcher[PARTICIPANTS];
        for (int i = 0; i < PARTICIPANTS; i++) {
            searchers[i] = new Searcher(i * LINES_PARTICIPANT, (i * LINES_PARTICIPANT) + LINES_PARTICIPANT, mock, results, SEARCH, barrier);
            Thread thread = new Thread(searchers[i]);
            thread.start();
        }
        System.out.printf("Main: The main thread has finished.\n");
    }
}
Output:
Mock: There are 998891 ocurrences of number in generated data.
Thread-0: Processing lines from 0 to 2000.
Main: The main thread has finished.
Thread-3: Processing lines from 6000 to 8000.
Thread-4: Processing lines from 8000 to 10000.
Thread-1: Processing lines from 2000 to 4000.
Thread-2: Processing lines from 4000 to 6000.
Thread-3: Lines processed.
Thread-4: Lines processed.
Thread-0: Lines processed.
Thread-1: Lines processed.
Thread-2: Lines processed.
Grouper: Processing results...
Grouper: Total result: 998891.
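Scanning the whole matrix in a single thread would generally be much slower on a multi-core machine. Here the five Searcher threads each process their own block of rows in parallel, and the CyclicBarrier guarantees that the Grouper sums the per-row counts only after every Searcher has finished, which is why the total matches the count reported by MatrixMock. This is where CyclicBarrier shows its strength.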