1、LinkedBlockingQueue概述
LinkedBlockingQueue,顾名思义,一个链式的(linked)、阻塞的(Blocking)队列(Queue)。
Queue,首先想到的是FIFO特性。
Linked,Queue其结构本质上也是线性表,可以由链表和顺序表实现,LinkedBlockingQueue就是链表实现,ArrayBlockingQueue是顺序表实现。因Queue 只在首尾操作,所以操作链表和顺序表的时间复杂度是一样的,但顺序表的实现会占用更少的空间,因为不需要“指针”域(next),但空间必须是连续的;链式实现不需要连续空间,但需要使用next
来指向下一个节点位置,以下LinkedBlockingQueue的节点结构。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
Blocking,阻塞,LinkedBlockingQueue是线程安全的,当队列满了以后,所有的入队操作将会被阻塞;当队列空了,所有的出队操作将会被阻塞。队列初始化的时候,我们可以指定队列长度capacity,如果没有指定,LinkedBlockingQueue的默认capacity是Integer.MAX_VALUE
。显然,capacity还是一个不可更改的值。
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
2、LinkedBlockingQueue实现代码详解
如果要看懂LinkedBlockingQueue的实现,需要熟悉wait/notify以及AbstractQueuedSynchronizer(AQS)。题外话,个人认为并发编程中有三个非常重要的东西:等待通知机制、CAS以及AQS。
2.1 head和tail
- head和tail分别指示队列的首尾,可快速地定位take和put操作位置。注意头结点head和首节点first的区分。
transient Node<E> head;
private transient Node<E> last;
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
2.2 count
- count表示当前队列中元素的个数,其使用并发包下的AtomicInteger类来实现原子操作,该类的核心还是cas操作。AtomicInteger类型的count对于队列的线程安全有着至关重要的作用,因为接下来会看到take和put操作是两个独立的锁。
- 有兴趣的话,可以看看ArrayBlockingQueue,其只使用了一个锁来保证线程安全,所以它的count没有使用AtomicInteger,而是一个int类型。
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
2.3 锁与条件
- takeLock以及putLock分别定义了take操作以及put操作锁。
- take操作的条件是notEmpty,所以在执行take操作时会先判断当前队列是否还有元素可以take,如果没有那么就要执行
notEmpty.await()
让take线程阻塞。 - put操作的条件是notFull,所以在执行put操作时会先判断当前队列是否还有空间可以put元素,如果没有剩余空间那么就要执行
notFull.await()
。 - 成功 take以后,会判断一下take之前队列是不是满的,如果是,说明可能会有put线程被阻塞了,所以会调用
signalNotFull()
方法去唤醒那些put线程。 - 成功put以后,会判断一下在put之前队列是不是空的,如果是,说明可能会有take线程是阻塞的,所以会调用
signalNotEmpty()
去唤醒那些take线程。 - 4、5两步设计的相当好,先判断是不是Empty或者Full,然后再去调用唤醒方法,避免无谓的唤醒操作。但是这一步在理解的时候有点费解。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
2.4 put
- 在对尾插入一个指定的元素e,如果没有空间,线程将会等待。
- e不允许为空,该队列不存储null元素,否则抛NullPointerException
- 局部变量c初始值为-1,其存储当前队列的元素个数,准确地说是put操作之前的元素个数,因为
c = count.getAndIncrement()
,而getAndIncrement()
返回的是previous值(c的值很重要,不然无法理解唤醒操作)。 -
putLock.lockInterruptibly()
,获取put lock。 - 如果
count.get() == capacity
,即队列已经没有剩余空间了,那么条件为not Full
的操作,即put操作线程要执行语句notFull.await()
进入等待;否则正常入队。 - 正常入队后,count加1,c获取的是入队前的值(这点需要注意)。
-
c + 1 < capacity
表示如果当前队列的元素个数小于capacity,那么就可以唤醒一下那些条件为not Empty
的put操作线程(当然,此时不一定会有等待线程)。 - 如果
(c == 0)
,即put之前队列是空的,那么就有可能有take操作线程在等待,所以执行signalNotEmpty()
,该方法会先获取take锁,然后唤醒等待的take线程来take。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
2.5 take
- take和put原理上是相同的,take是从first节点开始出队,注意区分head;如果队列中没有节点,那么take线程就需要等待。
- 局部变量c初始值为-1,其存储当前队列的元素个数,准确地说是take操作之前的元素个数。
-
takeLock.lockInterruptibly()
获取take lock。 -
count.get() == 0
,如果当前队列中没有元素,那么条件为not Empty
的take操作线程将要等待;否则正常出队。 -
c > 1
表示take以前队列中至少是有2个元素,那么可以唤醒其它在等待的take线程,操作为notEmpty.signal()
。 -
c == capacity
表示take操作前队列是满的,那么就有可能有put线程在等待着,因此执行signalNotFull()
,该方法首先获取put锁,然后唤醒那些可能在等待的put线程。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
2.6 offer
- 重载的两个offer方法本质上也是put操作,但在操作上略有不同。
- 一个offer方法提供了线程等待时间,其先进入条件的等待队列等待。
- 另一个offer方法是能入队就入队,不能就返回false,不等了。
- 这两种offer方法可根据实际需要来适当选择。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
2.7 poll与peek
- 两个poll方法也是take的改版,一个是超时等待,一个干脆就不等了,有就取,没有就算了。两种方法可在实际应用中按需选用。
- peek方法和take方法不同的是没有出队,只是"看看"首元素first.item。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
3、总结
- LinkedBlockingQueue体现了生产者/消费者模型,借助wait/notify机制,可实现take、put操作线程的等待与唤醒。
- AtomicInteger类型的count(队列中当前元素个数)以及双锁机制(take和put锁)共同使得LinkedBlockingQueue是线程安全的。实现方式值得学习和体会。
能力与时间有限(当然主要是能力),错漏之处还请评论指正。