Java并发编程:Semaphore、CountDownLatch、CyclicBarrier

首先我们来实现一个功能:当我们启动一个系统的时候需要初始化许多数据,这时候我们可能需要启动很多线程来进行数据的初始化,只有这些系统初始化结束之后才能够启动系统。其实在Java的类库中已经提供了Semaphore、CountDownLatch、CyclicBarrier这3个类来帮我们实现这样类似的功能了。

一、信号灯 Semaphore

Semaphore sp = new Semaphore(int permits) 接受一个整数型的参数,表示有几盏灯。线程可以通过semaphore.acquire()获取一盏信号灯,通过semaphore.release()释放。它与 Lock 的区别就是 Lock 只能是一个锁,而 Semaphore 更像是多个锁的一个集合,像一个阻塞队列一样,当队列中的锁用完了,而你又需要锁的时候,你就必须等待其他的线程释放锁。

下面我们声明了只有 1 个灯的信号灯,然后启动 3 个线程同时去获取信号灯,另外还启动了 1 个线程每 2 秒就释放一次信号灯。

/**
* Semaphore 实现信号灯
* @author chenyr
* @time 2014-12-24 下午08:13:32
* All Rights Reserved.
*/
public class Semaphore1 { public static void main(String[] args) {
final Semaphore sp = new Semaphore(1); //只声明一盏信号灯
//业务线程1
new Thread(new Runnable(){
public void run(){
try{
System.out.println(Thread.currentThread().getName() + "准备获取信号灯-A");
sp.acquire(); //获取信号灯
System.out.println(Thread.currentThread().getName() + "已获取信号灯-A");
}catch(Exception e){
e.printStackTrace();
}
}
}).start();
//业务线程2
new Thread(new Runnable(){
public void run(){
try{
System.out.println(Thread.currentThread().getName() + "准备获取信号灯-B");
sp.acquire(); //获取信号灯
System.out.println(Thread.currentThread().getName() + "已获取信号灯-B");
}catch(Exception e){
e.printStackTrace();
}
}
}).start();
//业务线程3
new Thread(new Runnable(){
public void run(){
try{
System.out.println(Thread.currentThread().getName() + "准备获取信号灯-C");
sp.acquire(); //获取信号灯
System.out.println(Thread.currentThread().getName() + "已获取信号灯-C");
}catch(Exception e){
e.printStackTrace();
}
}
}).start();
//检查线程
new Timer().schedule(new TimerTask(){
public void run(){
System.out.println("每10s释放一次信号灯");
sp.release();
System.out.println("信号灯已释放");
}
}, 2000, 2000); //每2秒释放一次信号灯
}
}

执行结果如下:

Thread-0准备获取信号灯-A
Thread-0已获取信号灯-A
Thread-1准备获取信号灯-B
Thread-2准备获取信号灯-C
每2s释放一次信号灯
信号灯已释放
Thread-1已获取信号灯-B
每2s释放一次信号灯
信号灯已释放
Thread-2已获取信号灯-C
每2s释放一次信号灯
信号灯已释放

从结果可以看出一开始 Thread-0 获得了锁,Thread-1 和 Thread-2 都在等待获取,直到检查线程 2 秒后释放信号灯,Thread-1 才获得了信号灯。而 Thread-2 是在检查线程再次释放锁的时候获取到的。

二、倒计时门栓 CountDownLatch

接受一个整数型的参数,可以通过countDownLatch.countDown()减少一个计时,countDownLatch.await()进行线程等待,等到countDownLatch中的计数到0之后就会恢复执行。CountDownLatch 与 Semaphore 的作用完全不同,CountDownLatch 是类似于集合点的一个类,当调用者到达一个数目就会触发一些操作。而 Semaphore 是一个类似于锁队列的东西,锁用完了就是用完了,而不会触发操作。

下面我们模拟跑步比赛的例子,用 3 个线程分别模拟 3 个运动员。而这其中有 3 个节点,分别是:

1、要等 3 个运动员都准备好了,裁判才能发开跑命令

2、3 个运动员要等裁判发令才能跑

3、裁判员要等 3 个运动员都到终点了才能宣布成绩

这 3 个时间点我们分别用一个 CountDownLatch 对象来表示,具体实现如下。

