Java并发工具学习(八)——Semaphore和Condition

文章目录

前言

从这一篇博客开始,开始总结线程间协作,并发流程控制的工具类,这一篇主要介绍Semaphore和Condition

Semaphore

Semaphore中文译文为信号量,操作系统中也有同样的概念。类似于生活中常见的许可证的概念。在执行指定业务逻辑之前,需要先获取相关的许可证。在限流的使用场景中也有Semaphore的影子。

在Java中,信号量的作用是维护一个"许可证"的计数,线程可以获取许可证,获取成功信号量的个数就减一,同时线程也可以释放一个许可证,释放一个信号量就加一,当信号量所拥有的许可证数量为0,那么其他想要获取许可证的线程就需要等待,直到有另外的线程释放了许可证。

在初始化Semaphore的时候,需要初始化信号量个数,同时也可以设置对应的公平策略,如果指定为公平,那么Semaphore会把之前的等待线程放入到自己维护的一个FIFO队列中,信号量的分发会根据在FIFO队列中等待的时长来进行。

通过Semaphore获取信号量的方法有多个,大体上分为以下三类

方法 作用
acquire()
acquire(int permits)
获取信号量的时候阻塞,同时响应中断。这里的permits参数指的是想要获取信号量的个数
acquireUninterruptibly()
acquireUninterruptibly(int permits)
在获取信号量的时候不响应中断
tryAcquire()
tryAcquire(int permits)
tryAcquire(int permits, long timeout, TimeUnit unit)
tryAcquire(long timeout, TimeUnit unit)
尝试获取信号量,如果没有则不阻塞,可以指定获取信号量的时间

如果执行完逻辑,可以调用release来释放信号量

简单实例

/**
 * autor:liman
 * createtime:2021/11/28
 * comment:多个线程等待许可证
 */
public class SemaphoreDemo01 {

    static Semaphore semaphore = new Semaphore(3, true);//3个许可证,公平模式

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 100; i++) {
            executorService.submit(new Task());
        }
        //判断线程池是否停止
        executorService.shutdown();
        while(!executorService.isTerminated()){
            //线程池没有执行完成任务,主线程就空转,直到线程池运行结束
        }
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            try {
                semaphore.acquire();
                //获取多个信号量
                //semaphore.acquire(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "拿到了许可证");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            semaphore.release();
            //释放多个信号量
            //semaphore.release(3);
            System.out.println(Thread.currentThread().getName() + "释放了许可证");
        }
    }
}

信号量的获取个数可以灵活指定,这在一定程度上可以根据线程耗费资源的程度进行侧重,比如有两个任务,一个是TaskA ,一个是TaskB,TaskA的执行需要耗费较多的资源,TaskB需要耗费较少的资源,我们可以定义3个信号量。执行TaskA需要获取全部信号量才能执行,而执行TaskB只需要获取一个信号量,这样TaskA和TaskB就永远不可能同时执行。

信号量需要注意的是,在获取和释放的时候,数量必须要保证一致,每次获取多少信号量就要释放多少信号量,如果某个线程获取了2个信号量,但是释放的时候,只释放了1个,随着时间的推移,总会有一个时间点,其他线程的信号量不够用了,这样会导致程序卡死。

信号量的获取和释放,可以不在同一个线程中执行,也许是线程A获取了3个信号量,线程B释放了3个信号量,这也是可行的,但要保证逻辑合理。

Condition

Condition是一个接口,其通过ReentrantLock创建,其作用非常类似于Object中的wait和notify的组合,在Conditon中变成了await和signal。关于wait和notify这两者可以参考之前的博客Thread和Object中的方法。

Java并发工具学习(八)——Semaphore和Condition

简单基本实例

/**
 * autor:liman
 * createtime:2021/11/28
 * comment:普通的condition用法实例
 */
@Slf4j
public class ConditionDemo {

    private ReentrantLock lock = new ReentrantLock();
    //condition由锁来实例化
    private Condition condition = lock.newCondition();

    public void method01(){
        lock.lock();
        try{
            System.out.println("条件不满足,进入await");
            //这里
            condition.await();
            System.out.println("条件满足,开始继续执行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void method02(){
        lock.lock();
        try{
            System.out.println("准备工作完成,唤醒其他线程");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionDemo conditionDemo = new ConditionDemo();
        new Thread(()->{
            try {
                Thread.sleep(1000);
                conditionDemo.method02();//唤醒在condition上等待的线程
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //主线程调用method01先行阻塞,在1秒之后,被上面新建的线程唤醒
        conditionDemo.method01();
    }
}

与notify相对应的,Condition也存在signal和signalAll两个方法,其中signalAll会唤醒Condition上等待的所有线程,而signal是公平的,只会唤醒等待时间最长的线程。

利用Condition实现的生产者消费者模式

/**
 * autor:liman
 * createtime:2021/11/28
 * comment:condition实现生产者和消费者
 */
@Slf4j
public class ConditionProducerAndCustomer {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    class Customer extends Thread{
        @Override
        public void run() {
            consume();
        }

        private void consume(){
            while(true){//消费者一直消费
                lock.lock();
                try{
                    while(queue.size() == 0){
                        System.out.println("队列为空,消费者进入等待");
                        try {
                            notEmpty.await();//用notEmpty进行等待
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signal();//取出数据,同志爱生产者继续生产
                    System.out.println("从队列取走了一个数据,队列剩余"+queue.size()+"个数据");
                }finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread{
        @Override
        public void run() {
            produce();
        }

        private void produce(){
            while(true){//生产者一直生产
                lock.lock();
                try{
                    while(queue.size() == queueSize){
                        System.out.println("队列已满,生产者进入等待");
                        try {
                            notFull.await();//用 notFull 进行等待
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(new Random().nextInt(100));
                    notEmpty.signalAll();//取出数据,同志爱生产者继续生产
                    System.out.println("生产者向消费者插入了一个数据,队列剩余"+queue.size()+"个数据");
                }finally {
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        ConditionProducerAndCustomer conditionProducerAndCustomer = new ConditionProducerAndCustomer();
        Producer producer = conditionProducerAndCustomer.new Producer();
        Customer customer = conditionProducerAndCustomer.new Customer();
        customer.start();
        producer.start();
    }
}

如果说Lock是用来替代synchronized关键字的,那么Condition其实可以理解为用来替代Object.wati/notify的,在用法和性质上,二者几乎没有区别。

await方法会释放lock的锁,和Object.wait一样,不需要自己手动先行释放

调用await的时候,也必须要持有lock锁,否则会出现异常(虽然编译的时候不会出现异常)

总结

简单总结了一下Semaphore和Condition,下一篇总结一下CountDownLatch和CyclicBarrier

上一篇:在mac上安装nodejs


下一篇:在ASP.NET中跨页面实现多选