一、阻塞队列简介
队列常被用来解决生产——消费者问题,Java中定义了Queue
接口以及通用的一些抽象方法
public interface Queue<E> extends Collection<E> {
// 添加一个元素,添加成功返回true,如果队列满了就抛出异常
boolean add(E e);
//添加一个元素,添加成功返回true,如果队列满了返回false
boolean offer(E e);
// 删除并返回队首元素,队列为空则抛出异常
E remove();
// 移除并返回队首元素,队列为空则返回null
E poll();
// 返回队首元素,但并不移除,队列为空则抛出异常
E element();
// 返回队首元素,但并不移除,队列为空则返回null
E peek();
}
上面所列举出来的只是普通的队列的通用方法,而Java中的阻塞队列BlockingQueue
,继承了Queue
接口,同时又添加了两个具有阻塞功能的抽象方法,同时又提供了offer()
和poll
两个可阻塞的重载方法:
通过下面阻塞方法的定义可以看出,只要是会被阻塞的方法,都会抛出InterruptedException
异常
public interface BlockingQueue<E> extends Queue<E> {
// 添加元素,队列满时,插入线程会被阻塞,直到队列不满
void put(E e) throws InterruptedException;
// 移除并返回元素,队列为空时,获取元素线程会被阻塞,直到队列非空
E take() throws InterruptedException;
// 可以指定添加元素时线程被阻塞的超时时间
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 可以指定获取元素时线程被阻塞的超时时间
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
}
对BlockingQueue
的常用方法做一个归纳如下:
方法 | 抛出异常 | 返回特定值 | 阻塞 | 指定阻塞时间 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
出队 | remove() | poll() | take() | poll(time,unit) |
获取队首元素 | element() | peek() | 不支持 | 不支持 |
阻塞队列除了具有可阻塞的特性之外,还有另外一个重要的特性就是容量大小,分为有界和*。没有绝对意义的上的*,只是这个界限很大,可以放很多元素。以LinkedBlockingQueue
为例,它的容量大小为Integer.MAX_VALUE
,这是一个非常大的数字,我们通常认为它就是*的。也有一些阻塞队列是有界的,比如ArrayBlockingQueue
,如果达到最大容量之后,也不会进行扩容。所以一旦满了就无法再往里面放数据了。
BlockingQueue
同时也是线程安全的,它可以保证多线程的情况下,保证生产者和消费者的线程安全,其内部大多都是采用CAS
和ReentrantLock
来保证线程安全,业务代码无需再关注多线程安全的问题,直接向队列里面放或者取就可以了,如图所示:
同时,阻塞队列还启动了资源隔离的作用,在复杂业务中,业务A完成后,只需要将结果丢到队列中即可,不需要关心后面的步骤,业务B会从队列中获取任务来执行对应的业务,实现了业务之间的解耦,也可以提高安全性。
下面就介绍一些常用的阻塞队列和部分核心源码
二、常用阻塞队列及核心源码分析
2.1 ArrayBlockingQueue
ArrayBlockingQueue
是一个典型的有界的线程安全的阻塞队列,初始化时需要指定其容量大小,其内部元素使用数组进行存储,以put()
方法为例,使用ReentrantLock
来保证线程安全,通过条件队列的两个条件notEmpty
和notFull
来进行阻塞和唤醒
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex;
int putIndex;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 加锁保证线程安全
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 使用while而不是if,是为了防止虚假唤醒
while (count == items.length)
// 队列满了,生产者阻塞
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
// 如果添加一个元素队列满了之后,会被putIndex置为0,典型的环形数组的实现
if (++putIndex == items.length)
putIndex = 0;
count++;
// 条件队列转同步队列并唤醒线程
notEmpty.signal();
}
}
由于ArrayBlockingQueue
的put()
和take()
方法使用ReentrantLock
进行同步,同时只有一个方法可以执行,所以在高并发的情况下,性能会比较差。
思考:ArrayBlockingQueue为什么采用双指针环形数组的方式?
普通的数组,删除数组元素时需要进行移位操作,导致它的时间复杂度为O(n),而采用双指针环形数组,不需要进行移位,只需要分别移动两个指针即可。
2.2 LinkedBlockingQueue
LinkedBlockingQueue
是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE
,由于这个数值特别大,所以 LinkedBlockingQueue
也被称作*队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM
错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item; // 元素内容
Node<E> next; // 下一个元素节点 单链表结构
Node(E x) { item = x; }
}
// 初始化容量,默认Integer.MAX_VALUE
private final int capacity;
// 元素个数,因为读写操作的锁分离,这里使用线程安全的计数变量
private final AtomicInteger count = new AtomicInteger();
// 链表头,本身不存储元素信息,其item为null
transient Node<E> head;
// 链表尾元素
private transient Node<E> last;
// 获取元素的锁,锁分离,提高效率
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// 添加元素的锁
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 使用put锁,可以被中断
putLock.lockInterruptibly();
try {
// 队列满了,阻塞生产者线程
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
// 返回旧值
c = count.getAndIncrement();
// 可能有很多线程阻塞在notFull这个条件上,而取元素时才会唤醒notFull,此处不用等到取元素时才唤醒
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 队列之前为空,现在新增了一个元素后,可以直接去唤醒获取元素的线程
if (c == 0)
signalNotEmpty();
}
}
LinkedBlockingQueue与ArrayBlockingQueue对比
- ArrayBlockingQueue使用一个独占锁,读写不分离,而LinkedBlockeingQueue使用两个独占锁,读写操作锁分离,性能更好
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是*的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在*的情况下,可能会造成内存溢出等问题。
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
- 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
2.3 LinkedBlockingDeque
LinkedBlockingDeque
是对LinkedBlockingQueue
的增强,其顶层接口为Deque
,该接口定义了更加丰富的操作队列的方法,通过方法名就可以看出来,这些方法打破了队列先进先出的固有规则,提供了可以从头部或者尾部操作的API
public interface Deque<E> extends Queue<E> {
void addFirst(E e);
void addLast(E e);
boolean offerFirst(E e);
boolean offerLast(E e);
E removeFirst();
E removeLast();
E pollFirst();
E pollLast();
E getFirst();
E getLast();
E peekFirst();
E peekLast();
}
BlockingDeque
接口继承了Deque
,同时又提供了几个可阻塞的方法
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
void putFirst(E e) throws InterruptedException;
void putLast(E e) throws InterruptedException;
E takeFirst() throws InterruptedException;
E takeLast() throws InterruptedException;
}
而LinkedBlockingDeque
实现了BlockingDeque
接口,其内部通过双向链表来记录元素,通过一把ReentrantLock
来保证线程安全,该类可以看成是ArrayBlockingQueue
和LinkedBlockingQueue
的结合与增强
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
transient Node<E> first;
transient Node<E> last;
private transient int count;
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
private boolean linkFirst(Node<E> node) {
// 超过容量,直接返回
if (count >= capacity)
return false;
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
// 唤醒被阻塞的获取元素的线程
notEmpty.signal();
return true;
}
}
2.4 SynchronousQueue
SynchronousQueue
是一个没有缓冲的BlockingQueue
,生产者线程对元素的插入操作put()
必须等待消费者的移除操作take()
,其模型如下图:
如上图所示,SynchronousQueue
最大的不同在于,它的容量为0,没有地方来缓存元素,这就导致了每次添加元素都会被阻塞,直到有线程来取元素;同理,取元素也是一样,取元素的线程也会被阻塞,直到有线程添加元素。
由于SynchronousQueue
不需要持有元素,它的作用在于直接传递,所以它非常适用于传递性场景做交换工作,生产者线程和消费者线程同步传递某些信息、事务或任务
SynchronousQueue
常见的一个场景就是在Executors.newCachedThreadPool()
中,因为不确定生产者的请求数量(创建任务),而这些请求又需要被及时处理,那么使用SynchronousQueue
为每个生产者线程分配一个消费者线程就是处理效率最高的方式。线程池会根据需要(新任务到来)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60s之后会被回收。
下面结合源码来分析它的实现原理:
SynchronousQueue
内部抽象类Transferer
提供了任务传递的方法transfer()
,而该方法内部包含了线程阻塞与唤醒的逻辑,而Transferer
有两个实现类TransferQueue
和TransferStack
,可以理解为存储阻塞线程的方式有两种:队列和栈。根据这两者的特性,可以分为公平和非公平的实现,队列满足FIFO(先进先出)
的特性,所以是公平的实现;而栈满足LIFO(后进先出)
的特性,所以是非公平的实现。
下面SynchronousQueue
的构造方法,提供了公平和非公平的选项,默认为非公平实现
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
下面以TransferQueue
为例简要分析元素入队和出队的操作,SynchronousQueue
的put()
和take()
都会去调用transfer()
方法添加元素或获取元素
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
下面分析TransferQueue
的transfer()
方法,在分析该方法之前,先看一下它的内部类QNode
,TransferQueue
中使用QNode
来记录元素和被阻塞的线程,其中还利用UNSAFE
来获取元素和下一个节点的偏移量,直接通过CAS修改对应的数值,QNode
中还有很多的CAS
方法这里没有一一列举出来。
static final class TransferQueue<E> extends Transferer<E> {
static final class QNode {
// 下一个节点的指针
volatile QNode next; // next node in queue
// 元素内容
volatile Object item; // CAS'ed to or from null
// 被阻塞的线程
volatile Thread waiter; // to control park/unpark
// 用于区分节点类型,false表示为取元素,true为添加元素
final boolean isData;
// 通过CAS修改下一个节点(多线程安全)
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// QNode属性的偏移量
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
// 根据Unsafe类计算属性在QNode类中的偏移量
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
// 头节点
transient volatile QNode head;
// 尾节点
transient volatile QNode tail;
// 初始化时就创建一个元素为null,数据类型为false的节点,头尾节点都指向该该节点
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
// 计算头尾节点的偏移量,通过CAS直接修改(保证线程安全)
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long cleanMeOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面介绍了TransferQueue
大致的内部构造,下面重点看transfer()
方法实现,
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 自旋等待初始化完成
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 为空或者当前节点
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 防止其他线程修改,这里再次判断
if (t != tail) // inconsistent read
continue;
// 如果当前尾节点后面还有节点,则通过CAS把后面的节点修改为尾节点
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
// 如果需要超时阻塞,但超时时间小于0(不能阻塞),直接返回null
// put或take方法中中断该线程并抛出中断异常
if (timed && nanos <= 0) // can't wait
return null;
// 创建一个节点,通过CAS添加到尾节点后面,这个节点可以是取元素的节点,也可以是添加元素的节点
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
// 新的节点添加完成之后,通过CAS将其修改为尾节点
advanceTail(t, s); // swing tail and wait
// 自旋阻塞线程 下面重点介绍
Object x = awaitFulfill(s, e, timed, nanos);
// 如果返回的节点为当前节点,表示该节点被取消了,直接清除掉
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
// 队列不为空,且新的节点类型与队列里面的节点类型不一致(说明可以唤醒线程了)
QNode m = h.next; // node to fulfill
// 为了保证线程安全,再次判断自旋
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
// 如果m节点已经被别的线程处理了,这里就修改头节点自旋
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
// 当前线程去修改头节点,阻塞线程节点出队
advanceHead(h, m); // successfully fulfilled
// 唤起m节点被阻塞的线程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
在transfer()
中有一个重要的方法awaitFulfill
,它会去进行自旋阻塞
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 根据处理器的核数计算自旋次数
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果当前线程被中断了,就修改S节点的item属性为当前节点
//然后在判断节点是否被取消时就直接判断其item值是否为当前节点即可
if (w.isInterrupted())
s.tryCancel(e);
// 当节点取消就返回
Object x = s.item;
if (x != e)
return x;
// 过了超时时间就取消
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋,达到一定次数之后,填充S节点的waiter属性为当前线程,然后就阻塞
// 至此一个节点的内容就完整了
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
2.5 PriorityBlockingQueue
PriorityBlockingQueue
是一个*的基于数组的优先级阻塞队列,虽然它是*的,但在初始化的时候,它是可以指定数组初始化容量的,它的*是基于它可以进行动态扩容而言的。
如果没有指定初始化容量,它默认的容量为11,最大容量为Integer.MAX_VALUE - 8
private static final int DEFAULT_INITIAL_CAPACITY = 11;
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
同时,PriorityBlockingQueue
是一个优先级队列,它每次出队都会返回优先级最高或最低的元素,它的构造方法中提供了自定义Comparator
比较器,默认情况下使用自然顺序升序排序。
通过下面的构造方法也可以看出,该队列线程安全是由ReentrantLock
来保证的,同时需要注意的是PriorityBlockingQueue不能保证同等优先级元素的顺序
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
那么PriorityBlockingQueue
如果只是简单的使用数组操作来对插入元素移除进行排序,其性能将是非常低的,而它采用的是最大最小堆的方式来插入或移除数据,大小堆只是逻辑上的一种操作方式而已,其储存结构依然是数组
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:大顶堆和小顶堆。
最大最小堆满足以下特性:
- 最大堆:根结点的键值是所有堆结点键值中最大者
- 最小堆:根结点的键值是所有堆结点键值中最小者
下图展示了最小二叉堆的情况:
最大最小堆按照从上到下,从左到右来一次表示索引位置,上图中右下角的数字表示该元素在数组中的索引下标
在最大最小二叉堆中,插入或移除元素时,都可能涉及到元素位置调整,而在二叉堆中,利用元素的下标索引,可以很简单的计算其父节点以及左右节点的下标(以索引下标为t的元素为例):
父节点:P(t) = (t-1) >>> 1 <=> (t-1)/2
左节点:L(t) = t <<< 1 +1 <=> t*2 +1
右节点:R(t) = t <<< 1 + 2 <=> t*2 +2
下面结合源码分析它是如何添加和移除元素的
由于PriorityBlockingQueue
是*队列,所以添加元素时线程不需要阻塞,容量不够进行扩容就可以了
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
// 加锁保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 如果已经达到当前容量就进行扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 需要注意如果没有执行比较器,元素类必须实现Comparable接口
siftUpComparable(n, e, array);
else
// 指定了比较器,就是用自定义的来做比较
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 添加元素后,直接唤醒被阻塞的获取元素的线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
扩容的代码如下,tryGrow()
方法在offer()
方法的while
循环体内部,就实现了CAS+自旋的方式来实现线程安全的扩容
private void tryGrow(Object[] array, int oldCap) {
// 释放锁,下面通过CAS来进行扩容
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 如果原来的容量小于64,则容量就扩大一倍再+2,否则容量直接扩大为原来的三倍
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 容量不能超过最大值
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
// 其他线程在扩容时,当前线程就让出CPU
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
核心的方法在于siftUpComparable()
和siftUpUsingComparator()
这两个方法,这两个方法才是二叉堆入队的核心方法,以siftUpUsingComparator()
为例来分析
这里面是一个while
循环,进行元素的上浮操作,每次都是获取当前节点的父节点,然后与插入的元素进行比对,如果比较的结果满足最大最小堆的结构,就直接退出循环,否则就换位,继续进行比较,知道满足条件
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
获取元素的代码如下,因为获取元素的线程会被阻塞,所以这个方法会抛出中断异常
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
// 如果没有元素就进行阻塞
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
// 最后一个元素的索引下标
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
// 取出最后一个元素
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1;
while (k < half) {
// 第一次进来时,取得是第二层的两个节点进行比较
int child = (k << 1) + 1;
Object c = array[child];
int right = child + 1;
// 如果左节点与右节点比较,满足比较条件,就把右节点的值作为与最后一个节点比较的值
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}
2.6 DelayQueue
DelayQueue
是一个支持延时获取元素的阻塞队列,内部采用PriorityQueue
存储元素,同时元素必须实现Delayed
接口,接口的getDelay()
方法可以返回延时时间延时的时间,方法参数为时间工具类TimeUnit
在获取元素是,只有延迟时间到了才能从队列中提取元素。
延迟队列的特点:并不是先进先出,而是按照延迟时间的长短进行排序,下一个被执行的任务排在队列的最前面。
由于队列元素必须实现Delayed
接口,而该接口又继承自Comparable
接口,所以,元素类还要去实现compareTo()
方法,这样在创建队列时就不需要在额外创建Comparator
对象了,元素本身就具有了排序的能力。
下面定义了一个元素类
class DelayObject implements Delayed {
private String name;
private long time; //延时时间
public DelayObject(String name, long delayTime) {
this.name = name;
this.time = System.currentTimeMillis() + delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = time - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed obj) {
if (this.time < ((DelayObject) obj).time) {
return -1;
}
if (this.time > ((DelayObject) obj).time) {
return 1;
}
return 0;
}
}
使用Demo:
//实例化一个DelayQueue
BlockingQueue<DelayObject> blockingQueue = new DelayQueue<>();
//向DelayQueue添加2个元素对象,注意延时时间不同
blockingQueue.put(new DelayObject("lizhi", 1000 * 10)); //延时10秒
blockingQueue.put(new DelayObject("linan", 1000 * 30)); //延时30秒
// 取出lizhi
DelayObject lizhi = blockingQueue.take();
// 取出linan
DelayObject linan = blockingQueue.take();
下面看一下DelayQueue
的构造,使用ReentrantLock
来保证线程安全,取元素需要进行阻塞,底层使用PriorityQeue
进行存储,这是一个优先级队列,与上面PriorityBlockingQueue
是一样的,只是没有阻塞功能
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
下面看一下具体的put()
和take()
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
// 加锁保证线程安全
lock.lock();
try {
// 调用PriorityQueue添加元素
// 与PriorityBlockingQueue的逻辑基本一致
q.offer(e);
// 如果当前队列只有这一个元素,就去唤醒阻塞的线程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
下面是take()
方法,要比put()
方法复杂一些
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁保证线程安全,由于线程可能会被阻塞,所以这里可中断
lock.lockInterruptibly();
try {
for (;;) {
// 取出队列第一个元素,如果没有,直接让当前线程阻塞
E first = q.peek();
if (first == null)
available.await();
else {
// 如果延迟时间已到,直接取出该元素
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
// 如果有线程已经在阻塞了,就让当前线程直接去阻塞
if (leader != null)
available.await();
else {
// 没有线程阻塞,则记录当前线程,然后让当前线程阻塞,阻塞的时间等于最近元素的延迟时间
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// 当前线程被唤醒后,重置leader,然后自旋
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 出队成功后,如果leader为空,并且当前对了还有元素,就去唤醒下一个被阻塞的线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
延迟队列的应用场景**:**
- 订单超时关闭:下单后在规定时间内没有付款就取消订单
- 异步短信通知:外卖下单成功60S之后给用户发送短信
- 关闭空闲连接:连接池中,有一些非核心的连接在空闲一段时间后就关闭
三、选择合适的阻塞队列
我们接触的比较多的就是线程中使用阻塞队列,线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
- FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
- CachedThreadPool 选取的是 SynchronousQueue
- ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列
注:ScheduledThreadPool
中使用的阻塞队列并不是DelayQueue
,而是自定义实现的DelayedWorkQueue
一般从以下几个维度来选择合适的阻塞队列
-
功能
比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。
-
容量
是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。
-
能够扩容
因为有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。
-
内存结构
我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
-
性能
从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。