并发编程 — Condition 使用及原理详解

一、概述

Condition本身也是一个接口,其功能和Object类中 wait/notify类似。Object中的 wait 和 notify方法需要与synchronized关键字配合使用,可实现线程间的等待/通知功能。Condition 接口也是提供了类似的功能,但是需要与Lock配合使用,可实现等待/通知模式。

二、Condition 接口与示例

 Condition 定义了等待 / 通知两种类型的方法,当前线程调用这些方法时,需要提前获取Condition 对象关联的锁。Condition 对象是调用 Lock对象的 newCondition() 方法创建出来的,也就是说,Condition 是依赖 Lock 对象的。

Condition 的使用很简单,需要注意的是在调用方法前获取锁。下面我们通过一个有界对垒的实例来了解一下Condition的使用方式。有界队列是指当队列空时,获取槽中将被阻塞,直到队列中有新元素,当队列已满时,队列插入操作将被阻塞,直到队列出现空位。代码如下所示:

public class SyncBoundedQueue<E> {

    private LinkedList<E> items;
    private int size; // 记录队列的初始大小
    private Lock lock = new ReentrantLock(); //定义一个锁
    private Condition notFull = lock.newCondition(); //队列不满
    private Condition notEmpty = lock.newCondition(); // 队列不空

    public SyncBoundedQueue(int size){
        items = new LinkedList<>();
        this.size = size;
    }
    //添加元素
    public void add(E e) throws InterruptedException{
        lock.lock();
        try {
            // 当队列已满时,需要阻塞等待,等待其他线程获取元素,流程空位
             while (items.size() == size){
                 notFull.await();
             }
             items.addLast(e); // 有空位则添加元素
             notEmpty.signalAll(); // 通知获取元素的线程现在队列已经有元素了
        }finally {
            lock.unlock();
        }
    }

    // 获取元素
    public E get() throws InterruptedException{
        lock.lock();
        try {
            // 如果队列中没有元素,则一直等待
             while (items.size() == 0){
                 notEmpty.await();
             }
             E e = items.removeFirst(); // 移除元素
             notFull.signalAll(); // 移除元素有空位,通知唤醒添加元素的线程
             return e; // 返回元素
        }finally {
            lock.unlock();
        }
    }
}

三、实现原理分析

1、基本原理分析

因为Condition 必须和 Lock一起使用,所以Condition的实现也是Lock的一部分。下面先分别看一下互斥锁中Condition的构造。

    // ReentrantLock 中的方法
    public Condition newCondition() {
        return sync.newCondition(); 
    }

   abstract static class Sync extends AbstractQueuedSynchronizer {
        .....

        final ConditionObject newCondition() {
            return new ConditionObject(); // 创建 Condition 对象
        }

        .....
    }

有上面代码可知,调用 Lock.newConditon() 方法实际是调用的 Sync 内部类中的方法创建了Condition实现了 ConditionObject()。ConditionObject 类是 同步器 AQS 的内部类,因为 Condition 的操作需要相关联的锁,每一个Condition对象上面,都阻塞了多个线程。因此,在 ConditionObject 内部也有一个双向链表组成的队列,如下所示:

    public class ConditionObject implements Condition {

        private transient Node firstWaiter;
      
        private transient Node lastWaiter;
	}

 由代码实现可知 ConditionObject 内部的等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程是在 Condition 对象上等待线程,如果一个线程调用了 Condition.await() 方法,那么该线程将会释放锁、构造成节点加入等待对垒并进行等待状态。一个 Condition 包含一个等待队列, Condition 拥有首节点  ( firstWaiter ) 和 尾结点 ( lastWaiter )。当前线程调用 Condition.await() 方法,将会以当前线程构造节点,并将节点从未被加入等待队列,等待队列的基本结构u如下所示:

并发编程 — Condition 使用及原理详解
等待队列的基本结构

在 Object 的监视器模型上,一个对象拥有一个同步队列和等待队列,而Lock中拥有一个同步队列和多个等待队列,如下图所示:

并发编程 — Condition 使用及原理详解
同步队列和等待队列

 

2、等待实现分析

调用Condition的await方法,会是当前线程进入等待队列并释放锁,同时线程状态变为等待状态。如wait方法实现如下:

        public final void await() throws InterruptedException {
            if (Thread.interrupted()) // 如果线程收到中断信号则直接抛出中断异常
                throw new InterruptedException();
            Node node = addConditionWaiter(); //构造一个等待节点,并加入到等待队列中
            int savedState = fullyRelease(node); //关键点:在阻塞线程之前,先释放锁,否则造成死锁
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) { // 判断当前节点是否在同步队列中,如不不存在则进行等待阻塞
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 重新竞争锁
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode); // 如果线程已被中断,在唤醒后抛出中断异常
        }

关于await,有几个关键点要说明:

(1)线程调用await() 的时候,肯定已经先拿到了锁。所以,在addConditionWaiter() 内部,入队的操作不需要执行CAS操作,线程天生是安全的.

(2)在线程执行wait操作之前,必须先释放锁。也就是fullyRelease(node),否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。

(3)线程从wait中被唤醒后,必须用acquireQueued(node,savedState) 函数重新拿锁。

(4)checkInterruptWhileWaiting(node) 代码在 park(this) 代码之后,是为了检测在park期间是否收到过中断信号。当线程从park中醒来时,有两种可能:一种是其他线程调用了unpark,另一种是收到中断信号。这里的await()函数是可以响应中断的,所以当发现自己是被中断唤醒的,而不是被unpark唤醒的时,会直接退出while循环,await()函数也会返回。

(5)isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node只在Condition的队列里,而不在AQS的队列里。但执行 signal 操作的时候,会放进AQS的同步队列。

该过程下图所示:

并发编程 — Condition 使用及原理详解
当前线程加入等地队列

3、通知实现分析

调用Condition 的 signal() 方法,将唤醒在等待队列中等待时间最长的节点,在唤醒之前会将该节点移到同步队列中。如下代码所示:

        public final void signal() {
            if (!isHeldExclusively()) // 在调用 signal 方法之前必须先获取到锁
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first); // 唤醒第一个节点
        }

        private void doSignal(Node first) { //唤醒第一个节点
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
  
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

       
        Node p = enq(node); // 关键点:先把唤醒的节点加入到同步队列中,然后唤醒
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread); // 唤醒当前线程
        return true;
    }

同await() 一样,在调用signal() 的时候,必须先拿到锁(否则就会抛出上面的异常),是因为前面执行 await() 的时候,把锁释放了。然后,从队列中取出firstWait,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)函数把这个Node放入AQS的锁对应的阻塞队列中。也正因为如此,才有了await() 函数里面的判断条件 while (!isOnSyncQueue(node)),这个判断条件被满足,说明 await 线程不是被中断,而是被unpark唤醒的。

节点从等待队列移到同步队列如下图所示:

并发编程 — Condition 使用及原理详解
节点等待队列移到同步队列

 

参考文献:

《Java并发实现原理:JDK源码剖析》

《Java并发编程的艺术》

《Java并发编程实战》

 

上一篇:Ionic2集成ArcGIS JavaScript API.md


下一篇:条件锁condition与Queue()