信号量(Semaphore)、闭锁(Latch)、栅栏(Barrier)

目录

Semaphore、Barrier、Latch都属于同步工具类

1、信号量(Semaphore)

描述

​ 计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个制定操作的数量。还可以用来实现资源池。

场景

​ 信号量就是一个计数器,所以应用很广泛。

例如:我们构建一个有界队列,在队列满的时候希望阻塞而不是中断。那么信号量的大小就是队列的边界。

例如:网络请求中,每秒只允许100个请求进入系统,超过100个则需要阻塞,直到有请求完成或释放信号。

Semaphore

介绍:计数信号量;

主要方法:

// 方法1:创建一个计数为bound的信号量
Semaphore sem = new Semaphore(bound);
// 方法2:获取一个信号量(如果sem中的计数已经为0,则此方法阻塞,直到sem中有空闲的信号量)
sem.acquire();  
// 方法3:释放一个信号量
sem.release(); 

使用方式:创建一个计数为n的信号量,然后当有请求或者有占用时,调用方法2拿走一个信号量。

案例:使用信号量来维护set的大小。

/**
 * 使用信号量(semaphore)构建一个有界容器。信号量的值为容器的大小
 * @Author: dhcao
 * @Version: 1.0
 */
public class BoundedHashSet<T> {
    
    private final Set<T> set;
    private final Semaphore sem;
    
    public BoundedHashSet(int bound) {
        // 初始化信号量的值,作为set容器的边界
        this.set = Collections.synchronizedSet(new HashSet<T>());
        this.sem = new Semaphore(bound);
    }

    /**
     * 如果添加成功,则信号量的可用值减少一个;否则,被acquire占用的信号量要释放。
     * @param o
     * @return
     * @throws InterruptedException
     */
    public boolean add(T o) throws InterruptedException{
        
        // 1. 尝试取获取一个信号量,如果获取不到,此方法阻塞,直到能够获取到信号量
        sem.acquire();
        boolean wasAdded = false;
        
        try{
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            // 2. finaly会在return之前执行,但是不会改变return的值。
            if (!wasAdded) {
                // 3. 如果添加失败,则set没有增加数据,则信号量不需要计数,所以释放它。
                sem.release();
            }
        }
    }

    /**
     * 容器删除一个数据是,要释放对应的信号量
     * @param o
     * @return
     */
    public boolean remove(Object o){
        final boolean remove = set.remove(o);

        if (remove) {
            sem.release();
        }
        
        return remove;
    }
    
}

2、闭锁(Latch)

描述

​ 闭锁是以一种同步工具类,它的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直关闭,并且没有任何线程能够通过,当达到结束状态时,这扇门会打开并允许所有线程通过。(不能被重置,终态不可逆)

场景

​ 闭锁的用法很多

例如:并发测试中,我们希望2000个线程同时开始执行。那么闭锁的结束状态:线程数目 = 2000。

例如:资源加载中,我们希望所有菜单加载完毕后才允许访问。那么闭锁的结束状态:所有菜单已加载。

例如:游戏加载中(lol或者dota??),我们希望所有玩家都就绪之后才开始游戏。那么闭锁的结束状态:所有玩家均就绪

CountDownLatch

介绍:顾名思义,这是一个计数器闭锁。

主要方法:

// 方法1:构造函数,计数器为n
CountDownLatch startGate = new CountDownLatch(int n);
// 方法2:线程等待
startGate.await();
// 方法3:计数减1
startGate.countDown();

使用方式:线程调用方法2则开始等待,直到线程startGate 计数器为0;

案例1:创建n条线程,并让他们并发执行(同时开始,就像跑步一样,所有人就位之后同时开始)

    /**
     * nThreads条线程同时开始执行任务task
     * @param nThreads 线程数量
     * @param task 需要执行的任务
     * @return 
     * @throws InterruptedException 线程中断异常交由客户端处理
     */
    public static void timeTasks(int nThreads, final Runnable task) throws InterruptedException{

        final CountDownLatch startGate = new CountDownLatch(1);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(new Runnable() {
                public void run() {
                    try {
                        // 1. 每条线程出来都开始等待;for循环共创建nThread条线程,线程就绪之后等待信号
                        startGate.await();
                        task.run();
                    }catch (InterruptedException ignored){

                    }
                }
            });
            t.start();
        }
        
        // 2.startGate - 1 = 0;在1中等待的线程开始执行。即已经准备好的nThreads条线程同时开始执行
        startGate.countDown();
    }

