Java并发编程之CountDownLatch/CyclicBarrierDemo/SemaphoreDemo详解

CountDownLatch详解

什么是CountDownLatch?

让一线程阻塞直到另一些线程完成一系列操作才被唤醒。

CountDownLatch主要有两个方法(await(),countDown())。

当一个或多个线程调用await()时,调用线程会被阻塞。其它线程调用countDown()会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为零时,因调用await方法被阻塞的线程会被唤醒,继续执行。
附加:

latch
英 [lætʃ] 美 [lætʃ]
n. 门闩;插销;碰锁;弹簧锁
v. 用插销插上;用碰锁锁上

代码说明一 :班长锁门

假设一个自习室里有7个人,其中有一个是班长,班长的主要职责就是在其它6个同学走了后,关灯,锁教室门,然后走人,因此班长是需要最后一个走的,那么有什么方法能够控制班长这个线程是最后一个执行,而其它线程是随机执行的?

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {

        // 计数器
        CountDownLatch countDownLatch = new CountDownLatch(6);

        for (int i = 0; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "\t 上完自习,离开教室");
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
        }

        countDownLatch.await();

        System.out.println(Thread.currentThread().getName() + "\t 班长最后关门");
    }
}


输出结果

0	 上完自习,离开教室
6	 上完自习,离开教室
4	 上完自习,离开教室
5	 上完自习,离开教室
3	 上完自习,离开教室
1	 上完自习,离开教室
2	 上完自习,离开教室
main	 班长最后关门

代码说明二:秦国统一六国

新建枚举类

import java.util.Objects;

public enum CountryEnum {
	ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "赵"), FIVE(5, "魏"), SIX(6, "韩");

	private Integer retcode;
	private String retMessage;

	CountryEnum(Integer retcode, String retMessage) {
		this.retcode = retcode;
		this.retMessage = retMessage;
	}

	public static CountryEnum forEach_countryEnum(int index) {
		
		CountryEnum[] myArray = CountryEnum.values();
		
		for(CountryEnum ce : myArray) {
			if(Objects.equals(index, ce.getRetcode())) {
				return ce;
			}
		}
		
		return null;
	}

	public Integer getRetcode() {
		return retcode;
	}

	public void setRetcode(Integer retcode) {
		this.retcode = retcode;
	}

	public String getRetMessage() {
		return retMessage;
	}

	public void setRetMessage(String retMessage) {
		this.retMessage = retMessage;
	}

}

逻辑类

import java.util.concurrent.CountDownLatch;

public class UnifySixCountriesDemo {

	public static void main(String[] args) throws InterruptedException {
        // 计数器
        CountDownLatch countDownLatch = new CountDownLatch(6);

        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "国被灭了!");
                countDownLatch.countDown();
            }, CountryEnum.forEach_countryEnum(i).getRetMessage()).start();
        }

        countDownLatch.await();

        System.out.println(Thread.currentThread().getName() + " 秦国统一中原。");
	}
}


测试结果

齐国被灭了!
燕国被灭了!
楚国被灭了!
魏国被灭了!
韩国被灭了!
赵国被灭了!
main 秦国统一中原。

什么是CyclicBarrierDemo?

CyclicBarrier的字面意思就是可循环(Cyclic)使用的屏障(Barrier)。
它要求做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await方法。

CyclicBarrier与CountDownLatch的区别:CyclicBarrier可重复多次,而CountDownLatch只能是一次。

代码说明一:集齐7个龙珠,召唤神龙

