记得以前有个最经典的面试题:如何用多个线程顺序的从1输出到100?
上章说了Java中锁的使用以及原理分析,上述面试题应该手到擒来了吧
本章主要说下Java中线程通信实现生产消费队列
以及Condition源码
线程通信
利用共享锁的互斥实现两个线程通信,从而实现生产消费队列
1.用Synchronized的wait/notify实现生产消费队列
//定义一个队列
static Queue<Integer> list = new LinkedList<>();
//定义队列的大小
static int size = 10;
//生产这代码 放线程执行
public static void producer() throws InterruptedException {
int i = 0;
while (true) {
i++;
//给队列加锁
synchronized (list) {
if (list.size() == size) {
System.out.println("队列满了");
//队列满了就等待,消费者消费后又会唤醒生产者
list.wait();
}
Thread.sleep(1000);
list.add(i);
System.out.println("生产者添加:" + i);
//唤醒线程后会从wait后面接着执行 意味着又是去抢占到锁才能继续执行
list.notify();
}
}
}
//消费者代码
public static void comsume() throws InterruptedException {
while (true) {
synchronized (list) {
if (list.size() == 0) {
System.out.println("队列空了");
//队列空了就等待,生产者生产了就会唤醒消费者
list.wait();
}
Thread.sleep(1000);
Integer remove = list.remove();
System.out.println("消费者消费:" + remove);
list.notify();
}
}
}
- 这个基于synchronized实现的是底层源码就不说了,在Java中的JUC也有实现,我们一起看看吧
2.J.U.C中的Condition
-
signal/await
等价于synchronized的notify/wait
- 也是必须要抢占锁的
- signal 唤醒多个线程,唤醒后需要同步竞争锁,没抢到锁的需要同步到AQS队列中去(
务必先了解上一篇文章中的内容
) - await 阻塞当前线程,添加到等待队列,释放锁
- synchronized不同的是Condition里面可以用多个队列,放不同的线程
3.用Condition实现生产消费队列
Lock lock = new ReentrantLock();
//与synchronized不同的是Condition里面可以用多个队列 放不同的线程
Condition addCondition = lock.newCondition();
Condition removeCondition = lock.newCondition();
int count = 10;
List<String> list = new ArrayList<>(count);
public void producer() {
int i = 0;
while (true) {
lock.lock();
i++;
try {
if (list.size() == count) {
System.out.println("队列满了");
//阻塞生产者 释放锁 消费者消费时会唤醒生产者
addCondition.await();
}
Thread.sleep(1000);
list.add("abc"+i);
System.out.println("生产者添加:" + i);
//唤醒消费者
removeCondition.signal();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
public void comsume() {
while (true) {
lock.lock();
try {
if (list.size() == 0) {
System.out.println("队列空了");
//阻塞消费者 释放锁 生产者添加时会唤醒
removeCondition.await();
}
Thread.sleep(1000);
System.out.println("消费者消费:"+list.get(0));
list.remove(0);
addCondition.signal();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
ConditionDemo prod = new ConditionDemo();
new Thread(() -> { prod.producer(); }).start();
Thread.sleep(100);
new Thread(() -> { prod.comsume(); }).start();
}
代码一点要自己手写跑一跑,不然眨眼忘
4.Condition源码分析
Condition队列有自己管理的一个单向链表的等待队列
- await()阻塞当前线程,添加到等待队列,释放锁(全部,可能会存在重入),然后再同步到AQS队列,有处理
interrupt
的操作(可能是中断标识唤醒,不是Signal唤醒
)
//AbstractQueuedSynchronizer#ConditionObject#await
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加到Condition等待队列 与构建AQS队列差不多 但这个是单向队列 状态为CONDITION = -2;
Node node = addConditionWaiter();
//完全的释放锁 可能会重入
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断是否已经在AQS队列 然后根据状态为CONDITION会返回false阻塞当前线程
//signal唤醒会同步到AQS队列 然后isOnSyncQueue 会返回true跳出循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//等待被唤醒后,从这里开始要先处理是不是由interrupt中断唤醒
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//接下来就是AQS里的内容了 这里是拿到锁以后的内容 是线程安全的
//被唤醒后重新去竞争锁 savedState/被释放的锁重入次数
//如果没抢到就会由继续parkAndCheckInterrupt阻塞,然后等待AQS队列中的线程唤醒
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
//清空无效线程
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
这个很好理解,就是在原有的AQS队列前面加了一个单向链表的等待队列。
- 接下来继续分析唤醒源码Signal()
//AbstractQueuedSynchronizer#ConditionObject#signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//当下第一个等待节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
//取出下一个等待的节点 判断条件是transferForSignal
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//过滤无效状态的节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//这个方法就很熟悉了 加到AQS队列
Node p = enq(node);
int ws = p.waitStatus;
//如果已经是待唤醒状态 直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//唤醒Condition队列的线程 又回到LockSupport.park(this);后面继续执行
LockSupport.unpark(node.thread);
return true;
}
这个源码仔细看起来还是很简单的,看不懂的多看几遍慢慢的就懂了。
5.Condition的实际应用
- 可以生成不懂的阻塞队列,基于需求唤醒不同的线程
- 阻塞队列、生产消费者
- 线程池
- 流量缓存
以上就是本章的全部内容了。
上一篇:J.U.C ReentrantLock可重入锁使用以及源码分析
下一篇:J.U.C中的阻塞队列使用及源码分析–ArrayBlockingQueue
志士惜日短,愁人知夜长