java多线程之路之同步器—Core Java学习

今天为大家介绍几种java内置的同步器。

CountDownLatch:倒计数门栓

CountDownLatch让一个线程集等待直到计数变为0.该Latch为一次性的,一旦计数为0,就不能再使用了。

Sample 1:

public class Driver { 
	public static void main(String[] args) throws InterruptedException {
		CountDownLatch startSignal = new CountDownLatch(1);
		CountDownLatch doneSignal = new CountDownLatch(5);


		for (int i = 0; i < 5; ++i)
			// create and start threads
			new Thread(new Worker(startSignal, doneSignal)).start();

		doSomethingElse(); // don't let run yet
		startSignal.countDown(); // startSignal change to zero, all threads begin to run
		doneSignal.await(); // all thread await doneSignal to zero
		System.out.println("All threads runs over");
	}


	private static void doSomethingElse() {
		System.out.println("don't let run yet");
	}
}

class Worker implements Runnable {
	private final CountDownLatch startSignal;
	private final CountDownLatch doneSignal;

	Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
		this.startSignal = startSignal;
		this.doneSignal = doneSignal;
	}

	public void run() {
		try {
			startSignal.await();
			System.out.println("Thread executing");
			doneSignal.countDown();
		} catch (InterruptedException ex) {
		} 
	}
}

result:

don‘t let run yet
Thread executing
Thread executing
Thread executing
Thread executing
Thread executing
All threads runs over

Sample 2

public class Driver2 { 
	public static void main(String[] args) throws InterruptedException {
		     CountDownLatch doneSignal = new CountDownLatch(3);
		     Executor e = new ThreadPoolExecutor(3, 3, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(5));


		     for (int i = 0; i < 3; ++i) // create and start threads
		       e.execute(new WorkerRunnable(doneSignal, i));


		     doneSignal.await();           // wait for all to finish
		     System.out.println("All threads are done");
		   }
		 }


		 class WorkerRunnable implements Runnable {
		   private final CountDownLatch doneSignal;
		   private final int i;
		   WorkerRunnable(CountDownLatch doneSignal, int i) {
		      this.doneSignal = doneSignal;
		      this.i = i;
		   }
		   public void run() {
		      doWork(i);
			doneSignal.countDown();
		   }


		   void doWork(int i) {
			   System.out.println("Thread " +i+" is working");
		   }
		 }
result:

Thread 0 is working
Thread 1 is working
Thread 2 is working
All threads are done

CyclicBarrier 可循环障栅

特性同CountDownLatch,只不过CyclicBarrier 可循环使用。调用reset()方法即可重置障栅。

sample:

public class Solver {
	final int N;
	final float[][] data;
	final CyclicBarrier barrier;

	class Worker implements Runnable {
		int myRow;

		Worker(int row) {
			myRow = row;
		}

		public void run() {
			while (!done()) {
				processRow();
				
				try {
					Thread.sleep((int)(Math.random() * 10)*1000);
					System.out.println(Thread.currentThread().getName() + "  run over, and await");
					barrier.await();
				} catch (InterruptedException ex) {
					return;
				} catch (BrokenBarrierException ex) {
					return;
				}
			}
		}

		private void processRow() {
			for (int i = 0; i < data[0].length; i++) {
				data[myRow][i] = (float) Math.random() * 100;
			}
		}

		private boolean done() {
			if (data[myRow][data[0].length - 1] != 0) {
				return true;
			}
			return false;
		}
	}

	public Solver(float[][] matrix) {
		data = matrix;
		N = matrix.length;
		barrier = new CyclicBarrier(N+1, new Runnable() {
			public void run() {
				mergeRows();
			}

			private void mergeRows() {
				// TODO Auto-generated method stub
				System.out.println("row merge invoking");
			}
		});
		for (int i = 0; i < N; ++i)
			new Thread(new Worker(i)).start();

		waitUntilDone();
	}

	private void waitUntilDone() {
		try {
			barrier.await();
			System.out.println("all threads, done");
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
	
	public static void main(String[] args) {
		float[][] array = new float[6][6];
		Solver s = new Solver(array);
	}
}
result:

Thread-6  run over, and await
Thread-10  run over, and await
Thread-7  run over, and await
Thread-9  run over, and await
Thread-8  run over, and await
Thread-5  run over, and await
row merge invoking
all threads, done

Exchanger(交换器)

当两个线程在同一个数据缓存区上工作的时候,就可以使用交换器(Exchanger),典型的情况是,一个线程向缓冲区填入数据,另一个线程消耗这些数据。当他们都完成以后相互交换缓冲区。

SynchronousQueue(同步队列)

同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用SynchronousQueue的put方法时,它会阻塞直到另一个线程调用take方法为止,反之亦然。与Exchanger的情况不同,数据仅仅沿着一个方向传递,冲生产者到消费者。

即使SynchronousQueue类实现了BlockingQueue接口,概念上讲,它依然不是一个队列。它没有包含任何元素,它的size方法总是返回0.





java多线程之路之同步器—Core Java学习,布布扣,bubuko.com

java多线程之路之同步器—Core Java学习

上一篇:Spring Bean的作用域 实例


下一篇:回调与Java8的λ表达式