JDK15源码(五):await和signal

await和signal方法使用

await和signal方法是基于ReentrantLock的Condition使用的。并且await方法要先于singnal,否则await方法会一直阻塞该线程。lockA线程调用ReentrantLock实例的lock方法占用锁,再调用Condition实例的await方法释放锁挂起线程;2秒后,lockB线程调用ReentrantLock实例的lock方法占用锁,再调用Condition实例的signal方法将lockA放在同步队列中,lockB调用unlock方法释放锁之后就会唤醒lockA线程。

private static Lock lock = new ReentrantLock();
@Test
public void testAwaitAndSignal() {
    Condition condition = lock.newCondition();
    new Thread(() -> {
        lock.lock();
        try {
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }, "lockA").start();

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    new Thread(() -> {
        lock.lock();
        condition.signal();
        lock.unlock();
    }, "lockB").start();
}

Condition

(1)Condition能与任意的Lock结合使用,Lock代替Synchronized方法和语句,Condition代替Object监控对象。Condition.await()可以用来代替Object.wait(),Condition.signal可以用来代替Object.notify()。

(2)条件(也称为条件队列或条件变量)为一种线程中止执行“等待”直到另一线程通知某些状态条件现在为真提供了一种手段。由于对该共享状态信息的访问发生在不同的线程中,因此必须对其进行保护,因此某种形式的锁与该条件相关联。Condition实例从本质上绑定到锁。要获取特定Lock实例的Condition实例,请使用其Lock#newCondition方法。

package java.util.concurrent.locks;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public interface Condition {

    //当前线程一直等待直到被或许或者中断
    void await() throws InterruptedException;

    //当前线程一直等待直到被唤醒
    void awaitUninterruptibly();

    //当前线程会一直等待直到它被唤醒或者中断或者指定的时间到期。
    long awaitNanos(long nanosTimeout) throws InterruptedException;

   
    //当前线程会一直等待直到它被唤醒或者中断或者指定的时间到期。 
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    //当前线程会一直等待直到它被唤醒或者中断或者指定的时间到期。
    boolean awaitUntil(Date deadline) throws InterruptedException;

    //唤醒一个等待的线程
    void signal();

    //唤醒全部的等待线程
    void signalAll();
}

ConditionNode

ConditionNode是AbstractQueuedSynchronizer(AQS同步器)的静态final内部类。ConditionNode(条件结点)继承了Node类,实现了ForkJoinPool.ManagedBlocker接口。

static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
    //表示下一个等待结点
    ConditionNode nextWaiter;            

    //允许在ForkJoinPools中使用条件,而不会冒固定池耗尽的风险。 这仅适用于非定时条件等待,不适用于定时版本。
    public final boolean isReleasable() {
        //如果结点是条件等待且不是中断状态,返回false。
        return status <= 1 || Thread.currentThread().isInterrupted();
    }

    public final boolean block() {
        //如果结点是条件等待且不是中断状态,挂起线程
        while (!isReleasable()) LockSupport.park();
        return true;
    }
} 

ConditionObject

ConditionObject是AbstractQueuedSynchronizer(AQS同步器)的内部类。维护了2个属性,一个是firstWait,表示条件队列的第一个结点;另一个是lastWaiter,表示条件队列的最后一个结点。提供了一个无参构造函数用于构建ConditionObject实例。

public class ConditionObject implements Condition, java.io.Serializable {

    private transient ConditionNode firstWaiter;

    private transient ConditionNode lastWaiter;

    public ConditionObject() { }
}

ReentrantLock.Sync.isHeldExclusively

ReentrantLock有个内部类Sync,Sync实现了AbstractQueueSynchronizer接口。

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

ConditionObject#canReacquire

//如果一个结点是在条件队列初始化,现在也存放在同步队列就会返回true。
private boolean canReacquire(ConditionNode node) {
    return node != null && node.prev != null && isEnqueued(node);
}

AbstractQueueSynchronizer#isEnqueued

从同步队列的尾结点往前遍历,判断给定的结点是否在同步队列中。

//判断结点是否在同步队列中
final boolean isEnqueued(Node node) {
    for (Node t = tail; t != null; t = t.prev)
        if (t == node)
            return true;
    return false;
}

ConditionObject#enableWait

这个方法主要是将条件结点添加到条件队列,然后释放占用的锁资源。

static final int WAITING   = 1;          // 二进制表示是 0000 0001
static final int COND      = 2;          // 二进制表示是 0000 0010