程序演示集齐7个龙珠,召唤神龙

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class SummonTheDragonDemo {
    public static void main(String[] args) {
        /**
         * 定义一个循环屏障,参数1:需要累加的值,参数2 需要执行的方法
         */
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("召唤神龙");
        });

        for (int i = 1; i <= 7; i++) {
            final Integer tempInt = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "\t 收集到 第" + tempInt + "颗龙珠");

                try {
                    // 先到的被阻塞,等全部线程完成后,才能执行方法
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

测试结果

2	 收集到 第2颗龙珠
6	 收集到 第6颗龙珠
1	 收集到 第1颗龙珠
7	 收集到 第7颗龙珠
5	 收集到 第5颗龙珠
4	 收集到 第4颗龙珠
3	 收集到 第3颗龙珠
召唤神龙

代码说明二:模拟赛马

import java.util.concurrent.*;
import java.util.*;

class Horse implements Runnable {
	private static int counter = 0;
	private final int id = counter++;
	private int strides = 0;
	private static Random rand = new Random(47);
	private static CyclicBarrier barrier;

	public Horse(CyclicBarrier b) {
		barrier = b;
	}

	public synchronized int getStrides() {
		return strides;
	}

	public void run() {
		try {
			while (!Thread.interrupted()) {//没有中断,就不断循环
				synchronized (this) {
                    //模拟马单位时间的移动距离
					strides += rand.nextInt(3); // Produces 0, 1 or 2
				}
				barrier.await();//<---等待其他马到齐到循环屏障
			}
		} catch (InterruptedException e) {
			// A legitimate way to exit
		} catch (BrokenBarrierException e) {
			// This one we want to know about
			throw new RuntimeException(e);
		}
	}

	public String toString() {
		return "Horse " + id + " ";
	}

	public String tracks() {
		StringBuilder s = new StringBuilder();
		for (int i = 0; i < getStrides(); i++)
			s.append("*");
		s.append(id);
		return s.toString();
	}
}

public class HorseRace {
	static final int FINISH_LINE = 75;
	private List<Horse> horses = new ArrayList<Horse>();
	private ExecutorService exec = Executors.newCachedThreadPool();
	private CyclicBarrier barrier;

	public HorseRace(int nHorses, final int pause) {
        //初始化循环屏障
		barrier = new CyclicBarrier(nHorses, new Runnable() {
			// 循环多次执行的任务
			public void run() {
                
                // The fence on the racetrack
				StringBuilder s = new StringBuilder();
				for (int i = 0; i < FINISH_LINE; i++)
					s.append("="); 
				System.out.println(s);
                
                //打印马移动距离
				for (Horse horse : horses)
					System.out.println(horse.tracks());
                
                //判断有没有马到终点了
				for (Horse horse : horses)
					if (horse.getStrides() >= FINISH_LINE) {
						System.out.println(horse + "won!");
						exec.shutdownNow();// 有只马跑赢了,所有任务都结束了
						return;
					}
				
                try {
					TimeUnit.MILLISECONDS.sleep(pause);
				} catch (InterruptedException e) {
					System.out.println("barrier-action sleep interrupted");
				}
			}
		});
		// 开跑!
		for (int i = 0; i < nHorses; i++) {
			Horse horse = new Horse(barrier);
			horses.add(horse);
			exec.execute(horse);
		}
	}

	public static void main(String[] args) {
		int nHorses = 7;
		int pause = 200;
		new HorseRace(nHorses, pause);
	}
}

测试结果

...省略一些...
===========================================================================
**********************************************************0
************************************************************1
******************************************************2
***********************************************************************3
*************************************************************************4
*****************************************************************5
*****************************************************************6
===========================================================================
**********************************************************0
************************************************************1
*******************************************************2
***********************************************************************3
**************************************************************************4
*****************************************************************5
*******************************************************************6
===========================================================================
***********************************************************0
*************************************************************1
*******************************************************2
***********************************************************************3
****************************************************************************4
*******************************************************************5
********************************************************************6
Horse 4 won!

什么是SemaphoreDemo?

信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
附加:

semaphore
英 [ˈseməfɔː®] 美 [ˈseməfɔːr]
n. 信号标;旗语
v. 打旗语;(用其他类似的信号系统)发信号

正常的锁(concurrency.locks或synchronized锁)在任何时刻都只允许一个任务访问一项资源,
而 Semaphore允许n个任务同时访问这个资源。

代码说明一:抢车位

模拟一个抢车位的场景,假设一共有6个车,3个停车位

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {

        /**
         * 初始化一个信号量为3,默认是false 非公平锁, 模拟3个停车位
         */
        Semaphore semaphore = new Semaphore(3, false);

        // 模拟6部车
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    // 代表一辆车,已经占用了该车位
                    semaphore.acquire(); // 抢占

                    System.out.println(Thread.currentThread().getName() + "\t 抢到车位");

                    // 每个车停3秒
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println(Thread.currentThread().getName() + "\t 离开车位");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放停车位
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}

测试结果

1	 抢到车位
2	 抢到车位
0	 抢到车位
0	 离开车位
2	 离开车位
1	 离开车位
5	 抢到车位
4	 抢到车位
3	 抢到车位
5	 离开车位
4	 离开车位
3	 离开车位

上一篇:CountDownLatch源码解析


下一篇:你觉得我的这段Java代码还有优化的空间吗?