Condition接口
Condition是一个接口,其提供的就两个核心方法,await和signal方法。分别对应着Object的wait和notify方法。调用Object对象的监视器方法的这两个方法,
需要在同步代码块里面,即必须先获取到锁才能执行这两个方法。同理,Condition调用这两个方法,也必须先获取到锁,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。
Object的监视器方法与Condition接口的对比
Condition简单用法
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说Condition是依赖Lock对象的。Condition的使用方式比较简单,需要注意在调用方法前获取锁.如下面代码所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。
package com.brian.mutilthread.condition; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class ConditionDemo { private static Lock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(ConditionDemo::run); thread.start(); try { Thread.sleep(1000); } catch (Exception e) { } lock.lock(); // 唤醒 condition.signal(); lock.unlock(); log.info(" === {} ===: {} 33333",Thread.currentThread().getName()); } private static void run() { lock.lock(); try { log.info("=== {} ===: {} 11111", Thread.currentThread().getName()); // 等待 condition.await(); log.info("=== {} ===: {} 22222", Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } }
Condition常用API
手写基于condition的队列
package com.brian.mutilthread.condition; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class BrianQueue<T> { private Object[] items; // 添加的下标,删除的下标和数组当前数量 private int addIndex, removeIndex, count; private Lock lock = new ReentrantLock(); private Condition emptyCondition = lock.newCondition(); private Condition fullCondition = lock.newCondition(); public BrianQueue(int size) { items = new Object[size]; } // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位" public void add(T t) throws InterruptedException { lock.lock(); try { while (count == items.length) { log.info("===== queue is full and be blocked ======"); fullCondition.await(); } items[addIndex] = t; if (++addIndex == items.length) { addIndex = 0; } log.info("=== add() ===: {}", addIndex); ++count; emptyCondition.signal(); } finally { lock.unlock(); } } // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素 @SuppressWarnings("unchecked") public T remove() throws InterruptedException { lock.lock(); try { while (count == 0) { log.info("===== queue is empty and be blocked ======"); emptyCondition.await(); } Object x = items[removeIndex]; if (++removeIndex == items.length) { removeIndex = 0; } log.info("=== remove() ===: {}", removeIndex); --count; fullCondition.signal(); return (T) x; } finally { lock.unlock(); } } }
测试类
package com.brian.mutilthread.condition; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class BrianQueueDemo { public static void main(String[] args) throws InterruptedException { BrianQueue<Integer> brianQueue = new BrianQueue<>(5); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(()->{ Integer num = 0; while (true){ try { brianQueue.add(++num); } catch (InterruptedException e) { e.printStackTrace(); } } }); executorService.execute(()->{ while (true){ try { brianQueue.remove(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
Condition await() 和 signal()源码解读
此处以Condition的实现类ConditionObject,ConditionObject是同步器AbstractQueuedSynchronizer的内部类来分析
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 将当前节点添加到最后一个节点 Node node = addConditionWaiter(); //释放锁的状态 long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //重新获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //获取单向链表, Node first = firstWaiter; if (first != null) doSignal(first); }
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。