ReentrantLock Condition 线程间通信

方法介绍:

boolean await() 阻塞线程 直至被唤醒
boolean await(long time, TimeUnit unit) 阻塞线程 超时自动唤醒
void signal() 唤醒一个await线程
void signalAll() 唤醒所有await线程

 

场景举例:

班级组织郊游, 每个到了的同学就在原地等待上大巴,先到的排在最前面, 后到的紧接着排在最后一个人后面,  直到班长来了, 班长有两种方式让让同学上车,

方式一: 班长从前往后一个一个点 点到几个上几个(按排队顺序) ------- await() + signal() 的使用

方式二: 班长直接让所有人按排队顺序上车 -----  await() + signalAll() 的使用

 

方式一代码如下:

public static void main(String[] args) throws InterruptedException {

    ReentrantLock lock = new ReentrantLock();

    //new condition
    Condition condition = lock.newCondition();

    //t1线程
    Thread t1 = new Thread(()->{
        try {
            lock.lock();
            System.out.println("t1到位准备 等待班长点到");
            condition.await();
            System.out.println("t1已上车");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    });

    Thread t2 = new Thread(()->{
        try {
            lock.lock();
            System.out.println("t2到位准备 等待班长点到");
            condition.await();
            System.out.println("t2已上车");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    });

    Thread t3 = new Thread(()->{
        try {
            lock.lock();
            System.out.println("t3到位准备 等待班长点到");
            condition.await();
            System.out.println("t3已上车");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    });

    Thread t4 = new Thread(()->{
        try {
            lock.lock();
            System.out.println("班长到位 准备按排队顺序点到上车");
            for (int i=1; i<=2; i++){
                System.out.println("点"+i+"个");
                condition.signal();
                System.out.println("休息1秒....");
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            System.out.println("点到的同学可以上车了");
            lock.unlock();
        }
    });

    t1.start();
    //休眠200秒是为了保证t1在t2之前执行线程
    Thread.sleep(200);
    t2.start();
    //休眠200秒是为了保证t2在t3之前执行线程
    Thread.sleep(200);
    t3.start();
    //休眠200秒是为了保证t3在t4之前执行线程
    t4.start();
}



打印结果:

t1到位准备 等待班长发令
t2到位准备 等待班长发令
t3到位准备 等待班长发令
班长到位准备发令上车
点1个
休息1秒....
点2个
休息1秒....
点到的同学可以上车了
t1已上车
t2已上车

从打印结果可以看出, t1 和 t2同学被点到 并且已上车 ,  t3则需要等待班长继续点到

方式二代码如下:

public static void main(String[] args) throws InterruptedException {

    ReentrantLock lock = new ReentrantLock();

    //new condition
    Condition condition = lock.newCondition();

    //t1线程
    Thread t1 = new Thread(()->{
        try {
            lock.lock();
            System.out.println("t1到位准备 等待班长点到");
            condition.await();
            System.out.println("t1已上车");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    });

    Thread t2 = new Thread(()->{
        try {
            lock.lock();
            System.out.println("t2到位准备 等待班长点到");
            condition.await();
            System.out.println("t2已上车");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    });

    Thread t3 = new Thread(()->{
        try {
            lock.lock();
            System.out.println("t3到位准备 等待班长点到");
            condition.await();
            System.out.println("t3已上车");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    });

    Thread t4 = new Thread(()->{
        try {
            lock.lock();
            //发令让所有同学上车
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            System.out.println("所有同学可以上车了");
            lock.unlock();
        }
    });

    t1.start();
    //休眠200秒是为了保证t1在t2之前执行线程
    Thread.sleep(200);
    t2.start();
    //休眠200秒是为了保证t2在t3之前执行线程
    Thread.sleep(200);
    t3.start();
    //休眠200秒是为了保证t3在t4之前执行线程
    t4.start();
}

打印结果:
t1到位准备 等待班长点到
t2到位准备 等待班长点到
t3到位准备 等待班长点到
所有同学可以上车了
t1已上车
t2已上车
t3已上车

从打印结果可以看出,班长发令所有人上车之后, 所有人按当时排队顺序陆续上车了

 

数据结构代码如下:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
                        implements java.io.Serializable {
     //头节点
     private transient volatile Node head;
     //尾部节点
     private transient volatile Node tail;
     //状态 
     private volatile int state;
    //当前拥有锁的线程
    private transient Thread exclusiveOwnerThread;
    
    static final class Node {

        //节点等待状态 这个很重要 relase时会根据节点等待状态做出下一步动作
     //  1取消由于超时或打断而取消 不再阻塞
        // -1 等待被唤醒 后续节点需要阻塞 
        // -2 等待转移 非阻塞
        // -3 共享参数 需要继续传播给下一个节点   
        volatile int waitStatus;
        //前节点
        volatile Node prev;
        //后节点
        volatile Node next;
        //节点线程 这个是重点 保存了等待锁的线程
        volatile Thread thread;
        //下一个等待锁的节点 condition时有用 本文关注重点
        Node nextWaiter;
    }
   
//本文关注重点 public class ConditionObject implements Condition, java.io.Serializable { //condition 阻塞队列第一个线程节点 private transient Node firstWaiter; //condition 阻塞队列最后一个线程节点 private transient Node lastWaiter; } }

 

 

await():数据结构变化 

lock.newCondition() 实际是新建了一个ConditionObject对象, (这里涉及到一些内部类引用外部类 具体原理不详述)

先从数据结构看await()工作原理, 方便后面源码分析

1 new newCondition() 此时ConditionObject中属性 firstWaiter和lastWaiter都为null, 

2 线程t1调用代码 condition.await(),此时设置firstWaiter = lastWaiter = new Node(t1)

3 线程t2调用代码 condition.await(),此时设置 lastWaiter = new Node(t2), 并且 firstWaiter.nextWaiter = node(t2)

4 线程t3原理同上一步

ReentrantLock Condition 线程间通信

 

 

sign():数据结构变化 

下图中有两个框,

第一个框表示t1 t2 t3线程中执行了lock.lock() 以及 condition.await() , t5 t6只执行了lock.lock() 之后定格的数据结构状态图

第二个框表示新开线程执行lock.lock() 以及 condition.sign()后的定格数据结构状态图(signAll()原理一样, 不同点是会把所有condition的waiter节点全部移入AQS的CLH队列中)

ReentrantLock Condition 线程间通信

 

简述condition使用过程逻辑

1 新建一个ReentrantLock对象

2 新建一个Condition(ReentrantLock内部类) 对象

3 await() 在condition中添加一个waiter节点,链式结构不详述 firstWaiter- > 若干node -> lastWaiter

 循环判断是否当前新建节点是否transfer到CLH队列中,多线程情况下,当前节点可能正好被其他线程移入CLH中了

4 sign() 将firstWaiter移入CLH最后一个节点(如果CLH最后一个节点为cancel 则直接传播给当前节点)

 

 

源码分析:

await()

public final void await() throws InterruptedException {
    //如果当前线程被打断 抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //新增一个等待节点
    Node node = addConditionWaiter();
    //设置AQS独占线程为空, 释放当前线程持有AQS锁(如果AQS阻塞队列有节点则传播给下一个节点)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断是否在阻塞队列
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //代码运行到这必然是别的线程调用sign()方法, 已经将waiter队列中的节点移入到CLH(其实上面isOnSyncQueue(node)=true就表示移入了)
    //当前node获取独占锁 等待lock.unlock()让出锁给CLH队列的下一个节点
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}


private Node addConditionWaiter() {
    //找到最后一个节点
    Node t = lastWaiter;
    //如果最后一个节点状态发生改变 已非等待唤醒状态 则从队列中剔除
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //新建一个节点(当前线程, waitStatus=-3等待唤醒状态)
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //如果当前没有等待节点 则设置此新建节点为第一个等待节点
    if (t == null)
        firstWaiter = node;
    //如果如果已经存在等待节点 则当前新建节点挂在最后一个等待节点后面
    else
        t.nextWaiter = node;
    //最后一个节点即为当前新增节点
    lastWaiter = node;
    return node;
}

sign()

//唤醒await中的线程
public final void signal() {
    //判断当前线程是否为AQS锁独占线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //唤醒第一个waiter节点
        doSignal(first);
}

//以下代码的作用是转移第一个节点到CLH队列
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
//转移过程 这段代码是核心代码
final boolean transferForSignal(Node node) {
    //原子操作 修改node等待状态从-3修改为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //将此等待节点加入到CLH的tail节点之后 成为新的tail节点
    //节点p为node的前一个节点 也就是加入之前CLH的tail节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果p节点为取消状态 或者 p节点为非取消状态但是修改p为独占状态失败 则唤醒当前节点线程
    //因为链式调用的原则 node线程的唤醒交给前驱节点, 若前驱节点无法达到唤醒当前节点条件 那么就在此处直接唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

condition还有一些其他的await方法, 实现方法基本差不多, 只不过在LockSupport.park时带上线程阻塞时间

上一篇:test


下一篇:2. ansible-playbook 条件语句-内部变量使用