- checkNotNull(Object v)
- enqueue(E x)
- put(E e)
- offer(E e, long timeout, TimeUnit unit)
- dequeue()
- take()
- poll(long timeout, TimeUnit unit)
- removeAt(final int removeIndex)
- remove(Object o)
阻塞队列可以用于线程池的等待队列,生产者消费者的通信通道,本文讲解ArrayBlockingQueue。参考Collection之BlockingQueue)
根据类名,可以知道这个数据结构是队列,因此数据的进出顺序是FIFO;阻塞的含义为,当需要生产/消费时,队列没有空间/数据,则对应的操作会阻塞住,直到其他操作解除这个阻塞状态
ArrayBlockingQueue作为阻塞队列,特点是内部使用Object数组(使用成了环形),并且创建时需要指定大小,有两个指针分别对应生产和消费操作
里面的迭代器弄个专题一起看吧
成员变量:
/** 队列里元素数量 */
int count;
/** 存储结构 */
final Object[] items;
/** 为下一次执行 take, poll, peek or remove 操作提供的index */
int takeIndex;
/** 为下一次执行 put, offer, or add 操作提供的index */
int putIndex;
/** 进行并发控制,以及线程间通信的锁(通信主要靠condition实现) */
final ReentrantLock lock;
/** 当队列为空时获取等待 */
private final Condition notEmpty;
/** 当队列满时插入等待 */
private final Condition notFull;
主要方法:
// 在不超过队列容量的情况下立即插入指定的元素,成功后返回true,如果队列已满则抛出IllegalStateException。
public boolean add(E e)
// 在不超过队列容量的情况下立即在队列末尾插入指定的元素,如果成功则返回true,如果队列已满则返回false。此方法通常比add(E)方法更好,后者插入元素失败只能抛出异常。
public boolean offer(E e)
// 将指定的元素插入到此队列的末尾,如果队列已满则等待直到有可用的空间。
public void put(E e) throws InterruptedException
// 将指定的元素插入到此队列的末尾,如果队列已满,则在指定的超时时间之内等待空间可用,超时返回false。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
// 检索并删除此队列的头,如果此队列为空,则返回null。
public E poll()
// 检索并删除此队列的头,如有必要则等待,直到某个元素可用为止。
public E take() throws InterruptedException
// 检索并删除此队列的头,如果有必要则在指定的等待时间之内等待元素可用,超时返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException
// 检索但不删除此队列的头,或在此队列为空时返回null。
public E peek()
// 返回此队列中的元素数量。
public int size()
// 返回此队列在理想情况下(在没有内存或资源约束的情况下)可以不阻塞地接受的新元素的数量。它总是等于这个队列的初始容量减去这个队列的当前大小。
public int remainingCapacity()
// 如果指定元素存在,则从此队列中移除该元素的单个实例。更正式地说,如果队列中包含一个或多个这样的元素,则只删除匹配到的第一个元素
public boolean remove(Object o)
// 如果此队列包含至少一个指定的元素,则返回true。
public boolean contains(Object o)
// 返回一个数组,该数组包含此队列中的所有元素,按适当的顺序排列。返回的数组将是“安全的”,因为此队列不维护对它的引用。
public Object[] toArray()
// 返回一个数组,该数组包含此队列中的所有元素,按适当的顺序排列;返回数组的运行时类型是指定数组的运行时类型。
public <T> T[] toArray(T[] a)
// 返回此集合的字符串表示形式。
public String toString()
// 删除此队列中的所有元素。此调用返回后,队列将为空。
public void clear()
// 从此队列中删除所有可用元素并将它们添加到给定集合中。此操作可能比重复轮询此队列更有效。在试图将元素添加到集合c时遇到失败抛出相关异常时可能会导致:元素不在原集合或者集合c中,或者两个集合中都没有。
public int drainTo(Collection<? super E> c)
// 从该队列中最多删除给定数量的可用元素,并将它们添加到给定集合中。异常情况同上
public int drainTo(Collection<? super E> c, int maxElements)
// 按适当的顺序返回此队列中元素的迭代器。元素将按从第一个(head)到最后一个(tail)的顺序返回。返回的迭代器是弱一致的。
public Iterator<E> iterator()
// 返回该队列中元素的Spliterator。返回的spliterator是弱一致的。
public Spliterator<E> spliterator()
方法中能和阻塞联系起来的,是put和take,同时offer和poll,也提供了对应的超时控制,重点关注这四个方法
checkNotNull(Object v)
/**
* Throws NullPointerException if argument is null.
*
* @param v the element
*/
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
enqueue(E x)
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
//获取到Object[]的引用
final Object[] items = this.items;
//将putIndex位置上位置为x
items[putIndex] = x;
//把数组当成了一个环形的去使用
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知说现在队列有数据了
notEmpty.signal();
}
put(E e)
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
//对e进行非空判断
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//核心是这个while循环,阻塞在notFull上,等待notFull.signal
//关于conditon,还需要再搞搞AQS的源码,不然不懂为啥这里是while,不能用if
while (count == items.length)
notFull.await();
//将e入队
//这里为什么不需要对迭代器进行操作呢?
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
offer(E e, long timeout, TimeUnit unit)
/**
* Inserts the specified element at the tail of this queue, waiting
* up to the specified wait time for space to become available if
* the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
//这个方法也是在AQS里面实现的,后面再专门弄下吧
//我猜测是等待nanos时间,到点了就返回一个《=0的值
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
dequeue()
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//保存takeIndex对应的数据
E x = (E) items[takeIndex];
items[takeIndex] = null;
//当作一个环形数组使用
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//这里对我来说也是很不熟练的地方,关于迭代器,后面也得专门弄一弄,比如ArrayList,Map之类的
if (itrs != null)
itrs.elementDequeued();
//发送notFull的信号
notFull.signal();
//返回保存的中间结果
return x;
}
take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//当前队列没有数据,则等待直到notEmpty发来信号
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
poll(long timeout, TimeUnit unit)
和前面offer一样,都只是加上了一个等待时间的机制
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
removeAt(final int removeIndex)
/**
* Deletes item at array index removeIndex.
* Utility for remove(Object) and iterator.remove.
* Call only when holding lock.
*/
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
//如果要删除的下标刚好是takeIndex,当作一次普通的出队即可
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//对迭代器进行操作
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
//把removeIndex后面的数据,都向前挪动一位
for (int i = removeIndex;;) {
int next = i + 1;
//根据环形队列,来获取i的next下标
if (next == items.length)
next = 0;
//如果next不是putIndex,说明不为null,向前挪动
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
//此时 i == putIndex,将i处的数据置为null(上一步已经向前挪动了)
//将putIndex更新为i
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
//更新迭代器
if (itrs != null)
itrs.removedAt(removeIndex);
}
//发送notFull的信号
notFull.signal();
}
remove(Object o)
特定删除,服用了removeAt()方法
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//先找到o对应的下标i
if (o.equals(items[i])) {
//针对具体的下标,进行删除
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex) //如果i遍历到了putIndex,说明队列中没有o;
}
return false;
} finally {
lock.unlock();
}
}