await和signal方法使用
await和signal方法是基于ReentrantLock的Condition使用的。并且await方法要先于singnal,否则await方法会一直阻塞该线程。lockA线程调用ReentrantLock实例的lock方法占用锁,再调用Condition实例的await方法释放锁挂起线程;2秒后,lockB线程调用ReentrantLock实例的lock方法占用锁,再调用Condition实例的signal方法将lockA放在同步队列中,lockB调用unlock方法释放锁之后就会唤醒lockA线程。
private static Lock lock = new ReentrantLock();
@Test
public void testAwaitAndSignal() {
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "lockA").start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
lock.lock();
condition.signal();
lock.unlock();
}, "lockB").start();
}
Condition
(1)Condition能与任意的Lock结合使用,Lock代替Synchronized方法和语句,Condition代替Object监控对象。Condition.await()可以用来代替Object.wait(),Condition.signal可以用来代替Object.notify()。
(2)条件(也称为条件队列或条件变量)为一种线程中止执行“等待”直到另一线程通知某些状态条件现在为真提供了一种手段。由于对该共享状态信息的访问发生在不同的线程中,因此必须对其进行保护,因此某种形式的锁与该条件相关联。Condition实例从本质上绑定到锁。要获取特定Lock实例的Condition实例,请使用其Lock#newCondition方法。
package java.util.concurrent.locks;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public interface Condition {
//当前线程一直等待直到被或许或者中断
void await() throws InterruptedException;
//当前线程一直等待直到被唤醒
void awaitUninterruptibly();
//当前线程会一直等待直到它被唤醒或者中断或者指定的时间到期。
long awaitNanos(long nanosTimeout) throws InterruptedException;
//当前线程会一直等待直到它被唤醒或者中断或者指定的时间到期。
boolean await(long time, TimeUnit unit) throws InterruptedException;
//当前线程会一直等待直到它被唤醒或者中断或者指定的时间到期。
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个等待的线程
void signal();
//唤醒全部的等待线程
void signalAll();
}
ConditionNode
ConditionNode是AbstractQueuedSynchronizer(AQS同步器)的静态final内部类。ConditionNode(条件结点)继承了Node类,实现了ForkJoinPool.ManagedBlocker接口。
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
//表示下一个等待结点
ConditionNode nextWaiter;
//允许在ForkJoinPools中使用条件,而不会冒固定池耗尽的风险。 这仅适用于非定时条件等待,不适用于定时版本。
public final boolean isReleasable() {
//如果结点是条件等待且不是中断状态,返回false。
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
//如果结点是条件等待且不是中断状态,挂起线程
while (!isReleasable()) LockSupport.park();
return true;
}
}
ConditionObject
ConditionObject是AbstractQueuedSynchronizer(AQS同步器)的内部类。维护了2个属性,一个是firstWait,表示条件队列的第一个结点;另一个是lastWaiter,表示条件队列的最后一个结点。提供了一个无参构造函数用于构建ConditionObject实例。
public class ConditionObject implements Condition, java.io.Serializable {
private transient ConditionNode firstWaiter;
private transient ConditionNode lastWaiter;
public ConditionObject() { }
}
ReentrantLock.Sync.isHeldExclusively
ReentrantLock有个内部类Sync,Sync实现了AbstractQueueSynchronizer接口。
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
ConditionObject#canReacquire
//如果一个结点是在条件队列初始化,现在也存放在同步队列就会返回true。
private boolean canReacquire(ConditionNode node) {
return node != null && node.prev != null && isEnqueued(node);
}
AbstractQueueSynchronizer#isEnqueued
从同步队列的尾结点往前遍历,判断给定的结点是否在同步队列中。
//判断结点是否在同步队列中
final boolean isEnqueued(Node node) {
for (Node t = tail; t != null; t = t.prev)
if (t == node)
return true;
return false;
}
ConditionObject#enableWait
这个方法主要是将条件结点添加到条件队列,然后释放占用的锁资源。
static final int WAITING = 1; // 二进制表示是 0000 0001
static final int COND = 2; // 二进制表示是 0000 0010
private int enableWait(ConditionNode node) {
//判断锁的持有者是否是当前线程
if (isHeldExclusively()) {
//结点的等待线程是当前线程
node.waiter = Thread.currentThread();
//WAITING | COND = 0000 0001 | 0000 0010 = 0000 0011 = 3
node.setStatusRelaxed(COND | WAITING);
ConditionNode last = lastWaiter;
//将结点加入到条件队列尾部
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
//获取state值
int savedState = getState();
//释放锁资源
if (release(savedState))
return savedState;
}
//释放锁资源失败,状态设置为取消,抛出IllegalMonitorStateException。
node.status = CANCELLED;
throw new IllegalMonitorStateException();
}
ConditionObject#await
(1)创建一个ConditionNode。
(2)调用enableWait方法将将新创建的ConditionNode加入到条件队列中,然后释放占用的锁资源。
(3)判断新建条件结点是否在同步队列中,存在返回true,不存在返回false。
(4)如果新建条件结点不在同步队列中,判断当前线程是否是中断状态,如果是中断状态,从同步队列中移出该条件结点,并调用当前线程的中断方法。如果不是中断状态,调用ForkJoinPool.managedBlock方法挂起线程。
(5)如果新建条件结点在同步队列中,清除新建条件结点的状态,重置为0,并尝试获取锁。
public final void await() throws InterruptedException {
//判断当前线程是否是中断状态,如果为true,抛出InterruptedException。
if (Thread.interrupted())
throw new InterruptedException();
//创建一个条件结点
ConditionNode node = new ConditionNode();
//将条件结点放在条件队列,释放lock占用的资源
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false;
while (!canReacquire(node)) {
//判断当前线程是否是中断状态,false | false = false
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
//node.state = 3; 3 & 2 = 2
} else if ((node.status & COND) != 0) {
try {
//线程堵塞
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait();
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
ConditionObject#signal
public final void signal() {
//获取条件队列的第一个等待结点
ConditionNode first = firstWaiter;
//如果持有锁的线程不是当前线程,抛出IllegalMonitorStateException异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//判断第一个等待结点是否为空
if (first != null)
//将第一个等待结点放在同步队列尾部
doSignal(first, false);
}
ConditionObject#signal
(1)重新设置条件队列的第一个条件结点。
(2)将给定的条件结点添加到同步队列的尾部。
(3)入参有一个boolean值,表示是否将全部的条件结点都放在同步队列中。
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
//获取first结点的后一个条件结点
ConditionNode next = first.nextWaiter;
//将后一个结点设置为第一个条件结点,如果next结点为空,条件队列的最后一个结点也为null。
if ((firstWaiter = next) == null)
lastWaiter = null;
//COND=2(0000 0010)
//`COND=`2=-3(10000000 00000000 00000000 00000011) & 00000010 = 00000010 = 2
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
//将结点添加到同步队列的尾部
enqueue(first);
if (!all)
break;
}
first = next;
}
}
AbstractQueueSynchoronizer#enqueue
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
//判断尾结点是否为空,如果为空,初始化头结点和尾结点
if (t == null) // initialize
tryInitializeHead();
//将新结点添加同步
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
LockSupport.unpark(node.waiter);
break;
}
}
}
}
总结
(1)await和signal方法是接口Condition的方法。ReentrantLock的内部类ConditionObject实现了Condition接口,暴露了一个newCondition方法可以让我们获取到ConditionObject对象。
(2)await和signal方法的阻塞释放机制是:A线程获取锁成功,调用await方法后进入等待队列,释放占用的锁,挂起A线程;然后B线程获取锁成功,调用signal方法将第一个条件结点放在同步队列中,然后释放锁,唤醒同步队列的第一个等待结点(head.next, head不关联任何线程信息)。signalAll方法会将条件队列的全部结点都加入到同步队列中。