LinkecBlockQueue链表阻塞队列,从命名可以看出,它是基于链表实现的。同样这也是个先进先出的队列,队头是队列里入队时间最长的元素,队尾则是入队时间最短的。理论上它的吞吐量要超出数组阻塞队列ArrayBlockingQueue。LinkedBlockQueue可以指定容量限制,在没有指定的情况下,默认为Integer.MAX_VALUE
1, 成员变量
1 //最大容量
2 private final int capacity;
3 //当前队列长度
4 pirvate final AtomicInteger count = new AtomaticInteger();
5 //头结点,用于标志队列头,其item永远为null,head.next为第一个入队节点
6 transient Node<E> head;
7 //尾结点
8 private transient Node<E> last;
9 //取值锁,用于take/poll方法
10 private final ReentrantLock takeLock = new ReentrantLock();
11 //表示队列非空的对象监视器
12 private final Condition notEmpty = takeLock.newCondition();
13 //存值锁,用于put、offer等方法。
14 private final ReentrantLock putLock = new ReentrantLock();
15 //表示队列非满的对象监视器
16 private final Condition notFull = putLock.newCondition();
与ArrayBlockingQueue相比,LinkedBlockingQueue的重入锁被分成了两份,分别对应应存值和取值。这种实现方法被称为双锁队列算法,这样做的好处在于,读写操作的lock操作是由两个锁来控制,互不干涉,因此可以同时进行读操作和写操作,这也是LinkedBlockQueue吞吐量超过ArrayBlockingQueue的原因。但是,使用两个锁要比一个锁复杂很多,需要考虑各种死锁的情况。
2, signalNotEmpty()和signalNotFull()
notEmpty/notFull分别对应非空和非满锁的条件监视器,signalNotEmpty()和signalNotFull()方法分别负责唤醒对应的入队、出队线程:
1 private void signalNotEmpty() {
2 final ReentrantLock takeLock = this.takeLock;
3 takeLock.lock();
4 try {
5 notEmpty.signal();
6 } finally {
7 takeLock.unlock();
8 }
9 }
10 private void signalNotFull() {
11 final ReentrantLock putLock = this.putLock;
12 putLock.lock();
13 try {
14 notFull.signal();
15 } finally {
16 putLoct.unlock();
17 }
18 }
LinkedBlockingQueue使用双锁算法来实现,在需要唤醒重入锁的时候,重入锁与监视器可能不是对应的,以put(E)方法为例,存值方法在执行完成后,如果队列内有值存在,那么需要对notEmpty进行唤醒,但是put方法明显使用putLock进行加锁的,而notEmpty则是用来监视takeLock,所以需要封装signal方法,以便调用。
3, add、put、offer方法
LinkedBlockingQueue提供的存值方法中,并未实现add方法,该方法由父类AbstractQueue实现,父类的add方法直接调用了offer方法,并在offer返回false时抛出异常。
put方法为入队方法,如果队列已满,那么阻塞当前线程,直到notFull被唤醒:
1 public void put(E e) throws InerruptedException {
2 if(e == null) throw new NulPointerException();
3 int c = -1;
4 Node<E> node = new Node<E>(e);
5 final ReentrantLock putLock = this.putLock;
6 final AtomicInteger count = this.count;
7 putLock.lockInterruptibly();
8 try {
9 //队列已满情况下,阻塞当前线程
10 while(count.get() == capacity) {
11 notFull.await();
12 }
13 enqueue(node);
14 c = count.getAndIncrement();
15 //c+1小于容量,说明可以通知等待队列非满状态的对象监视器
16 if(c + 1 < capacity) {
17 notFull.signal();
18 }
19 } finally {
20 putLock.unlock();
21 }
22 //c == 0意味着最新的count是由0变化为1,需要通知等待队列非空状态的线程
23 if(c == 0) {
24 signalNotEmpty();
25 }
26 }
offer(E)方法,尝试入队一次,如果失败,返回false,offer和put的唯一区别在于offer会判断count是否达到容量,也就是判断队列是否已满,队满则不再执行入队操作。offer(E,long,TimeUnit)与put非常相似,会不停的尝试入队,区别在于使用了awaitNanos(long)方法而不是await()。
4, remove、take、poll方法
remove用于移除指定对象
1 public boolean remove(Object o) {
2 if(o == null) return false;
3 fullyLock();
4 try {
5 for(Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
6 if(o.equals(p.item)) {
7 unlink(p, tail);
8 return true;
9 }
10 }
11 return false;
12 } finally {
13 fullyUnlock();
14 }
15 }
16 void fullyLock() {
17 putLock.lock();
18 takeLock.lock();
19 }
20 void fullyUnlock() {
21 takeLock.unlock();
22 putLock.unlock();
23 }
fullLock和fullyUnlock方法会对putLock和takeLock统一上锁或者解锁,因为LinkedBlockingQueue是一个双向链表,remove可能会同时影响到入队和出队操作。移除指定节点的原理是从头开始遍历,通过equals来确定是否一致。
take方法会不停地尝试从队头出队
1 public E take() throws InterruptedException {
2 E x;
3 int c = -1;
4 final AtomicInteger count = this.count;
5 final ReentrantLock takeLock = this.takeLock;
6 takeLock.lockInterruptibly();
7 try {
8 //队列已空的情况下,阻塞当前线程
9 while(count.get() == 0) {
10 notEmpty.await();
11 }
12 x = dequeue();
13 c = count.getAndDecrement();
14 if(c > 1) {
15 notEmpty.signal();
16 }
17 } finally {
18 takeLock.unlock();
19 }
20 //意味着刚刚从队满状态脱离需要唤醒等待notFull状态的线程
21 if(c == capacity) {
22 signalNotFull();
23 }
24 return x;
25 }
poll()方法用于从队尾出队,与take()的区别是他只会尝试一次,失败返回null。poll(long, TimeUnit)有超时的poll,则会不停的请求出队。
1 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
2 E x = null;
3 int c = -1;
4 long nanos = unit.toNanos(timeout);
5 final AtomicInteger count = this.count;
6 final ReentrantLock takeLock = this.takeLock;
7 takeLock.lockInterruptibly();
8 try {
9 while (count.get() == 0) {
10 if(nanos <= 0) {
11 return null;
12 }
13 nanos = notEmpty.awaitNanos(nanos);
14 }
15 x = dequene();
16 c = count.getAndDecrement();
17 if(c > 1) {
18 notEmpty.signal();
19 }
20 } finally {
21 takeLock.unlock();
22 }
23 if(c == capacity) {
24 signalNotFull();
25 }
26 return x;
27 }
Poll(long, TimeUnit)的实现和take()基本一致,不同之处在于notEmpty对象监视器的await方法换成了带超时的awaitNanos方法,这将使poll(long, TimeUnit)阻塞给定时间,直到出队成功或者抛出中断异常。
与ArrayBlockingQueue相比,LinkedBlockingQueue的实现更为复杂,但深究其实现,遵循以下原则:
入队后,如果队列未满,那么唤醒下一入队线程,如果队列原本是空队列,那么唤醒出队线程。
出队后,如果队列未空,那么唤醒下一出队线程,如果队列原本是满队列,那么唤醒入队线程。