Java并发编程之Condition

1.使用synchronized中的等待和唤醒实现消费者和生产者模式

/**
 * 使用Synchronized实现消费者生产者模式
 */
public class SynchronizedDemo {

    static List<Integer> list = new ArrayList<Integer>();

    private static int maxNum = 5;


    // 消费者
    private void Consumer(String name){
        synchronized (list){
            while(list.isEmpty()){
                // 如果list为空,调用wait等待,并且释放锁
                try {
                    System.out.println("----当前产品数量为0个,"+name+"无法消费");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.remove(0);
            System.out.println("当前产品数量为"+list.size()+"个,"+name+"消费1个产品");
            list.notifyAll();
        }
    }

    // 生产者
    private void Producer(String name){
        synchronized (list){
            while(list.size()>maxNum){
                // 如果list容量大于5个,那么就等待,直到有空余容量再生产产品
                try {
                    System.out.println("++++当前容量已满,"+name+"无法生产");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(1);
            System.out.println("当前容量为,"+list.size()+","+name+"生产1个产品");
            list.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedDemo demo = new SynchronizedDemo();

        Thread consumer1 = new Thread(()->demo.Consumer("consumer1"));
        Thread consumer2 = new Thread(()->demo.Consumer("consumer2"));
        Thread consumer3 = new Thread(()->demo.Consumer("consumer3"));
        Thread consumer4 = new Thread(()->demo.Consumer("consumer4"));

        Thread producer1 = new Thread(()->demo.Producer("producer1"));
        Thread producer2 = new Thread(()->demo.Producer("producer2"));
        Thread producer3 = new Thread(()->demo.Producer("producer3"));
        Thread producer4 = new Thread(()->demo.Producer("producer4"));

        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();

        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();

        TimeUnit.SECONDS.sleep(5);
        System.out.println("最后剩余多少个产品:"+demo.list.size());


    }
}

Java并发编程之Condition

如上图,假设有一个公共的容量有限的池子,有两种人,一种是生产者,另一种是消费者。需要满足如下条件:

  1. 生产者产生资源往池子里添加,前提是池子没有满,如果池子满了,则生产者暂停生产,直到自己的生成能放下池子。
  2. 消费者消耗池子里的资源,前提是池子的资源不为空,否则消费者暂停消耗,进入等待直到池子里有资源数满足自己的需求。

注意:

  1. wait()方法和notifyAll()方法必须放在同步块内调用(synchronized块内),否则会报错。
  2. 调用wait()方法,线程会进入等待状态,同时释放锁。
  3. 调用notify()或者notifyAll()会唤醒正在等待的线程。

2.Condition

    任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以
实现等待/通知模式
Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

Object的监视器方法与Condition接口的对比:

Java并发编程之Condition

3.Condition接口与示例

    Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到
Condition对象关联的锁
Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创
建出来的
,换句话说,Condition是依赖Lock对象的

使用Condition接口改写上面的消费者生产者代码:

/**
 * 使用Condition实现消费者生产者模式
 */
public class ConditionDemo {

    static List<Integer> list = new ArrayList<Integer>();

    static ReentrantLock lock = new ReentrantLock();

    static Condition condition = lock.newCondition();


    private static int maxNum = 5;


    // 消费者
    private void Consumer(String name){
        lock.lock();
        try{
            while(list.isEmpty()){
                System.out.println("----当前产品数量为0个,"+name+"进入等待队列");
                condition.await();
            }
            list.remove(0);
            System.out.println("当前产品数量为"+list.size()+"个,"+name+"消费1个产品");
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    // 生产者
    private void Producer(String name){
        lock.lock();
        try{
            while(list.size()>maxNum){
                System.out.println("++++当前容量已满,"+name+"进入等待队列");
                condition.await();
            }
            list.add(1);
            System.out.println("当前容量为,"+list.size()+","+name+"生产1个产品");
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo demo = new ConditionDemo();

        Thread consumer1 = new Thread(()->demo.Consumer("consumer1"));
        Thread consumer2 = new Thread(()->demo.Consumer("consumer2"));
        Thread consumer3 = new Thread(()->demo.Consumer("consumer3"));
        Thread consumer4 = new Thread(()->demo.Consumer("consumer4"));

        Thread producer1 = new Thread(()->demo.Producer("producer1"));
        Thread producer2 = new Thread(()->demo.Producer("producer2"));
        Thread producer3 = new Thread(()->demo.Producer("producer3"));
        Thread producer4 = new Thread(()->demo.Producer("producer4"));

        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();

        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();

        TimeUnit.SECONDS.sleep(5);
        System.out.println("最后剩余多少个产品:"+demo.list.size());

    }
}

    这里创建了4个消费者4个生产者,有个容量池list,当list为空时,调用condition.await()进入等待队列,其他线程调用condition.signal()唤醒一个等待在Contidion上的线程,该线程从等待方法返回前必须获得与Contidion相关联的锁。当list大于最大容量时,生产者调用condition.await()进行等待,直到其他线程调用condition.signal()唤醒当前线程,并且去获取相关联的锁,只有获取到了锁才会从await方法返回。
    获取一个Condition必须通过Lock的newCondition()方法

    Condition的(部分)方法以及描述:

void await() throws InterruptedException

  1. 当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
  2. 其他线程调用interrupt()可以中断正在等待的线程。
  3. 线程从await方法返回,说明线程已经获取到了锁。

void awaitUninterruptibly()

    当前线程进入等待状态直到被通知,该方法对中断不敏感,也就是在等待状态中不能被中断。

long awaitNanos(long nanosTimeout) throws InterruptedException

    当前线程进入等待状态,直到被通知,中断,或者超时。返回值表示剩余时间,如果返回值为0或者负数,说明已经超时了。如果在nanosTimeout之前就被唤醒了,那么返回值就是nanosTimeout-实际耗时。

boolean await(long time, TimeUnit unit) throws InterruptedException

    当前线程进入等待状态,直到被通知,中断,或者超时。支持自定义时间单位,false:表示方法超时之后自动返回的,true:表示等待还未超时时,await方法就返回了(超时之前,被其他线程唤醒了)。

boolean awaitUntil(Date deadline) throws InterruptedException

    当前线程进入等待状态,直到被通知,中断,或者到将来某个时间,如果没有到指定时间就被通知,返回true,如果到了某个时间,还未被唤醒,就返回false。

void signal()

    唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁。

void signalAll()

    唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须是获得了与Condition相关联的锁。

4.同一个锁支持创建多个Condition

    使用两个Condition改写上面的消费者生产者:

/**
 * 使用Condition实现消费者生产者模式
 */
public class ConditionDemo {

    static List<Integer> list = new ArrayList<Integer>();

    static ReentrantLock lock = new ReentrantLock();

    // 当队列已满时,生产者不能生产产品,在full队列中等待
    static Condition full = lock.newCondition();

    // 当队列为空时,消费者不能消费产品,在empty队列中等待
    static Condition empty = lock.newCondition();


    private static int maxNum = 5;


    // 消费者
    private void Consumer(String name){
        lock.lock();
        try{
            while(list.isEmpty()){
                System.out.println("----当前产品数量为0个,"+name+"进入等待队列");
                empty.await();
            }
            list.remove(0);
            System.out.println("当前产品数量为"+list.size()+"个,"+name+"消费1个产品");
            // full队列中等待的线程
            full.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    // 生产者
    private void Producer(String name){
        lock.lock();
        try{
            while(list.size()>maxNum){
                System.out.println("++++当前容量已满,"+name+"进入等待队列");
                full.await();
            }
            list.add(1);
            System.out.println("当前容量为,"+list.size()+","+name+"生产1个产品");
            empty.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo demo = new ConditionDemo();

        Thread consumer1 = new Thread(()->demo.Consumer("consumer1"));
        Thread consumer2 = new Thread(()->demo.Consumer("consumer2"));
        Thread consumer3 = new Thread(()->demo.Consumer("consumer3"));
        Thread consumer4 = new Thread(()->demo.Consumer("consumer4"));

        Thread producer1 = new Thread(()->demo.Producer("producer1"));
        Thread producer2 = new Thread(()->demo.Producer("producer2"));
        Thread producer3 = new Thread(()->demo.Producer("producer3"));
        Thread producer4 = new Thread(()->demo.Producer("producer4"));

        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();

        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();

        TimeUnit.SECONDS.sleep(5);
        System.out.println("最后剩余多少个产品:"+demo.list.size());

    }
}

    示例代码中用了两个Condition,因此会有两个等待队列,当消费者消费数据时,如果list为空,那么就进入empty队列中等待,当生产者生产数据时,如果list容量已满,那么就进入full队列中等待。此时生产者和消费者都在各自的等待队列中等待如果是synchronized的话,只支持一个等待队列,Condition可以支持多个等待队列

总结

1. 使用condition的步骤:创建condition对象,获取锁,然后调用condition的方法

2. 一个ReentrantLock支持创建多个condition对象

3. void await()throwsInterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持线程中断

4. void awaitUninterruptibly();方法会释放锁,让当前线程等待,支持唤醒,不支持线程中断

5. long awaitNanos(longnanosTimeout)throwsInterruptedException;参数为纳秒,此方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为负数;超时之前被唤醒返回的,结果为正数(表示返回时距离超时时间相差的纳秒数)

6. boolean await(longtime,TimeUnitunit)throwsInterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为false;超时之前被唤醒返回的,结果为true

7. boolean awaitUntil(Datedeadline)throwsInterruptedException;参数表示超时的截止时间点,方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为false;超时之前被唤醒返回的,结果为true

8. void signal();会唤醒一个等待中的线程,然后被唤醒的线程会被加入同步队列,去尝试获取锁

9. void signalAll();会唤醒所有等待中的线程,将所有等待中的线程加入同步队列,然后去尝试获取锁






原文资料:

java并发编程的艺术

https://mp.weixin.qq.com/s/n7OWc69dLAcesiBertbjDA

上一篇:Spring Boot 2 实战:使用 @Condition 注解来根据条件注入 Bean


下一篇:DevOps