介绍
Condition
是j.u.c
包下提供的一个接口。
可以翻译成 条件对象,其作用是线程先等待,当外部满足某一条件时,在通过条件对象唤醒等待的线程。ArrayBlockingQueue
就是通过Condition
实现的。
先看一下Condition
接口提供了哪些方法:
/**
* 条件对象
*/
public interface Condition {
/**
* 让线程进入等待,如果其他线程调用同一Condition对象的notify/notifyAll,那么等待的线程可能被唤醒
*/
void await() throws InterruptedException;
/**
* 不抛出中断异常的await方法
*/
void awaitUninterruptibly();
/**
* 带超时的await
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* 带超时的await(可指定时间单位)
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 带超时的await(指定截止时间)
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 唤醒等待的线程
*/
void signal();
/**
* 唤醒所有线程
*/
void signalAll();
}
Condition
接口主要提供了两类方法——让线程等待的方法(await()等)和唤醒线程的方法(signal())。
AQS
内部提供了Condition
接口的实现——ConditionalObject
。它内部的字段如下:
private static final long serialVersionUID = 1173984872572414699L;
//该ConditionObject维护的等待队列的头节点
private transient Node firstWaiter;
//该ConditionObject维护的等待队列的尾节点
private transient Node lastWaiter;
非常简单,从上面的字段我们大概可以猜到Condition
内部也维护了一个队列。
上篇文章中,我们已经分析锁实现的远离就是通过节点构成队列:让队列中除头节点外的其他线程都被Park,当头节点释放锁时,头节点唤醒下一个节点(Unpark线程),同时更新头节点。
举一反三,我们推测Condition
唤醒功能的原理也是通过维护队列的节点。
接下来就通过分析源码,(主要是await()
和signal()
方法),验证我们的猜测。
Condtion对象的获取
Condition
对象的获取主要是通过Lock.newCondition()
方法。
一个Lock
对象可以返回多个Condition
对象。
在对Condition
进行等待或者唤醒前,都需要先持有Condition
关联Lock
对象,否则会抛出IllegalMonitorStateException
异常。
Condition.await()过程
public final void await() throws InterruptedException {
//如果线程已经被标记为中断,则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//将线程添加进等待队列
//注意等待队列和AQS维护的阻塞队列是两个不同的队列
//正常流程当线程能调用await(),说明线程此时拥有锁,此时AQS的阻塞队列中,线程应该在head节点
Node node = addConditionWaiter();
//释放掉锁(如果释放失败,NODE的waitStatus被更新为CANCELLED)
//同时因为释放掉了锁,该线程在阻塞队列中的节点也已经被移除
int savedState = fullyRelease(node);
//这里会将线程挂起,除非线程节点被移到AQS的阻塞队列或是线程被外部中断
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//检查是否是由于被中断而唤醒,如果是,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//在阻塞队列中尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//节点已经在阻塞队列中,与Condition的等待队列联系断开
//对于SIGNAL唤醒的线程而言,SIGNAL时除了将节点移到阻塞队列,同时也清空了node.nextWaiter
//而对于中断唤醒的线程而言,只是将节点移到阻塞队列,并没有清空node.nextWaiter(因为此时线程不持有,操作等待线程并非线程安全)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//根据interruptMode 决定是否需要抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await()
方法主要可以分为以下几个过程:
1)线程将自己包装成节点,并添加到Condition
的阻塞队列中
2)线程主动释放掉锁
3)线程进入自循环等待(主动通过LockSupport.park()
),醒来时,检查自己是否已经被移动至Lock的阻塞队列
4)线程在阻塞队列中等待,直到获取锁(线程在等待时可能又会被LockSupport.park()
挂起)
5)线程获取锁,检查自己在等待过程中(await()
过程)是否有被中断
6)如果有需要,则清理节点与等待队列之间的联系
7)根据中断状态确定是否需要抛出异常,以便让await()
的调用者可以响应线程的中断状态
从上面的流程,我们可以清楚的了解到以上步骤1和2,以及步骤5,6,7是持有锁的,步骤3和4并没有持有锁。了解这一点很重要,因为涉及某些方法是否需要以CAS来保证线程安全。
了解了大体流程,接下来就逐步分析各个步骤。
步骤1.线程包装成节点,添加进Condition
的等待队列
这一步骤主要是addConditionWaiter()
过程
private Node addConditionWaiter() {
Node t = lastWaiter;
// 找出该ConditionObject的等待队列中 真正未被取消的最后一个节点,并更新为lastWaiter
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//将该线程包装成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果此时ConditionObject队列为空,初始化链表且头节点为node
if (t == null)
firstWaiter = node;
else //否则将node添加队尾
t.nextWaiter = node;
//更新链表尾节点为node
lastWaiter = node;
return node;
}
主要找出等待队列的最后一个节点,将线程包装成Node,添加到队列的队尾。
这里要注意的一点是此时Node的waitStatus
为CONDITION
。节点的waitStatus
对判断等待是否被取消很重要,在等待队列中等待的节点状态应该为CONDITION
,如果状态不为CONDITION
,说明线程已经取消了等待(如果waitStatus为0说明被唤醒或中断)。
步骤2.线程释放锁
释放锁的步骤比较简单。主要通过fullRelease()
更新AQS
的state
为0并且将AQS
的拥有者置为null,同时唤醒阻塞队列中的后继节点。
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取锁被持有的此时
int savedState = getState();
//让锁直接释放被持有的次数
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//如果释放失败了,则将节点标记为CANCELLED
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
步骤3.线程挂起,进入循环等待
这一步比较关键,线程等待的动作都发生在这一步。
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//检查是否是由于被中断而唤醒,如果是,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
线程执行到这,会被LockSupport.park()
挂起。
如果此时线程被唤醒,线程会检查是否是因为中断,如果发生中断,还需要确定中断是否发生在SIGNAL前(如果发生在SIGNAL前,之后线程需要抛出异常,让外部响应)。
如果线程不是因为中断而唤醒,线程需要确认节点是否已经被移动至AQS
的等待队列。如果没有被移动,则继续被挂起(防止假唤醒)。
checkInterruptWhileWaiting()
就是用来检测线程在等待的时候是否被中断。
/**
* 检查线程在WAITING状态期间,是否有被中断
* 如果没有返回0;如果是在SIGNAL之前被中断,返回 THROW_IE;如果在SIGNAL之后被中断,则返回REINTERRUPT
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
如果线程被中断,还需要通过transferAferCancelledWait
判断中断是否发生在SIGNAL
之前。
final boolean transferAfterCancelledWait(Node node) {
//CAS操作,期待值是CONDITION,说明此时唤醒是被取消(中断),因为如果是SIGNAL,那么waitingStatus不会CONDITION,而是0(可以见SIGNAL流程)
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//即使是取消的,也需要移到AQS的阻塞队列
enq(node);
return true;
}
//说明线程先收到了SIGNAL信号
//此时要等SIGNAL信号处理完成
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
前文说了Node节点的waitStatus
是一个很重要的状态,从它可以推断出线程节点发生了什么操作。
上述方法先用CAS
尝试将更新节点的waitStatus
为0,期待值为CONDITION
。
如果尝试成功,说明此时节点未被操作过(SIGNAL信号),线程是中断唤醒的,此时需要通过enq()
将节点添加AQS
阻塞队列,因为此时没有锁,所以enq()
方法以CAS重试的方式保证线程安全。
如果尝试失败,说明线程收到了SIGNAL
信号,节点将由负责SIGNAL
的线程移动至阻塞队列。这里为了避免线程返回过早,在判断出线程还未移动至阻塞队列的情况下,会通过Thread.yeild()
让出CPU时间。
步骤4.线程在阻塞队列中重新等待锁
这一步主要是通过acquireQueue()
方法。该方法已经在上一篇文章中介绍过了,这里不过多介绍。
需要注意的一点是,即使线程在等待时被中断,仍然需要在AQS
的阻塞队列中等待获取锁。因为外部没有办法在线程获取锁之前发现中断状态,而且即使线程抛出了中断异常,此时线程也是持有锁的,外部需要显式的释放。
步骤5.检查并设置中断状态
这一步很简单,主要就是通过步骤3中的checkInterruptWhilewaiting()
方法返回值:0表示未中断,-1表示中断发生在SIGNAl之前,1表示中断发生在SIGNAL之后。
步骤6.清理节点与等待队列的联系
这里有两种情况,如果线程是因为SIGNAL
唤醒的,在唤醒时调用signal()
的线程已经清理了被唤醒节点与等待队列的关系。 因为那时唤醒线程持有锁,操作是安全的。
但是如果对于中断被唤醒的线程,唤醒时是不持有锁的,不能保证线程安全的清理唤醒节点与等待队列的关系。因此就将等待清理工作放在了获取锁之后。
//此方法并不保证线程安全,因此调用此方法时,必须要在获取锁的情况下调用
//此方法的目的是为了整理Condition的等待队列,将非CONDITION状态的节点从等待队列中移除
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
步骤7.确定中断状态,并决定是否抛出异常
这一步也很简单,就是根据interruptMode
来确定是否抛出异常,如果interruptMode
值为THROW_IE,说明线程被唤醒前先被中断,此时需要抛出InterruptedException
。
Condition.signal()过程
signal()
过程比起await()
要简单很多。既然await()
过程是将节点添加到等待队列,那么signal()
作为await()
的逆过程,就是将节点从等待队列重新移动到AQS
阻塞队列。
/**
* 唤醒等待节点
* 主要的流程就是将节点从Condition的等待队列移到AQS的阻塞队列中,让其重新等待锁的获取
*/
public final void signal() {
//先验证唤醒者是否是锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//唤醒等待队列的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
为保证安全,先确定唤醒线程是否为锁的持有者。
之后找出等待队列中的头节点,将其唤醒。
/**
* 唤醒操作(此时占有锁,为线程安全),
* 找出等待队列中第一个真正要被唤醒的节点,移动到阻塞队列,
*/
private void doSignal(Node first) {
do {
//找出第一个需要唤醒的节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//断开和之后等待队列的联系,并更新等待队列的头节点
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); //等待队列的头节点移动失败 且 等待队列中还有其他节点,则继续尝试其他节点
}
因为头节点可能已经被取消,所以这里会一直在等待队列中从前往后找一个节点,开始唤醒。直到一个节点唤醒成功或者等待队列中没有节点需要唤醒。
上文也说了唤醒过程其实就是节点的移动过程。
/**
* 将节点从Condition的等待队列移动到AQS的阻塞队列(该方法只会在signal相关的方法中被调用)
*/
final boolean transferForSignal(Node node) {
//CAS操作更新节点的waitStatus,期待值为CONDITION
//操作成功,说明没有其他线程在操作这个节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //CAS操作失败,说明节点已经被中断操作过,waitStatus已经变成了0
return false;
//这里说明CAS操作成功
//应该是线程安全的
//将节点添加到阻塞队列的队尾
Node p = enq(node);
//判断此时阻塞队列是否有前继节点等待,有就Park线程,等待前继节点唤醒
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
transferForSignal()
操作其实就是先通过CAS
操作确定等待队列中的节点还未被取消。如果CAS
操作成功,则将其添加到阻塞队列,如果此时阻塞队列还需要等待锁,则让await()
的线程继续被挂起。将给线程唤醒的任务(此时已经是获取锁的任务,不再是之前的await()任务)交给阻塞队列中的前继节点。
await()以及signal()过程的流程图
跟着流程图在走一遍,可以帮助巩固上述的知识点。
总结
再借上篇文章重口味的比方梳理下这个流程。
当你排队进了WC的包厢,想要方便时,你觉得太脏了,于是你在包厢内留了张纸条,希望有人能在厕所包厢干净了在叫你过来上厕所(调用Condition的await ()),随后主动让出了包厢的使用权(释放锁)。后面在排队等着方便的人便进去了。
你离开后,到另一个地方边玩手机边等(移动到Condition的阻塞队列,并被Park),期间,也有一些人同样觉得厕所太脏,跑了出来,在外面等,并排在了你的后面。
过了一段时间,有人把打电话给你说厕所已经变干净了,可以去用了,你就重新回到厕所那排起了队伍,等待轮到你用厕所(acquireQueue)。因为那个人仅通知了你,没有通知其他因为嫌弃厕所脏,而跑出来的人,所以那些只能继续在那等别人去叫他们。
如果你在等待的时候突然有人说服了你,其实脏一点也无所谓(外部未给signal却被中断),你就主动从在外面等待走到了厕所前去排队等待继续去用厕所。