一、前言
??这几天准备研究一下Java
中阻塞队列的实现。Java
中的阻塞队列有七种,我准备逐一研究它们的源码,然后每一个阻塞队列写一篇分析博客,这是其中的第二篇。这篇博客就来说一说阻塞队列中比较常用的一种——LinkedBlockingQueue
。
??之前写了一篇分析ArrayBlockingQueue
源码的博客,感兴趣的可以看一看:阻塞队列——ArrayBlockingQueue源码分析。
二、正文
2.1 什么是阻塞队列
??在正式分析前,先简单介绍一下什么是阻塞队列。在说阻塞队列前,先要了解生产者消费者模式:
生产者消费者模式:生产者生产产品,将生产好的产品放入一个缓冲区域,消费者消费产品,它从缓冲区域获取生产者生产的产品进行消费。缓冲区域有容量限制,若缓存区域已经满了,则生产者需要停止生产,等待缓冲区有空闲位置后,再恢复生产;若缓冲区为空,则消费者需要等待,直到缓冲区中有产品后,才能进行消费;
??阻塞队列就是基于这种模式实现的队列型容器。阻塞队列的一般实现是:我们创建队列时,指定队列的容量,当队列中元素的个数已经满时,向队列中添加元素的线程将被阻塞,直到队列不满才恢复运行,将元素添加进去;当队列为空时,向队列获取元素的线程将被阻塞,直到队列不空才恢复运行,从队列中拿出元素。
??以上是阻塞队列的一般实现,根据具体情况的不同,也会有所差异,比如有的是基于链表实现,有的是基于数组实现;有的是阻塞队列的没有容量限制(*),而有的是有限制的(有界)。我们现在要分析的LinkedBlockingQueue就是一个基于链表实现的有界阻塞队列。下面我们就来从源码的角度分析一下LinkedBlockingQueue
。
2.2 LinkedBlockingQueue类的成员变量
??我们先来看看LinkedBlockingQueue
类的成员变量,了解它的成员变量对于我们理解它的实现原理会用很大的帮助:
/** 记录阻塞队列允许的最大容量 */
private final int capacity;
/** 使用int的原子类记录队列中元素的个数 */
private final AtomicInteger count = new AtomicInteger();
/** LinkedBlockingQueue基于链表实现,head记录链表头节点 */
transient Node<E> head;
/** LinkedBlockingQueue基于链表实现,last记录链尾头节点 */
private transient Node<E> last;
/** ReentrantLock锁对象,用来保证获取元素时的线程同步 */
private final ReentrantLock takeLock = new ReentrantLock();
/** takeLock上创建的条件对象,在队列为空时,通过此对象来阻塞消费者线程 */
private final Condition notEmpty = takeLock.newCondition();
/** ReentrantLock锁对象,用来保证添加元素时的线程同步 */
private final ReentrantLock putLock = new ReentrantLock();
/** putLock上创建的条件对象,在队列满时,通过此对象来阻塞生产者线程 */
private final Condition notFull = putLock.newCondition();
??通过以上成员变量,我们可以知道很多信息:LinkedBlockingQueue是基于链表实现的,且有capacity变量证明它是一个有界阻塞队列;成员变量中有两个lock对象,分别用来同步生产者线程和消费者线程,减小了锁的粒度,生产者和消费者可以同时运行,这两个锁对象使用默认构造函数创建,也就是说创建的是非公平锁。既然LinkedBlockingQueue
是由链表实现,那我们就来看看链表的节点实现:
static class Node<E> {
// 节点值
E item;
// 下一个节点的引用
Node<E> next;
Node(E x) { item = x; }
}
??可以看到,Node
类的实现非常简单,一个存储值的变量,一个指向下一个节点的引用,仅此而已。接下来我们看看构造方法:
2.3 LinkedBlockingQueue的构造方法
/** 带参构造方法,参数为阻塞队列的容量 */
public LinkedBlockingQueue(int capacity) {
// 容量必须大于0
if (capacity <= 0) throw new IllegalArgumentException();
// 记录容量
this.capacity = capacity;
// 初始化链表,创建一个无值的Node,头尾指针均指向它
last = head = new Node<E>(null);
}
// 默认构造方法
public LinkedBlockingQueue() {
// 调用带参构造方法,默认容量为int的最大值
this(Integer.MAX_VALUE);
}
??构造方法的逻辑也比较简单,唯一值得注意的是:若使用默认构造方法创建,则阻塞队列的默认容量为int的最大值。
2.4 入队方法的实现
??对于一个阻塞队列来说,最核心的就是它入队和出队的实现,下面我们就来分析一下LinkedBlockingQueue
类中入队方法的实现。元素入队的方法有多个,我们先来分析其中最核心的方法——put
方法:
public void put(E e) throws InterruptedException {
// 新元素不能为空
if (e == null)
throw new NullPointerException();
// 初始化一个c变量,后面用于记录插入新元素前,队列中元素的个数
int c = -1;
// 将新元素封装成一个Node
Node<E> node = new Node<E>(e);
// 因为是添加元素,所以这里使用put锁进行线程同步,先获取put锁
final ReentrantLock putLock = this.putLock;
// 获取记录元素个数的变量
final AtomicInteger count = this.count;
// 在正式操作前,先使用putLock加锁,调用的是lockInterruptibly方法
// 这个方法在在线程被阻塞时可以响应中断,使用它是防止线程一直无法添加成功,
// 长期被阻塞在此处
putLock.lockInterruptibly();
try {
// 添加元素前线判断队列中元素是否已经满了,
// 若满了则使用notFull对象,让当前线程阻塞,直到被另一个线程唤醒
// 使用while而不是if,目的是防止线程被唤醒时,
// 队列仍然是满的,所以需要重复判断
while (count.get() == capacity) {
notFull.await();
}
// 调用enqueue方法将新节点加入队列的尾部
enqueue(node);
// getAndIncrement方法返回count的旧值,然后让count+1
c = count.getAndIncrement();
/**************关键点1****************/
// c + 1就是插入当前这个节点后,队列中元素的数量
// 若插入这个元素后,队列依旧没有满,则唤醒一个生产者线程
// 也就是向队列中添加元素的线程(前提是有这么一个线程)。
// 为什么是在添加一个元素后,唤醒另外一个生产者线程,
// 而不是在有线程获取元素后,唤醒一个生产者线程呢,这不是才正常吗?
// 答案就是消费者线程使用的是take锁,而唤醒生产者线程需要的是put锁,
// 为了减少额外加锁解锁的次数,我们就可以在这里唤醒生产者线程,
// 因为这里在添加元素前,已经获取了put锁了,不需要重复获取。
if (c + 1 < capacity)
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
/**************关键点2****************/
// c表示插入之前,队列中元素的个数,若插入之前c == 0,
// 表示在插入前,队列是空,意味着很有可能存在正在等待的消费者线程
// 于是调用signalNotEmpty方法唤醒一个消费者线程。
// 可以看到,只有当添加元素之前,队列为空,这里才会唤醒一个消费者线程。
// 但是可能有多个消费者线程在等待,此时唤醒一个,那剩下的那些怎么办?
// 答案就是可以在获取元素的方法中唤醒它们,原理就是上面那段很长的注释
if (c == 0)
signalNotEmpty();
}
/** 此方法将节点加入到链表的末尾 */
private void enqueue(Node<E> node) {
/*
* 以下代码可以分解为:
* last.next = node;
* last = node
*/
last = last.next = node;
}
/** 此方法用于唤醒一个消费者线程 */
private void signalNotEmpty() {
// 因为消费者线程是获取元素,所以使用的是take锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// notEmpty由take锁创建,所以需要先锁定take锁,再调用signal方法
// 否则将抛出异常
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
??以上就是put
方法的实现,逻辑还是比较简单的,它的过程概括来说就是:将需要添加的元素封装成为一个Node,然后获取put锁,在添加前先判断当前队列是否已经满了,若满了,则会被阻塞等待,直到被其他线程唤醒;队列未满时,将元素添加到链表的末尾,若添加完后,队列依旧没有满,则再唤醒一个生产者线程。添加元素完成后,若判断添加前,队列为空,则很有可能有消费者线程在等待,于是唤醒一个消费者线程。put
方法中,我用注释标记出了两个关键点,这两个关键点是作者对锁的一个优化,take
方法中也有这两个关键点,它们相互对应,请结合take
方法理解。下面我们再来看看另一个添加元素的方法offer
:
/** 向队列中添加元素,但是不会阻塞 */
public boolean offer(E e) {
// 判空
if (e == null)
throw new NullPointerException();
// 获取记录元素数量的count变量
final AtomicInteger count = this.count;
// 如果当前队列已经满了,则直接返回false,不进行添加
if (count.get() == capacity)
return false;
// 此变量的作用于put方法中一致
int c = -1;
// 封装成Node
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 因为是添加元素,所以使用put锁进行锁定,这里调用的lock()方法
// lock方法不响应中断,这里不需要响应中断,所以选择使用lock,
// 不需要响应中断是因为这个方法并不会阻塞线程
putLock.lock();
try {
// 再次判断当前队列是否满了,为什么再次判断?
// 因为在上一次判断后,CPU可能暂停了当前线程,转而执行其他线程
// 在这个过程中可能有其他线程向队列中添加了元素
if (count.get() < capacity) {
// 调用enqueue方法将元素添加到队列的尾部
enqueue(node);
// 此处操作与put方法中相同,不重复描述
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// 解锁
putLock.unlock();
}
// 此处操作与put方法相同
if (c == 0)
signalNotEmpty();
// c记录的是添加前,队列中元素的个数,不可能出现负数,所以此处返回的一定是true
return c >= 0;
}
/**
* 此方法向队列中添加元素,若队列已经满了,则线程被阻塞,但是参数中限制了阻塞时间
* 若超时后,当前线程还没有添加成功,则不继续等待,直接返回
* 参数:
* 1、timeout:超时时间的数量级,一个long类型的整数
* 2、unit:timeout的单位,例如TimeUnit.SECOND表示的就是秒
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null)
throw new NullPointerException();
// 将超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
// 以下几句与put方法相同
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 若当前队列已经满了,则线程需要进入等待
while (count.get() == capacity) {
// 判断等待的剩余时间是否<=0,若满足此条件,表示等待时间已经超时
if (nanos <= 0)
return false;
// 使用notFull对象让当前线程阻塞,传入需要阻塞的时间,但是这个方法并不精确
// 所以会返回剩余需要阻塞的时间,这也就是为什么上一句需要判断nanos <= 0
nanos = notFull.awaitNanos(nanos);
}
// 以下操作和put相同
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
??以上就是offer
方法的实现,可以看到,offer
方法被重载了两次,第一个是直接向队列中添加元素,不会被阻塞,添加成功返回true
,失败则返回false
;而另外一个offer
方法,在无法添加时会被阻塞,但是限定了阻塞的超时时间,若超时还未添加成功,则不会继续等待。
2.5 出队方法的实现
??看完了入队的方法实现,下面再来看看出队的方法实现。出队的方法主要有两个,poll
方法,以及阻塞队列的核心方法之一——take
方法。下面我们就先来看看take
方法:
public E take() throws InterruptedException {
E x;
// 此变量的作用与put方法中一致,用来记录插入前,队列中元素的个数
int c = -1;
final AtomicInteger count = this.count;
// 由于是向队列中获取元素,所以使用的是take锁
final ReentrantLock takeLock = this.takeLock;
// 实际操作前先锁定,调用lockInterruptibly锁定,且这个方法响应中断
// 此处需要响应中断,因为这个方法会阻塞线程
takeLock.lockInterruptibly();
try {
// 若队列为空,则当前线程需要等待,直到被其他线程唤醒
while (count.get() == 0) {
notEmpty.await();
}
// 调用dequeue获取队列的队头元素
x = dequeue();
// 获得count的值,然后count + 1
c = count.getAndDecrement();
/*************关键点1***************/
// 若队列中原来的元素数量>1,则表示当前线程拿走一个元素后,队列中还有元素
// 于是此处唤醒其他消费者线程,让他们获取元素
// 此处很关键,对应着put方法中的关键点2(注意是put,而不是此方法take),put方法中,
// 只有添加元素前,队列为空,才会唤醒一个消费者线程,而剩余的消费者线程在此处唤醒
// 因为这里已经拿到了take锁,不需要为了唤醒消费者线程再次获取take锁
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
/*******************关键点2*******************/
// 此处判断,若在获取元素之前,队列是满的,那说明很有可能有生产者线程在等待
// 因为这里拿走了一个元素,所以队列有空位了,于是就唤醒一个生产者线程,添加元素
// 值得注意的是,等待的生产者线程可能不止一个,这里只唤醒了一个,剩下的怎么办,
// 答案就在put方法的关键点2那里,由put方法唤醒了剩下的生产者线程
if (c == capacity)
signalNotFull();
return x;
}
/** 此方法获取队列头节点的值,并将头节点从队列删除 */
private E dequeue() {
// 获取头节点
Node<E> h = head;
// 获取头节点的下一个节点
Node<E> first = h.next;
// 让头节点指向自己,也就是让头节点从链表上断开
h.next = h; // help GC
// 设置新的头节点,也就是原来头节点的下一个节点
head = first;
// 获取原来头节点的值
E x = first.item;
// 将原来头节点的值置为空,有助于垃圾回收
first.item = null;
// 返回结果
return x;
}
??以上方法的逻辑也是比较简单的,相信有了注释理解起来不会太难。上面的take
方法中,需要中点关注的就是我用注释标记出的关键点1和关键点2,它们分别对应于put
方法中的关键点2和关键点1,这样编写代码的意图就是为了尽量少获取锁,减少频繁获取和释放锁导致的资源消耗,提高性能。除了take
方法,还有另外一个元素出队的方法poll
,他被重载了两次,下面来看一看:
public E poll() {
// 获取元素数量数量
final AtomicInteger count = this.count;
// 若队列为空,则直接返回null,表示获取失败
if (count.get() == 0)
return null;
// 以下几句代码于take方法相同
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
// 调用lock()方法加锁,不响应中断,因为当前方法不会阻塞,所以可以不用响应中断
takeLock.lock();
try {
// 再次判断队列是否为空,因为在上一次判断之后,CPU可能暂停了这个线程,
// 转而执行其他线程,这个过程中可能有线程向队列中添加了元素
if (count.get() > 0) {
// 以下代码均与take方法中相同,不重复解释
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
/**
* 此方法向队列中获取元素,若队列为空线程将会被阻塞,但是需要指定超时时间,
* 超时后,线程还未获取元素,直接返回;
* 参数timeout和unit的含义与会超时的offer方法相同,
* 分别表示超时时间的数量级,已经时间的单位
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
// 将超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
// 以下方法与take方法相同,不重复解释
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 若队列为空,则线程需要等待
while (count.get() == 0) {
// 判断等待的剩余时间,若剩余时间<=0,表示已经超时,直接返回null
if (nanos <= 0)
return null;
// 让当前线程在notEmpty中的等待nanos纳秒,因为awaitNanos方法不精确,
// 所以这个方法会返回一个值,表示剩余需要等待的时间,所以才有了上一句的if
nanos = notEmpty.awaitNanos(nanos);
}
// 以下代码与take相同,不重复解释
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
??好了,关于LinkedBlockingQueue
中的方法就说到这里。我们仔细观察这些方法会发现,它们的实现步骤大同小异,我写的很多注释都是重复的,因为每一个方法中都有类似的操作。
2.6 LinkedBlockingQueue的优势与劣势
??最后,我们来分析以下LinkedBlockingQueue
的优势。这里主要介绍它相对于ArrayBlockingQueue
的优势和劣势:
-
LinkedBlockingQueue
内部使用了两把锁进行线程同步,一把锁同步消费者线程,一把锁同步生产者线程,这也就意味着在添加元素时,不会影响获取元素,反之亦然。由于消费者操作的是队头,而生产者操作的是队尾,所以也不会发生线程安全问题,这样大大提高了队列的吞吐量。但是ArrayBlockingQueue
内部只使用一把锁,生产者执行时,消费者也无法执行。 -
LinkedBlockingQueue
基于链表实现,所以如果我们要对队列进行随机删除操作,将会非常高效;但是ArrayBlockingQueue
基于数组实现,随机删除操作的消耗会很高,以为需要重整元素在数组中的位置。当然,队列是一个尾进头出的容器,所以在使用时还是不要进行随机删除操作。
??再说说它的劣势:
-
LinkedBlockingQueue
中的lock
对象使用的是非公平锁,无法根据需求切换为公平锁;而ArrayBlockingQueue
可以根据实际情况,选择是否使用公平锁;
2.7 LinkedBlockingQueue的误区
??这里再提一个很多人对LinkedBlockingQueue
的误区。有许多人认为LinkedBlockingQueue
既可以是一个有界队列,也可以是一个*队列,这是一种错误的理解,LinkedBlockingQueue就是一个有界的阻塞队列。有人会这样认为的原因是创建LinkedBlockingQueue
对象时可以不指定容量,但是不要忘记,如果我们不指定容量,底层实现也会指定一个默认的容量,即int
的最大值。在我们向其中添加元素时,底层实现可以确保元素数量不会超过设置好的容量,这就是有界的。
??而什么是*?举一个最简单的例子,Java
中的另一个阻塞队列PriorityBlockingQueue
就是*的,它底层用数组存储元素,我们也可以指定容量,但是当元素个数到达容量时,PriorityBlockingQueue
并不会阻塞,而是进行扩容,这就是*。
三、总结
??关于LinkedBlockingQueue
就先说这么多吧,重点是要理解各种阻塞队列的结构,以及最关键的take
和put
方法实现的原理,只要理解了这些,就能有选择性地,更好地使用这些阻塞队列。