线程通信synchronized中的wait/notify、J.U.C Condition的使用和源码分析

记得以前有个最经典的面试题:如何用多个线程顺序的从1输出到100?
上章说了Java中锁的使用以及原理分析,上述面试题应该手到擒来了吧

本章主要说下Java中线程通信实现生产消费队列以及Condition源码

线程通信

利用共享锁的互斥实现两个线程通信,从而实现生产消费队列

1.用Synchronized的wait/notify实现生产消费队列

//定义一个队列
static Queue<Integer> list = new LinkedList<>();
//定义队列的大小
static int size = 10;
//生产这代码 放线程执行
public static void producer() throws InterruptedException {
    int i = 0;
    while (true) {
        i++;
        //给队列加锁
        synchronized (list) {
            if (list.size() == size) {
                System.out.println("队列满了");
                //队列满了就等待,消费者消费后又会唤醒生产者
                list.wait();
            }
            Thread.sleep(1000);
            list.add(i);
            System.out.println("生产者添加:" + i);
            //唤醒线程后会从wait后面接着执行 意味着又是去抢占到锁才能继续执行
            list.notify();	
        }
    }
}
//消费者代码 
public static void comsume() throws InterruptedException {
    while (true) {
        synchronized (list) {
            if (list.size() == 0) {
                System.out.println("队列空了");
                //队列空了就等待,生产者生产了就会唤醒消费者
                list.wait();
            }
            Thread.sleep(1000);

            Integer remove = list.remove();
            System.out.println("消费者消费:" + remove);
            list.notify();
        }
    }
}
  • 这个基于synchronized实现的是底层源码就不说了,在Java中的JUC也有实现,我们一起看看吧

2.J.U.C中的Condition

  • signal/await 等价于synchronized的notify/wait
  • 也是必须要抢占锁的
  • signal 唤醒多个线程,唤醒后需要同步竞争锁,没抢到锁的需要同步到AQS队列中去(务必先了解上一篇文章中的内容
  • await 阻塞当前线程,添加到等待队列,释放锁
  • synchronized不同的是Condition里面可以用多个队列,放不同的线程

3.用Condition实现生产消费队列

Lock lock = new ReentrantLock();
//与synchronized不同的是Condition里面可以用多个队列 放不同的线程
Condition addCondition = lock.newCondition();
Condition removeCondition = lock.newCondition();
int count = 10;
List<String> list = new ArrayList<>(count);

public void producer() {
    int i = 0;
    while (true) {
        lock.lock();
        i++;
        try {
            if (list.size() == count) {
                System.out.println("队列满了");
                //阻塞生产者 释放锁 消费者消费时会唤醒生产者
                addCondition.await();
            }
            Thread.sleep(1000);
            list.add("abc"+i);
            System.out.println("生产者添加:" + i);
            //唤醒消费者
            removeCondition.signal();
        } catch (Exception e) {
        } finally {
            lock.unlock();
        }

    }
}

public void comsume() {
    while (true) {
        lock.lock();
        try {
            if (list.size() == 0) {
                System.out.println("队列空了");
                //阻塞消费者 释放锁 生产者添加时会唤醒
                removeCondition.await();
            }
            Thread.sleep(1000);
            System.out.println("消费者消费:"+list.get(0));
            list.remove(0);
            addCondition.signal();
        } catch (Exception e) {
        } finally {
            lock.unlock();
        }
    }
}

public static void main(String[] args) throws InterruptedException {
    ConditionDemo prod = new ConditionDemo();
    new Thread(() -> { prod.producer(); }).start();
    Thread.sleep(100);
    new Thread(() -> { prod.comsume(); }).start();
}

代码一点要自己手写跑一跑,不然眨眼忘

4.Condition源码分析

Condition队列有自己管理的一个单向链表的等待队列

  • await()阻塞当前线程,添加到等待队列,释放锁(全部,可能会存在重入),然后再同步到AQS队列,有处理interrupt的操作(可能是中断标识唤醒,不是Signal唤醒)
//AbstractQueuedSynchronizer#ConditionObject#await
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //添加到Condition等待队列 与构建AQS队列差不多 但这个是单向队列 状态为CONDITION = -2;
    Node node = addConditionWaiter();
    //完全的释放锁 可能会重入
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断是否已经在AQS队列 然后根据状态为CONDITION会返回false阻塞当前线程
    //signal唤醒会同步到AQS队列 然后isOnSyncQueue 会返回true跳出循环
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //等待被唤醒后,从这里开始要先处理是不是由interrupt中断唤醒
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //接下来就是AQS里的内容了 这里是拿到锁以后的内容 是线程安全的
    //被唤醒后重新去竞争锁 savedState/被释放的锁重入次数
    //如果没抢到就会由继续parkAndCheckInterrupt阻塞,然后等待AQS队列中的线程唤醒
    if (acquireQueued(node,  savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        //清空无效线程
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

这个很好理解,就是在原有的AQS队列前面加了一个单向链表的等待队列。

  • 接下来继续分析唤醒源码Signal()
//AbstractQueuedSynchronizer#ConditionObject#signal
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //当下第一个等待节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        //取出下一个等待的节点 判断条件是transferForSignal
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
	//过滤无效状态的节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

	//这个方法就很熟悉了 加到AQS队列
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果已经是待唤醒状态 直接唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //唤醒Condition队列的线程 又回到LockSupport.park(this);后面继续执行
        LockSupport.unpark(node.thread);
    return true;
}

这个源码仔细看起来还是很简单的,看不懂的多看几遍慢慢的就懂了。

5.Condition的实际应用

  • 可以生成不懂的阻塞队列,基于需求唤醒不同的线程
  • 阻塞队列、生产消费者
  • 线程池
  • 流量缓存

以上就是本章的全部内容了。

上一篇:J.U.C ReentrantLock可重入锁使用以及源码分析
下一篇:J.U.C中的阻塞队列使用及源码分析–ArrayBlockingQueue

志士惜日短,愁人知夜长

上一篇:炫“库“行动—人大金仓有奖征文——金仓数据库安装教程


下一篇:Java Condition类使用及分析