首先,我们先了解一下什么是阻塞队列:
-
当队列满了时,队列会阻塞插入元素的线程,直到队列不满;
-
当队列为空时,获取元素的线程会等待队列变成非空。
常用到的方法
上面是对阻塞队列的简单了解,下面重点分析一下LinkedBlockingQueue。
源码分析
Node节点
- 可以看出是单向的链表结构
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
构造方法和参数
- 如果未设置初始容量,则默认是Integer.MAX_VALUE;
/** 队列容量 */
private final int capacity;
/** 目前元素数量 */
private final AtomicInteger count = new AtomicInteger();
/** 头节点 */
transient Node<E> head;
/** 末尾节点 */
private transient Node<E> last;
/** take, poll 方法的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 等待获取 条件队列*/
private final Condition notEmpty = takeLock.newCondition();
/** put, offer 方法的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 等待存入的 条件队列 */
private final Condition notFull = putLock.newCondition();
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//初始化的时候设置头节点和尾节点为两个空节点
}
插入
put 方法
- 如果队列已经满了,则放入到条件队列中。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);//创建新节点
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//获取put锁
try {
//判断存入的元素个数和配置的数量是否相等,如果相等。那么将当前线程放入到条件队列中
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);//将节点插入到末尾
c = count.getAndIncrement();//元素数量+1
if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列
notFull.signal();
} finally {
putLock.unlock();释放put锁
}
// 唤醒获取条件队列的头节点
if (c == 0)
signalNotEmpty();
}
//将节点设置为尾节点
private void enqueue(Node<E> node) {
last = last.next = node;
}
// 唤醒“获取条件队列”中的首节点
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//t获取ake锁
try {
notEmpty.signal();//唤醒“获取条件队列”中的首节点
} finally {
takeLock.unlock();
}
}
offer 方法
- 如果超过容量就无法插入
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)//如果超过容量直接返回false,表示不能再插入数据
return false;
int c = -1;
Node<E> node = new Node<E>(e);//新建节点
final ReentrantLock putLock = this.putLock;
putLock.lock();//获取put锁
try {
if (count.get() < capacity) {//小于容量时增加节点,并唤醒“存入条件队列”头节点到同步队列
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列
notFull.signal();
}
} finally {
putLock.unlock();
}
// 唤醒获取条件队列的头节点
if (c == 0)
signalNotEmpty();
return c >= 0;
}
take 方法
- 获取链表中的头节点。如果不存在元素,则将当前线程放入到条件队列中。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//获取锁
try {
while (count.get() == 0) {//如果为空则放入“获取条件队列”
notEmpty.await();
}
x = dequeue();//将节点加入到链表最后
c = count.getAndDecrement();//数量减1
if (c > 1)//如果元素书大于1,则调用“获取条件队列”中的元素放入同步队列
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 唤醒存入条件队列的头节点
if (c == capacity)
signalNotFull();
return x;//返回头节点
}
// 获取头节点元素
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
// 唤醒“存入条件队列”的头节点到同步队列
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
poll方法
- 如果没有数据立即返回null
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;//获取锁
takeLock.lock();
try {
if (count.get() > 0) {//当前容器数量大于0时
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 唤醒存入条件队列的头节点
if (c == capacity)
signalNotFull();
return x;
}
总结
1.如何保证当队列没有消息或者消息满了的时候,进行监听?
上面看代码的时候,两段代码刚开始是有点懵的。
1.存入的方法
// 唤醒获取条件队列的头节点
if (c == 0) signalNotEmpty();
2.获取的方法
// 唤醒存入条件队列的头节点
if (c == capacity) signalNotFull();
其实这就监听的重要环节。
逻辑是这样的。以存入为例(获取同样的道理):
1.如果当前节点为0,说明队列中没有任务;
2.唤醒“获取条件队列”的头节点,去尝试获取元素。如果获取到则执行,如果没有,则依然放入到“获取条件队列”的末尾;
3.这样就可以保证在存入数据的时候,实时监听获取节点元素了。