今天为大家介绍几种java内置的同步器。
CountDownLatch:倒计数门栓
CountDownLatch让一个线程集等待直到计数变为0.该Latch为一次性的,一旦计数为0,就不能再使用了。
Sample 1:
public class Driver { public static void main(String[] args) throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(5); for (int i = 0; i < 5; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don't let run yet startSignal.countDown(); // startSignal change to zero, all threads begin to run doneSignal.await(); // all thread await doneSignal to zero System.out.println("All threads runs over"); } private static void doSomethingElse() { System.out.println("don't let run yet"); } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); System.out.println("Thread executing"); doneSignal.countDown(); } catch (InterruptedException ex) { } } }
result:
don‘t let run yet
Thread executing
Thread executing
Thread executing
Thread executing
Thread executing
All threads runs over
Sample 2
public class Driver2 { public static void main(String[] args) throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(3); Executor e = new ThreadPoolExecutor(3, 3, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(5)); for (int i = 0; i < 3; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); doneSignal.await(); // wait for all to finish System.out.println("All threads are done"); } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { doWork(i); doneSignal.countDown(); } void doWork(int i) { System.out.println("Thread " +i+" is working"); } }result:
Thread 0 is working
Thread 1 is working
Thread 2 is working
All threads are done
CyclicBarrier 可循环障栅
特性同CountDownLatch,只不过CyclicBarrier 可循环使用。调用reset()方法即可重置障栅。
sample:
public class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(); try { Thread.sleep((int)(Math.random() * 10)*1000); System.out.println(Thread.currentThread().getName() + " run over, and await"); barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } private void processRow() { for (int i = 0; i < data[0].length; i++) { data[myRow][i] = (float) Math.random() * 100; } } private boolean done() { if (data[myRow][data[0].length - 1] != 0) { return true; } return false; } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; barrier = new CyclicBarrier(N+1, new Runnable() { public void run() { mergeRows(); } private void mergeRows() { // TODO Auto-generated method stub System.out.println("row merge invoking"); } }); for (int i = 0; i < N; ++i) new Thread(new Worker(i)).start(); waitUntilDone(); } private void waitUntilDone() { try { barrier.await(); System.out.println("all threads, done"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static void main(String[] args) { float[][] array = new float[6][6]; Solver s = new Solver(array); } }result:
Thread-6 run over, and await
Thread-10 run over, and await
Thread-7 run over, and await
Thread-9 run over, and await
Thread-8 run over, and await
Thread-5 run over, and await
row merge invoking
all threads, done
Exchanger(交换器)
当两个线程在同一个数据缓存区上工作的时候,就可以使用交换器(Exchanger),典型的情况是,一个线程向缓冲区填入数据,另一个线程消耗这些数据。当他们都完成以后相互交换缓冲区。
SynchronousQueue(同步队列)
同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用SynchronousQueue的put方法时,它会阻塞直到另一个线程调用take方法为止,反之亦然。与Exchanger的情况不同,数据仅仅沿着一个方向传递,冲生产者到消费者。
即使SynchronousQueue类实现了BlockingQueue接口,概念上讲,它依然不是一个队列。它没有包含任何元素,它的size方法总是返回0.