JDK包含许多存在状态依赖的类,例如FutureTask、Semaphore和BlockingQueue,他们的一些操作都有前提条件,例如非空、任务已完成等。
创建状态依赖类的最简单的房就是在JDK提供了的状态依赖类基础上构造。例如ValueLactch,如果这些不满足,可以使用Java语言或者类库提供的底层机制来构造,包括
- 内置的条件队列
- condition
- AQS
这一章就介绍这些。
1 状态依赖性的管理 State Dependence
下一节会介绍使用条件队列解决阻塞线程运行。
下面先介绍通过轮询和休眠的方式(勉强)的解决。
标准模板:
void blockingAction() throws InterruptedException { acquire lock on object state while (precondition does not hold) { release lock wait until precondition might hold optionally fail if interrupted or timeout expires reacquire lock } perform action }
看看阻塞有界队列的几种实现方式。依赖的前提条件:
- 不能从空缓存中获取元素
- 不能将元素放入已满的缓存中
不满足条件候,依赖状态的操作可以:
- 抛出异常
- 返回一个错误状态(码)
- 阻塞直到进入正确的状态
下面是基类,线程安全,但非阻塞。
@ThreadSafe public abstract class BaseBoundedBuffer <V> { @GuardedBy("this") private final V[] buf; @GuardedBy("this") private int tail; @GuardedBy("this") private int head; @GuardedBy("this") private int count; protected BaseBoundedBuffer(int capacity) { this.buf = (V[]) new Object[capacity]; } protected synchronized final void doPut(V v) { buf[tail] = v; if (++tail == buf.length) tail = 0; ++count; } protected synchronized final V doTake() { V v = buf[head]; buf[head] = null; if (++head == buf.length) head = 0; --count; return v; } public synchronized final boolean isFull() { return count == buf.length; } public synchronized final boolean isEmpty() { return count == 0; } }
“先检查再运行”的逻辑解决方案:
调用者必须自己处理前提条件失败的情况。当然也可以返回错误消息。
当然调用者可以不Sleep,而是直接重试,这就是忙等待或者自旋等待(busy waiting or spin waiting),如果换成很长时间都不变,那么这将会消耗大量的CPU时间!!!所以调用者自己休眠,sleep让出CPU。但这个时间就很尴尬:
- sleep长了万一一会前提条件就满足了岂不是白等了从而响应性低
- sleep短了浪费CPU时钟周期
另外可以试试yield,但是这也不靠谱。
@ThreadSafe public class GrumpyBoundedBuffer <V> extends BaseBoundedBuffer<V> { public GrumpyBoundedBuffer() { this(100); } public GrumpyBoundedBuffer(int size) { super(size); } public synchronized void put(V v) throws BufferFullException { if (isFull()) throw new BufferFullException(); doPut(v); } public synchronized V take() throws BufferEmptyException { if (isEmpty()) throw new BufferEmptyException(); return doTake(); } } class ExampleUsage { private GrumpyBoundedBuffer<String> buffer; int SLEEP_GRANULARITY = 50; void useBuffer() throws InterruptedException { while (true) { try { String item = buffer.take(); // use item break; } catch (BufferEmptyException e) { Thread.sleep(SLEEP_GRANULARITY); } } } }
优化让客户端舒服些:
@ThreadSafe public class SleepyBoundedBuffer <V> extends BaseBoundedBuffer<V> { int SLEEP_GRANULARITY = 60; public SleepyBoundedBuffer() { this(100); } public SleepyBoundedBuffer(int size) { super(size); } public void put(V v) throws InterruptedException { while (true) { synchronized (this) { if (!isFull()) { doPut(v); return; } } Thread.sleep(SLEEP_GRANULARITY); } } public V take() throws InterruptedException { while (true) { synchronized (this) { if (!isEmpty()) return doTake(); } Thread.sleep(SLEEP_GRANULARITY); } } }
这种方式测试失败,那么释放锁,让别人做,自己休眠下,然后再检测,不断的重复这个过程,当然可以解决,但还需要做权衡,CPU使用率与响应性之间的抉择。
那么我们想如果这种轮询和休眠的dummy方式不用,而是存在某种挂起线程的方案,并且这种方法能够确保当某个条件为 true 时,立刻唤醒线程,那将极大简化实现工作,这就是条件队列的实现。
Condition Queues的名字来源:
it gives a group of threads called the wait set a way to wait for a specific condition to become true. Unlike
typical queues in which the elements are data items, the elements of a
condition queue are the threads waiting for the condition.
每个Java对象都可以是一个锁,每个对象同样可以作为一个条件队列,并且Object的wait、notify和notifyAll就是内部条件队列的API。对象的内置锁(intrinsic lock )和内置条件队列是关联的,要调用X中的条件队列的任何一个方法,都必须持有对象X上的锁。
Object.wait自动释放锁,并且请求os挂起当前线程,其他线程可以获得这个锁并修改对象状态。当被挂起的线程唤醒时,它将在返回之前重新获取锁。
@ThreadSafe public class BoundedBuffer <V> extends BaseBoundedBuffer<V> { // CONDITION PREDICATE: not-full (!isFull()) // CONDITION PREDICATE: not-empty (!isEmpty()) public BoundedBuffer() { this(100); } public BoundedBuffer(int size) { super(size); } // BLOCKS-UNTIL: not-full public synchronized void put(V v) throws InterruptedException { while (isFull()) wait(); doPut(v); notifyAll(); } // BLOCKS-UNTIL: not-empty public synchronized V take() throws InterruptedException { while (isEmpty()) wait(); V v = doTake(); notifyAll(); return v; } // BLOCKS-UNTIL: not-full // Alternate form of put() using conditional notification public synchronized void alternatePut(V v) throws InterruptedException { while (isFull()) wait(); boolean wasEmpty = isEmpty(); doPut(v); if (wasEmpty) notifyAll(); } }
注意,如果某个功能无法通过“轮询和休眠"来实现,那么条件队列也无法实现。
2 使用条件队列
2.1 条件谓词(The Condition Predicate)
条件谓词是使某个操作成为状态依赖操作的前提条件:
- take方法的条件谓词是”缓存不为空“,take方法在执行之前必须首先测试条件谓词
- put方法的条件谓词是”缓存不满“
在条件等待中存在一种重要的三元关系:
- 加锁
- wait方法
- 条件谓词
条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象和条件队列对象必须是同一个对象。
wait释放锁,线程挂起阻塞,等待直到超时,然后被另外一个线程中断或被通知唤醒。唤醒后,wait在返回前还需要重新获取锁,当线程从wait方法中唤醒,它在重新请求锁时不具有任何特殊的优先级,和其他人一起竞争。
2.2 过早唤醒
其他线程中间插足了,获取了锁,并且修改了遍历,这时候线程获取锁需要重新检查条件谓词。
wait block ----------race to get lock ------------------------------------------get lock ----- ^ wait block --------> race to get lock ------get lock------> perform action ---> release lock ^ notifyAll
当然有时,比如一个你根本不知道为什么别人调用了notify或notifyAll,也许条件谓词压根就没满足,但线程还是获取了锁,然后test条件谓词,释放锁,其他线程都来了这么一趟,发生这就是“谎报军情”啊。
基于以上这两种情况,都必须重新测试条件谓词。
When using condition waits (Object.wait or Condition.await):
/
- Always have a condition predicate——some test of object state that must hold before proceeding;
- Always test the condition predicate before calling wait, and again after returning from wait;
- Always call wait in a loop;
- Ensure that the state variables making up the condition predicate are guarded by the lock associated with the condition queue;
- Hold the lock associated with the the condition queue when calling wait, notify, or notifyAll
- Do not release the lock after checking the condition predicate but before acting on it.
模板就是:
void stateDependentMethod() throws InterruptedException { // condition predicate must be guarded by lock synchronized(lock) { while (!conditionPredicate()) //一定在循环里面做条件谓词 lock.wait(); //确保和synchronized的是一个对象 // object is now in desired state //不要释放锁 } }