一、什么是阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。
这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
常见的有以下几种
(1)ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)。
(2)LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,而在未指明容量时,容量默认为Integer.MAX_VALUE。
(3)LinkedBlockingDeque: 使用双向队列实现的双端阻塞队列,双端意味着可以像普通队列一样FIFO(先进先出),可以以像栈一样FILO(先进后出)
(4)PriorityBlockingQueue: 一个支持优先级排序的*阻塞队列,对元素没有要求,可以实现Comparable接口也可以提供Comparator来对队列中的元素进行比较,跟时间没有任何关系,仅仅是按照优先级取任务。
(5)DelayQueue:同PriorityBlockingQueue,也是二叉堆实现的优先级阻塞队列。要求元素都实现Delayed接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
(6)SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用put()方法的时候就会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
二、Condition的原理
阻塞队列实际上是使用了Condition来模拟线程间协作。
Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition。
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用
注意事项
Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。
三、put和take的原理
这里用ArrayBlockingQueue来举例
它带有的属性如下
// 存储队列元素的数组,是个循环数组
final Object[] items;
// 拿数据的索引,用于take,poll,peek,remove方法
int takeIndex;
// 放数据的索引,用于put,offer,add方法
int putIndex;
// 元素个数
int count;
// 可重入锁
final ReentrantLock lock;
// notEmpty条件对象,由lock创建
private final Condition notEmpty;
// notFull条件对象,由lock创建
private final Condition notFull;
1.数据的添加
put:阻塞添加
public void put(E e) throws InterruptedException {
checkNotNull(e); // 不允许元素为空
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程
try {
while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里
notFull.await(); // 线程阻塞并被挂起,同时释放锁
insert(e); // 调用insert方法
} finally {
lock.unlock(); // 释放锁,让其他线程可以调用put方法
}
}
insert
private void insert(E x) {
items[putIndex] = x; // 元素添加到数组里
putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0
++count; // 元素个数+1
notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知
}
2.数据的获取
take:通知模式
Condition notEmpty:队列为空,notEmpty会await
插入元素会signal
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程
try {
while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
notEmpty.await(); // 线程阻塞并被挂起,同时释放锁
return extract(); // 调用extract方法
} finally {
lock.unlock(); // 释放锁,让其他线程可以调用take方法
}
}
extract
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素
items[takeIndex] = null; // 对应取索引上的数据清空
takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0
--count; // 元素个数-1
notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
return x; // 返回元素
}