private int enableWait(ConditionNode node) {
    //判断锁的持有者是否是当前线程
    if (isHeldExclusively()) {
        //结点的等待线程是当前线程
        node.waiter = Thread.currentThread();
        //WAITING | COND = 0000 0001 | 0000 0010 = 0000 0011 = 3
        node.setStatusRelaxed(COND | WAITING);
        ConditionNode last = lastWaiter;
        //将结点加入到条件队列尾部
        if (last == null)
            firstWaiter = node;
        else
            last.nextWaiter = node;
        lastWaiter = node;
        //获取state值
        int savedState = getState();
        //释放锁资源
        if (release(savedState))
            return savedState;
    }
    //释放锁资源失败,状态设置为取消,抛出IllegalMonitorStateException。
    node.status = CANCELLED;
    throw new IllegalMonitorStateException();
}

ConditionObject#await

(1)创建一个ConditionNode。
(2)调用enableWait方法将将新创建的ConditionNode加入到条件队列中,然后释放占用的锁资源。
(3)判断新建条件结点是否在同步队列中,存在返回true,不存在返回false。
(4)如果新建条件结点不在同步队列中,判断当前线程是否是中断状态,如果是中断状态,从同步队列中移出该条件结点,并调用当前线程的中断方法。如果不是中断状态,调用ForkJoinPool.managedBlock方法挂起线程。
(5)如果新建条件结点在同步队列中,清除新建条件结点的状态,重置为0,并尝试获取锁。

public final void await() throws InterruptedException {
    //判断当前线程是否是中断状态,如果为true,抛出InterruptedException。
    if (Thread.interrupted())
        throw new InterruptedException();
    //创建一个条件结点    
    ConditionNode node = new ConditionNode();
    //将条件结点放在条件队列,释放lock占用的资源
    int savedState = enableWait(node);
    LockSupport.setCurrentBlocker(this); // for back-compatibility
    boolean interrupted = false, cancelled = false;
    while (!canReacquire(node)) {
        //判断当前线程是否是中断状态,false | false = false
        if (interrupted |= Thread.interrupted()) {
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;              // else interrupted after signal
        //node.state = 3; 3 & 2 = 2        
        } else if ((node.status & COND) != 0) {
            try {
                //线程堵塞
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                interrupted = true; 
            }
        } else
            Thread.onSpinWait();
    }
    LockSupport.setCurrentBlocker(null);
    node.clearStatus();
    acquire(node, savedState, false, false, false, 0L);
    if (interrupted) {
        if (cancelled) {
            unlinkCancelledWaiters(node);
            throw new InterruptedException();
        }
        Thread.currentThread().interrupt();
    }
}

ConditionObject#signal

public final void signal() {
    //获取条件队列的第一个等待结点
    ConditionNode first = firstWaiter;
    //如果持有锁的线程不是当前线程,抛出IllegalMonitorStateException异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //判断第一个等待结点是否为空    
    if (first != null)
        //将第一个等待结点放在同步队列尾部
        doSignal(first, false);
}

ConditionObject#signal

(1)重新设置条件队列的第一个条件结点。
(2)将给定的条件结点添加到同步队列的尾部。
(3)入参有一个boolean值,表示是否将全部的条件结点都放在同步队列中。

private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        //获取first结点的后一个条件结点
        ConditionNode next = first.nextWaiter;
        //将后一个结点设置为第一个条件结点,如果next结点为空,条件队列的最后一个结点也为null。
        if ((firstWaiter = next) == null)
            lastWaiter = null;
        //COND=2(0000 0010)
        //`COND=`2=-3(10000000 00000000 00000000 00000011) & 00000010 = 00000010 = 2  
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
            //将结点添加到同步队列的尾部
            enqueue(first);
            if (!all)
                break;
        }
        first = next;
    }
}

AbstractQueueSynchoronizer#enqueue

final void enqueue(Node node) {
    if (node != null) {
        for (;;) {
            Node t = tail;
            node.setPrevRelaxed(t);        // avoid unnecessary fence
            //判断尾结点是否为空,如果为空,初始化头结点和尾结点
            if (t == null)                 // initialize
                tryInitializeHead();
            //将新结点添加同步    
            else if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          // wake up to clean link
                    LockSupport.unpark(node.waiter);
                break;
            }
        }
    }
}

总结

(1)await和signal方法是接口Condition的方法。ReentrantLock的内部类ConditionObject实现了Condition接口,暴露了一个newCondition方法可以让我们获取到ConditionObject对象。

(2)await和signal方法的阻塞释放机制是:A线程获取锁成功,调用await方法后进入等待队列,释放占用的锁,挂起A线程;然后B线程获取锁成功,调用signal方法将第一个条件结点放在同步队列中,然后释放锁,唤醒同步队列的第一个等待结点(head.next, head不关联任何线程信息)。signalAll方法会将条件队列的全部结点都加入到同步队列中。

上一篇:jdk15+版本无jre文件


下一篇:k8s 部署 (二) EMQ X 集群