一、概述
Condition本身也是一个接口,其功能和Object类中 wait/notify类似。Object中的 wait 和 notify方法需要与synchronized关键字配合使用,可实现线程间的等待/通知功能。Condition 接口也是提供了类似的功能,但是需要与Lock配合使用,可实现等待/通知模式。
二、Condition 接口与示例
Condition 定义了等待 / 通知两种类型的方法,当前线程调用这些方法时,需要提前获取Condition 对象关联的锁。Condition 对象是调用 Lock对象的 newCondition() 方法创建出来的,也就是说,Condition 是依赖 Lock 对象的。
Condition 的使用很简单,需要注意的是在调用方法前获取锁。下面我们通过一个有界对垒的实例来了解一下Condition的使用方式。有界队列是指当队列空时,获取槽中将被阻塞,直到队列中有新元素,当队列已满时,队列插入操作将被阻塞,直到队列出现空位。代码如下所示:
public class SyncBoundedQueue<E> {
private LinkedList<E> items;
private int size; // 记录队列的初始大小
private Lock lock = new ReentrantLock(); //定义一个锁
private Condition notFull = lock.newCondition(); //队列不满
private Condition notEmpty = lock.newCondition(); // 队列不空
public SyncBoundedQueue(int size){
items = new LinkedList<>();
this.size = size;
}
//添加元素
public void add(E e) throws InterruptedException{
lock.lock();
try {
// 当队列已满时,需要阻塞等待,等待其他线程获取元素,流程空位
while (items.size() == size){
notFull.await();
}
items.addLast(e); // 有空位则添加元素
notEmpty.signalAll(); // 通知获取元素的线程现在队列已经有元素了
}finally {
lock.unlock();
}
}
// 获取元素
public E get() throws InterruptedException{
lock.lock();
try {
// 如果队列中没有元素,则一直等待
while (items.size() == 0){
notEmpty.await();
}
E e = items.removeFirst(); // 移除元素
notFull.signalAll(); // 移除元素有空位,通知唤醒添加元素的线程
return e; // 返回元素
}finally {
lock.unlock();
}
}
}
三、实现原理分析
1、基本原理分析
因为Condition 必须和 Lock一起使用,所以Condition的实现也是Lock的一部分。下面先分别看一下互斥锁中Condition的构造。
// ReentrantLock 中的方法
public Condition newCondition() {
return sync.newCondition();
}
abstract static class Sync extends AbstractQueuedSynchronizer {
.....
final ConditionObject newCondition() {
return new ConditionObject(); // 创建 Condition 对象
}
.....
}
有上面代码可知,调用 Lock.newConditon() 方法实际是调用的 Sync 内部类中的方法创建了Condition实现了 ConditionObject()。ConditionObject 类是 同步器 AQS 的内部类,因为 Condition 的操作需要相关联的锁,每一个Condition对象上面,都阻塞了多个线程。因此,在 ConditionObject 内部也有一个双向链表组成的队列,如下所示:
public class ConditionObject implements Condition {
private transient Node firstWaiter;
private transient Node lastWaiter;
}
由代码实现可知 ConditionObject 内部的等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程是在 Condition 对象上等待线程,如果一个线程调用了 Condition.await() 方法,那么该线程将会释放锁、构造成节点加入等待对垒并进行等待状态。一个 Condition 包含一个等待队列, Condition 拥有首节点 ( firstWaiter ) 和 尾结点 ( lastWaiter )。当前线程调用 Condition.await() 方法,将会以当前线程构造节点,并将节点从未被加入等待队列,等待队列的基本结构u如下所示:
在 Object 的监视器模型上,一个对象拥有一个同步队列和等待队列,而Lock中拥有一个同步队列和多个等待队列,如下图所示:
2、等待实现分析
调用Condition的await方法,会是当前线程进入等待队列并释放锁,同时线程状态变为等待状态。如wait方法实现如下:
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 如果线程收到中断信号则直接抛出中断异常
throw new InterruptedException();
Node node = addConditionWaiter(); //构造一个等待节点,并加入到等待队列中
int savedState = fullyRelease(node); //关键点:在阻塞线程之前,先释放锁,否则造成死锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 判断当前节点是否在同步队列中,如不不存在则进行等待阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 重新竞争锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode); // 如果线程已被中断,在唤醒后抛出中断异常
}
关于await,有几个关键点要说明:
(1)线程调用await() 的时候,肯定已经先拿到了锁。所以,在addConditionWaiter() 内部,入队的操作不需要执行CAS操作,线程天生是安全的.
(2)在线程执行wait操作之前,必须先释放锁。也就是fullyRelease(node),否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。
(3)线程从wait中被唤醒后,必须用acquireQueued(node,savedState) 函数重新拿锁。
(4)checkInterruptWhileWaiting(node) 代码在 park(this) 代码之后,是为了检测在park期间是否收到过中断信号。当线程从park中醒来时,有两种可能:一种是其他线程调用了unpark,另一种是收到中断信号。这里的await()函数是可以响应中断的,所以当发现自己是被中断唤醒的,而不是被unpark唤醒的时,会直接退出while循环,await()函数也会返回。
(5)isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node只在Condition的队列里,而不在AQS的队列里。但执行 signal 操作的时候,会放进AQS的同步队列。
该过程下图所示:
3、通知实现分析
调用Condition 的 signal() 方法,将唤醒在等待队列中等待时间最长的节点,在唤醒之前会将该节点移到同步队列中。如下代码所示:
public final void signal() {
if (!isHeldExclusively()) // 在调用 signal 方法之前必须先获取到锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 唤醒第一个节点
}
private void doSignal(Node first) { //唤醒第一个节点
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // 关键点:先把唤醒的节点加入到同步队列中,然后唤醒
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒当前线程
return true;
}
同await() 一样,在调用signal() 的时候,必须先拿到锁(否则就会抛出上面的异常),是因为前面执行 await() 的时候,把锁释放了。然后,从队列中取出firstWait,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)函数把这个Node放入AQS的锁对应的阻塞队列中。也正因为如此,才有了await() 函数里面的判断条件 while (!isOnSyncQueue(node)),这个判断条件被满足,说明 await 线程不是被中断,而是被unpark唤醒的。
节点从等待队列移到同步队列如下图所示:
参考文献:
《Java并发实现原理:JDK源码剖析》
《Java并发编程的艺术》
《Java并发编程实战》