案例2: 计算nThreads条线程并发执行任务的时间

    /**
     * 测试并发执行任务的时间(包含了创建线程的时间)
     * @param nThreads 线程数量
     * @param task 需要执行的任务
     * @return 任务完成时间
     * @throws InterruptedException 线程中断异常交由客户端处理
     */
    public static long timeTasks(int nThreads, final Runnable task) throws InterruptedException{

        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(new Runnable() {
                public void run() {

                    try {
                        // 1. 每条线程出来都开始等待;for循环共创建nThread条线程,所有
                        startGate.await();
                        try{
                            task.run();
                        }finally {
                            // 2. 每条线程完成run任务之后,先等待着,直到nThreads线程都完成;
                            endGate.countDown();
                        }
                    }catch (Exception ignored){

                    }
                }
            });
            t.start();
        }

        long start = System.nanoTime();
        
        // 3.所有线程准备好之后,统一放行
        startGate.countDown();
        // 4.nThreads条线程都完成任务之后,endGate统计最后时间!
        endGate.await();
        long end = System.nanoTime();

        return end - start;
    }

上述案例在互联网项目中做并发测试时很有意义,所以选择了这2个例子。通常测试并发时,如果只是for循环创建线程再执行,其实线程还是有先后顺序的。如果使用Executor框架,代码又显得过于复杂,所以我常使用上述案例来做并发测试。

3、栅栏(Barrier)

描述

​ 栅栏,顾名思义,是一个拦截屏障,类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

场景

​ 栅栏应用于很多依赖性场景

例如:考试时当所有考生都交卷之后,才能开始阅卷。

例如:约定所有人都到达公司楼下,才发车去郊游。

CyclicBarrier

介绍:顾名思义,这是一个循环屏障。它可以循环运行(区别于CountDownLatch终态之后不可复用)

主要方法:

// 方法1:创建一个栅栏;run为栅栏拦截的线程到齐之后进行的操作;n 为线程数目
CyclicBarrier barrier = new CyclicBarrier(int n, Runnable run);
// 方法2:被此栅栏约束的线程在等待
barrier.await();

使用方式:创建一个栅栏,然后将此栅栏传递给线程,代表此类型线程要被该栅栏约束。

案例

/**
 * 5个同事去吃饭,约定在饭店门口见面
 * @Author: dhcao
 * @Version: 1.0
 */
public class BarrierUtil {

    /**
     * 定义同事类
     */
    private static class Colleague extends Thread{

        // 要约定栅栏;所有的同事实现类都要被此栅栏约束
        private CyclicBarrier barrier;

        Colleague(CyclicBarrier barrier, String name){
            super(name);
            this.barrier = barrier;
        }


        @Override
        public void run() {

            try {

                System.out.println("同事:" + getName() + " 已经到达约定地点,等待其他人");
                barrier.await();
                System.out.println("人齐了,去吃饭");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) {
        int colleagueNum = 5;
        // 1. 定义一个栅栏:5条线程到位之后执行run方法(由最后到达的线程执行)
        CyclicBarrier barrier = new CyclicBarrier(colleagueNum, new Runnable() {
            @Override
            public void run() {
                System.out.println("执行此操作的线程:" + Thread.currentThread().getName() + "。所有人均已到达约定地点;领头进门....");
            }
        });

        // 2. 将此栅栏约束在线程上(如果i<其他值)
        for (int i = 0; i < colleagueNum; i++) {
            new Colleague(barrier,"同事" + i).start();
        }
    }
}
上一篇:CyclicBarrier一组线程相互等待


下一篇:python multiprocessing.Barrier