Java 中Object 类中Wait Notify NotifyAll 源码如下:
/**
* 线程等待
* @param var1 毫秒
* @param var3 纳秒
*/
public final void wait(long var1, int var3) throws InterruptedException {
if (var1 < 0L) {
throw new IllegalArgumentException("timeout value is negative");
} else if (var3 >= 0 && var3 <= 999999) {
//纳秒>0,毫秒直接++
if (var3 > 0) {
++var1;
}
//调用native方法
this.wait(var1);
} else {
throw new IllegalArgumentException("nanosecond timeout value out of range");
}
}
/**
* native方法线程等待
*/
public final native void wait(long var1) throws InterruptedException;
/**
* native方法线程单个唤醒
*/
public final native void notify();
/**
* native方法线程唤醒等待池中所有线程
*/
public final native void notifyAll();
解析源码之前的先具备的条件:
对象锁ObjectMonitor拥有等待队列和同步队列两种队列
wait 方法:
线程等待,让出对象锁,加入等待队列,然后进入park,等待其他线程释放锁unpark
synchronized (a) {
a.wait();
}
等价于
moniter.enter //获取对象锁
{
1.判断锁是否存在
2.判断中断状态
3.创建node 加入 等待队列
4.moniter.exit(根据不同策略,从同步队列获取头节点线程a,然后执行线程a的event.unpark 唤醒机制)
5.本线程执行event.park 等待其他线程唤醒
6.判断唤醒是不是被中断唤醒的,需不需要抛出异常
}
moniter.exit //释放锁,唤醒同步队列下一个对象
- CHECK_OWNER 判断锁是否存在,不存在就抛异常。没有加Synchronize的话,会抛出IllegalMonitorStateException
#define CHECK_OWNER()
do {
if (THREAD != _owner) {
if (THREAD->is_lock_owned((address) _owner)) {
_owner = THREAD ; /* Convert from basiclock addr to Thread addr */
_recursions = 0;
OwnerIsThread = 1 ;
} else {
TEVENT (Throw IMSX) ;
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
}
} while (false)
- 调用is_interrupted()判断并清除线程中断状态,如果中断状态为true,抛出中断异常并结束
//调用is_interrupted()判断并清除线程中断状态,如果中断状态为true,抛出中断异常并结束
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
...
TEVENT (Wait - Throw IEX) ;
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
- 利用自旋锁创建一个node 放入队列
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add")
AddWaiter (&node)
Thread::SpinRelease (&_WaitSetLock)
- 退出监视器 exit (Self)
intptr_t save = _recursions; // 记录旧的递归次数
_waiters++; // waiters 自增
_recursions = 0; // 设置 recursion level to be 1
exit (Self) ; // 退出监视器
- 利用parkEvent.park 方法阻塞等待信号提醒
if (millis <= 0) {
// 调用park()方法阻塞线程
Self->_ParkEvent->park () ;
} else {
// 调用park()方法在超时时间内阻塞线程
ret = Self->_ParkEvent->park (millis) ;
}
- 判断是否需要中断,被parkEvent.unpark 唤醒判断一下interrupt 发起的,还是notify发起的
if (!WasNotified) {
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
TEVENT (Wait - throw IEX from epilog) ;
THROW(vmSymbols::java_lang_InterruptedException());
}
}
wait 本质是调用了ObjectMonitor 的wait 方法
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
assert(Self->is_Java_thread(), "Must be Java thread!");
JavaThread *jt = (JavaThread *)THREAD;
DeferredInitialize () ;
// Throw IMSX or IEX.
CHECK_OWNER();
//调用is_interrupted()判断并清除线程中断状态,如果中断状态为true,抛出中断异常并结束
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
//post monitor waited event
//注意这是过去式,已经等待完了
if (JvmtiExport::should_post_monitor_waited()) {
//注意:这里传递参数'false',这是因为由于线程中断,等待不会超时
JvmtiExport::post_monitor_waited(jt, this, false);
}
TEVENT (Wait - Throw IEX) ;
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
TEVENT (Wait) ;
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
// create a node to be put into the queue
// Critically, after we reset() the event but prior to park(), we must check
// for a pending interrupt.
//创建一个node放入队列
//关键是,在reset()之后,但在park()之前,必须检查是否有挂起的中断
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence();
//在本例中等待队列是一个循环的双向链表,但它也可以是一个优先级队列或任何数据结构。
//_WaitSetLock保护着等待队列.
//通常,等待队列只能由监视器*except*的所有者访问,但在park()因中断超时而返回的情况下也是可以。
//竞争非常小,所以使用一个自旋锁而不是重量级的阻塞锁。
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
intptr_t save = _recursions; // 记录旧的递归次数
_waiters++; // waiters 自增
_recursions = 0; // 设置 recursion level to be 1
exit (Self) ; // 退出监视器
guarantee (_owner != Self, "invariant") ;
//一旦在上面的exit()调用中删除了ObjectMonitor的所有权,
//另一个线程就可以进入ObjectMonitor,执行notify()和exit()对象监视器。
//如果另一个线程的exit()调用选择此线程作为后继者,并且此线程在发布MONITOR_CONTENDED_EXIT时发生unpark()调用,
//则我们使用RawMonitors运行事件风险处理,并使用unpark().
//为了避免这个问题,我们重新发布事件,即使未使用原来的unpark(),
//这也不会造成任何伤害,因为已经为此监视器选好了继任者。
if (node._notified != 0 && _succ == Self) {
node._event->unpark();
}
// The thread is on the WaitSet list - now park() it.
// On MP systems it's conceivable that a brief spin before we park
// could be profitable.
//
// TODO-FIXME: change the following logic to a loop of the form
// while (!timeout && !interrupted && _notified == 0) park()
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
//线程处于阻塞状态,并且oop访问是不安全的
jt->set_suspend_equivalent();
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty 空处理
} else
if (node._notified == 0) {
if (millis <= 0) {
// 调用park()方法阻塞线程
Self->_ParkEvent->park () ;
} else {
// 调用park()方法在超时时间内阻塞线程
ret = Self->_ParkEvent->park (millis) ;
}
}
// were we externally suspended while we were waiting?
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
//当线程不在等待队列时,使用双重检查锁定避免获取_WaitSetLock
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
Thread::SpinRelease (&_WaitSetLock) ;
}
//从这个线程的角度来看,Node's TState是稳定的,
//没有其他线程能够异步修改TState
guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
OrderAccess::loadload() ;
if (_succ == Self) _succ = NULL ;
WasNotified = node._notified ;
// Reentry phase -- reacquire the monitor.
// re-enter contended(竞争) monitor after object.wait().
// retain OBJECT_WAIT state until re-enter successfully completes
// Thread state is thread_in_vm and oop access is again safe,
// although the raw address of the object may have changed.
// (Don't cache naked oops over safepoints, of course).
// post monitor waited event.
//注意这是过去式,已经等待完了
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
}
OrderAccess::fence() ;
assert (Self->_Stalled != 0, "invariant") ;
Self->_Stalled = 0 ;
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
// Self has reacquired the lock.
// Lifecycle - the node representing Self must not appear on any queues.
// Node is about to go out-of-scope, but even if it were immortal(长久的) we wouldn't
// want residual(残留的) elements associated with this thread left on any lists.
guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
assert (_owner == Self, "invariant") ;
assert (_succ != Self , "invariant") ;
} // OSThreadWaitState()
jt->set_current_waiting_monitor(NULL);
guarantee (_recursions == 0, "invariant") ;
_recursions = save; // restore the old recursion count
_waiters--; // decrement the number of waiters
// Verify a few postconditions
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
if (SyncFlags & 32) {
OrderAccess::fence() ;
}
//检查是否有通知notify发生
// 从park()方法返回后,判断是否是因为中断返回,再次调用
// thread::is_interrupted(Self, true)判断并清除线程中断状态
// 如果中断状态为true,抛出中断异常并结束。
if (!WasNotified) {
// no, it could be timeout or Thread.interrupt() or both
// check for interrupt event, otherwise it is timeout
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
TEVENT (Wait - throw IEX from epilog) ;
THROW(vmSymbols::java_lang_InterruptedException());
}
}
//注意:虚假唤醒将被视为超时;监视器通知优先于线程中断。
}
notify 方法 :从等待队列中获取第一个节点,然后加入同步队列,本身没有释放锁的功能,是Synchroinzed 自己提供的(重要)
synchronized (a) {
a.notify();
}
相当于
moniter.enter //获取对象锁
{
1.判断锁是否存在
2.从等待队列中获取第一个节点
3.根据不同的policy策略加入到cxq 或者entryList 同步队列
}
moniter.exit //释放锁,唤醒同步队列下一个对象
- CHECK_OWNER 判断锁是否存在,不存在就抛异常。没有加Synchronize的话,会抛出IllegalMonitorStateException
#define CHECK_OWNER()
do {
if (THREAD != _owner) {
if (THREAD->is_lock_owned((address) _owner)) {
_owner = THREAD ; /* Convert from basiclock addr to Thread addr */
_recursions = 0;
OwnerIsThread = 1 ;
} else {
TEVENT (Throw IMSX) ;
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
}
} while (false)
- 从等待队列的取出第一个节点
ObjectWaiter * iterator = DequeueWaiter() ;
- 根据不同policy,将等待对列的节点加入到同步队列中
if (Policy == 0) { // prepend(预追加) to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
}......
Notify 本质是调用了ObjectMonitor 的notify 方法
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
ObjectWaiter * iterator = DequeueWaiter() ;
if (iterator != NULL) {
TEVENT (Notify1 - Transfer) ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
iterator->_notified = 1 ;
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
if (Policy == 0) { // prepend(预追加) to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append(真正追加) to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
//考虑:当前获取EntryList的tail需要遍历整个链表
//将tail访问转换为CDLL而不是使用当前的DLL,从而使访问时间固定。
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { // prepend to cxq
// prepend(预追加) to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else
if (Policy == 3) { // append(真正追加) to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {
iterator->wait_reenter_begin(this);
}
// _WaitSetLock protects the wait queue, not the EntryList. We could
// move the add-to-EntryList operation, above, outside the critical section
// protected by _WaitSetLock. In practice that's not useful. With the
// exception of wait() timeouts and interrupts the monitor owner
// is the only thread that grabs _WaitSetLock. There's almost no contention
// on _WaitSetLock so it's not profitable to reduce the length of the
// critical section.
}
Thread::SpinRelease (&_WaitSetLock) ;
if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc() ;
}
}
notifyAll方法 :跟Notify 方法类似,只是利用for循环 将等待队列的全部节点,加入到同步队列中,本身没有释放锁的功能,是Synchroinzed 自己提供的
void ObjectMonitor::notifyAll(TRAPS) {
CHECK_OWNER();
ObjectWaiter* iterator;
if (_WaitSet == NULL) {
TEVENT (Empty-NotifyAll) ;
return ;
}
DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
int Tally = 0 ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;
for (;;) {
iterator = DequeueWaiter () ;
if (iterator == NULL) break ;
TEVENT (NotifyAll - Transfer1) ;
++Tally ;
// Disposition - what might we do with iterator ?
// a. add it directly to the EntryList - either tail or head.
// b. push it onto the front of the _cxq.
// For now we use (a).
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
iterator->_notified = 1 ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
// CONSIDER: finding the tail currently requires a linear-time walk of
// the EntryList. We can make tail access constant-time by converting to
// a CDLL instead of using our current DLL.
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { // prepend to cxq
// prepend to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
} else
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {
iterator->wait_reenter_begin(this);
}
// _WaitSetLock protects the wait queue, not the EntryList. We could
// move the add-to-EntryList operation, above, outside the critical section
// protected by _WaitSetLock. In practice that's not useful. With the
// exception of wait() timeouts and interrupts the monitor owner
// is the only thread that grabs _WaitSetLock. There's almost no contention
// on _WaitSetLock so it's not profitable to reduce the length of the
// critical section.
}
Thread::SpinRelease (&_WaitSetLock) ;
if (Tally != 0 && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc(Tally) ;
}
}
问题1:wait 在前面环节存在ParkEvent.park 阻塞等待唤醒,但是notify 本质只是将等待队列中的节点加入到了同步队列节点了,但是同步队列中有很多的节点,谁会拿出来用,在哪里调用了ParkEvent.unpark 唤醒线程继续往下走呢?
问题2:wait 方法只是退出对象锁。它是怎么将对象锁让给其他线程的,因为这个对象锁的转移只发生在wait 和notify 这个两个线程里面,没有第三者进行协调的,对象锁是怎么流转的。
其实本质都是一个问题:对象锁是怎么转让的?
关键点:wait 方法本身调用了一次ObjectMonitor.exit 方法,Synchronized 关键字本身也有一次ObjectMonitor.exit 方法。
void ATTR ObjectMonitor::exit(TRAPS) {
......
//根据QMode 策略从同步队列 取出节点
if (QMode == 2 && _cxq != NULL) {
// QMode == 2 : cxq has precedence over EntryList.
// Try to directly wake a successor from the cxq.
// If successful, the successor will need to unlink itself from cxq.
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 3 && _cxq != NULL) {
// Aggressively drain cxq into EntryList at the first opportunity.
// This policy ensure that recently-run threads live at the head of EntryList.
// Drain _cxq into EntryList - bulk transfer.
// First, detach _cxq.
// The following loop is tantamount to: w = swap (&cxq, NULL)
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Append the RATs to the EntryList
// TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
ObjectWaiter * Tail ;
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
Tail->_next = w ;
w->_prev = Tail ;
}
// Fall thru into code that tries to wake a successor from EntryList
}
......
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
}
重点是拿到对应的节点执行了ExitEpilog 方法,唤醒这个正在wait 的节点
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
{
assert (_owner == Self, "invariant") ;
ParkEvent * Trigger = Wakee->_event ;
.... //这里对应wait 方法使用的ParkEvent.park
Trigger->unpark() ; //unpark唤醒wait线程
.....
if (ObjectMonitor::_sync_Parks != NULL) {
ObjectMonitor::_sync_Parks->inc() ;
}
}