/**
* CountDownLatch同步工具
* 实例:模拟运动员跑步的例子(等待裁判发令,3个运动员才跑。等到3个运动员都跑完了,裁判才宣布成绩)
* @author chenyr
* @time 2014-12-25 下午07:49:25
* All Rights Reserved.
*/
public class CountDownLatch1 { public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool(); //要等到3个运动都准备好了,裁判才能命令
final CountDownLatch waitCd = new CountDownLatch(3);
//裁判发1次命令,运动员就开始跑
final CountDownLatch orderCd = new CountDownLatch(1);
//要等到3个运动员到达终点,裁判才公布成绩
final CountDownLatch scoreCd = new CountDownLatch(3); //模拟3个运动员
for(int i = 1; i <= 3; i++){
final int count = i;
Runnable runnable = new Runnable(){
public void run(){
try{
System.out.println("运动员" + count + "站在起跑线准备比赛了!");
waitCd.countDown(); //准备好
orderCd.await(); Thread.sleep((long)(Math.random() * 10000)); //模拟运动员隔多久听到命令
System.out.println("运动员" + count + "听到开跑命令,开跑!"); Thread.sleep((long)(Math.random() * 10000)); //模拟运动员用多长时间跑到终点
System.out.println("运动员" + count + "跑到了终点!");
scoreCd.countDown(); //跑到终点
}catch(Exception e){
e.printStackTrace();
}
}
};
service.execute(runnable);
} //模拟裁判
Runnable runnable = new Runnable(){
public void run(){
try{
System.out.println("裁判已到位,正在等待运动员做好准备!");
waitCd.await();
System.out.println("所有运动员已经就位,裁判准备发令!");
Thread.sleep((long)(Math.random() * 10000)); //模拟裁判的准备时间
System.out.println("裁判:比赛开始! 跑!跑!跑!");
orderCd.countDown(); //开跑
scoreCd.await();
System.out.println("所有运动员已经到达终点,裁判宣布成绩!");
}catch(Exception e){
e.printStackTrace();
}
}
};
service.execute(runnable); service.shutdown();
}
}

运行结果如下:

运动员2站在起跑线准备比赛了!
运动员3站在起跑线准备比赛了!
裁判已到位,正在等待运动员做好准备!
运动员1站在起跑线准备比赛了!
所有运动员已经就位,裁判准备发令!
裁判:比赛开始! 跑!跑!跑!
运动员1听到开跑命令,开跑!
运动员2听到开跑命令,开跑!
运动员1跑到了终点!
运动员3听到开跑命令,开跑!
运动员2跑到了终点!
运动员3跑到了终点!
所有运动员已经到达终点,裁判宣布成绩!

三、栅栏  CyclicBarrier

CyclicBarrier cb = new CyclicBarrier(int parties) 接受一个整数型的参数。线程可以通过cb.await()等待,只要正在等待的线程数目达到设定的参数,所有等待的线程就会恢复执行。CyclicBarrier 与 CountDownLatch 相似,都是要达到一样的人数才可以执行某些操作,只不过 CountDownLatch 是减操作,而 CyclicBarrier 是加操作。与 CyclicBarrier 相似的事件是集合点,即我们 5 个人周末一起去爬山,我们大家都要在某个地方等 5 个人到齐了再出发。

下面设置了两个集合点,只有当全部人到齐了第一个集合点之后,才会继续前往下一个集合点。

/**
* CyclicBarrier同步工具
* 等待所有线程到达之后再继续执行
* @author chenyr
* @time 2014-12-25 下午07:30:00
* All Rights Reserved.
*/
public class CyclicBarrier1 { public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CyclicBarrier cb = new CyclicBarrier(3); //一共要等到几个线程才继续执行
for(int i = 1; i <= 3; i++){
Runnable runnable = new Runnable(){
public void run(){
try{
Thread.sleep((long)(Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + "已经到达集合点A,正在等待。目前已有" + (cb.getNumberWaiting() + 1) + "个线程在等待" );
cb.await();
System.out.println("全部线程到达A集合点"); Thread.sleep((long)(Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + "已经到达集合点B,正在等待。目前已有" + (cb.getNumberWaiting() + 1) + "个线程在等待" );
cb.await();
System.out.println("全部线程到达B集合点");
}catch(Exception e){
e.printStackTrace();
}
}
};
service.execute(runnable);
}
}
}

运行结果如下:

pool-1-thread-2已经到达集合点A,正在等待。目前已有1个线程在等待
pool-1-thread-1已经到达集合点A,正在等待。目前已有2个线程在等待
pool-1-thread-3已经到达集合点A,正在等待。目前已有3个线程在等待
全部线程到达A集合点
全部线程到达A集合点
全部线程到达A集合点
pool-1-thread-1已经到达集合点B,正在等待。目前已有1个线程在等待
pool-1-thread-3已经到达集合点B,正在等待。目前已有2个线程在等待
pool-1-thread-2已经到达集合点B,正在等待。目前已有3个线程在等待
全部线程到达B集合点
全部线程到达B集合点
全部线程到达B集合点

四、CountDownLatch 和 CyclicBarrier 的区别

一般情况下对于两个非常相似的类,我们一般都会想当然地去把他们进行类比。对于 CountDownLatch 和 CyclicBarrier 两个类,我们可以看到CountDownLatch 类都是一个类似于集结点的概念,很多个线程做完事情之后等待其他线程完成,全部线程完成之后再恢复运行。不同的是CountDownLatch 类需要你自己调用 countDown() 方法减少一个计数,然后调用 await() 方法即可。而 CyclicBarrier 则直接调用 await() 方法即可。

所以从上面来看,CountDownLatch 更倾向于多个线程合作的情况,等你所有东西都准备好了,我这边就自动执行了。而 CyclicBarrier 则是我们都在一个地方等你,大家到齐了,大家再一起执行。

参考资料:http://www.cnblogs.com/dolphin0520/p/3920397.html

上一篇:java并发中CountDownLatch的使用


下一篇:spark、hadoop集群添加节点