简介
LinkedBlockingQueue 是一个基于链表的有限队列(理论上它是无限的)。只支持一段入队,另一端出队。无阻塞对应LindedList,它们除了一个线程安全一个线程不安全以外,最大的区别是LinkedList可以放null值。
LinkedBlockingQueue 类
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
继承AbstractQueue,并实现BlockingQueue接口
重要内部类Node
static class Node<E>
Node 属性
// 元素
E item;
// 下级节点
Node<E> next;
Node 构造函数
Node(E x) { item = x; }
Node 是对元素封装
LinkedBlockingQueue 属性
// 链表最大长度
private final int capacity;
// 元素个数
private final AtomicInteger count = new AtomicInteger();
// 头结点
transient Node<E> head;
// 尾结点
private transient Node<E> last;
// 读锁
private final ReentrantLock takeLock = new ReentrantLock();
// 空监控
private final Condition notEmpty = takeLock.newCondition();
// 写锁
private final ReentrantLock putLock = new ReentrantLock();
// 满监控
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue 构造函数
// 默认构造函数
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 固定长度构造函数
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 使用线性集合初始化
public LinkedBlockingQueue(Collection<? extends E> c) {
// 初始化队列
this(Integer.MAX_VALUE);
// 获取写锁
final ReentrantLock putLock = this.putLock;
// 加锁
putLock.lock();
try {
int n = 0;
// 遍历参数
for (E e : c) {
// 元素为空抛异常(一样不能添加空元素)
if (e == null)
throw new NullPointerException();
// 满了抛异常
if (n == capacity)
throw new IllegalStateException("Queue full");
// 存入元素
enqueue(new Node<E>(e));
++n;
}
// 元素个数
count.set(n);
} finally {
// 解锁
putLock.unlock();
}
}
LinkedBlockingQueue 基础方法
长度
public int size() {
return count.get();
}
剩余长度
public int remainingCapacity() {
return capacity - count.get();
}
放开读限制
private void signalNotEmpty() {
// 获取读锁
final ReentrantLock takeLock = this.takeLock;
// 加锁(已经获取锁的线程处于等待,其他线程是可以获取锁的)
takeLock.lock();
try {
// 放开读限制
notEmpty.signal();
} finally {
// 解锁
takeLock.unlock();
}
}
放开写限制
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
// 加锁(已经获取锁的线程处于等待,其他线程是可以获取锁的)
putLock.lock();
try {
// 放开写限制
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
}
获取读锁和写锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}
释放读锁和写锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
LinkedBlockingQueue 添加
添加,满时阻塞
public void put(E e) throws InterruptedException {
// 添加空元素抛异常
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
// 获取写锁
final ReentrantLock putLock = this.putLock;
// 获取当前元素个数
final AtomicInteger count = this.count;
// 加锁
putLock.lockInterruptibly();
try {
// 元素个数是否等于最大长度
while (count.get() == capacity) {
// 满监控等待
notFull.await();
}
// 存入
enqueue(node);
// 元素个数加1(先返回后加)
c = count.getAndIncrement();
// 判断是否满了
if (c + 1 < capacity)
// 没满,放开写入限制
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
// count原值为0时,才会去放开读限制
if (c == 0)
signalNotEmpty();
}
添加,满时等待,超时退出
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 添加空元素抛异常
if (e == null) throw new NullPointerException();
// 等待时间
long nanos = unit.toNanos(timeout);
int c = -1;
// 获取写锁
final ReentrantLock putLock = this.putLock;
// 获取当前元素个数
final AtomicInteger count = this.count;
// 加锁
putLock.lockInterruptibly();
try {
// 元素个数是否等于最大长度
while (count.get() == capacity) {
// 等待时间是否小于0
if (nanos <= 0)
return false;
// awaitNanos超时,则返回值小于等于0
nanos = notFull.awaitNanos(nanos);
}
// 添加
enqueue(new Node<E>(e));
// 元素个数加1(先返回后加)
c = count.getAndIncrement();
// 没满,放开写入限制
if (c + 1 < capacity)
notFull.signal();
} finally {
// 解锁
putLock.unlock();
}
// count原值为0时,才会去放开读限制
if (c == 0)
signalNotEmpty();
return true;
}
添加,失败返回false
public boolean offer(E e) {
// 添加空元素抛异常
if (e == null) throw new NullPointerException();
// 获取当前元素个数
final AtomicInteger count = this.count;
// 已满返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
// 获取锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 未满添加
if (count.get() < capacity) {
enqueue(node);
// 元素个数加1(先返回后加)
c = count.getAndIncrement();
// 未满,放开写限制
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// 解锁
putLock.unlock();
}
// count原值为0时,才会去放开读限制
if (c == 0)
signalNotEmpty();
return c >= 0;
}
实际添加
private void enqueue(Node<E> node) {
// 链表尾部添加元素
last = last.next = node;
}
LinkedBlockingQueue 出队
出队,空则等待
public E take() throws InterruptedException {
E x;
int c = -1;
// 元素个数
final AtomicInteger count = this.count;
// 获取读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 元素个数等于0则等待
while (count.get() == 0) {
notEmpty.await();
}
// 实际出队
x = dequeue();
// 元素个数减1(先返回后减)
c = count.getAndDecrement();
// 未空,放开读限制
if (c > 1)
notEmpty.signal();
} finally {
// 解锁
takeLock.unlock();
}
// count原值为capacity时,才会去放开写限制
if (c == capacity)
signalNotFull();
return x;
}
出队,空则等待,超时退出
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
// 超时时间
long nanos = unit.toNanos(timeout);
// 元素个数
final AtomicInteger count = this.count;
// 获取读锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 元素个数等于0则等待
while (count.get() == 0) {
// 等待时间 小于0退出
if (nanos <= 0)
return null;
// awaitNanos超时,则返回值小于等于0
nanos = notEmpty.awaitNanos(nanos);
}
// 出队
x = dequeue();
// 元素个数减1(先返回后减)
c = count.getAndDecrement();
// 未空,放开读限制
if (c > 1)
notEmpty.signal();
} finally {
// 解锁
takeLock.unlock();
}
// count原值为capacity时,才会去放开写限制
if (c == capacity)
signalNotFull();
return x;
}
出队,空则返回null
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
// 实际出队
x = dequeue();
// 元素个数减1(先返回后减)
c = count.getAndDecrement();
// 未空,放开读限制
if (c > 1)
notEmpty.signal();
}
} finally {
// 解锁
takeLock.unlock();
}
// count原值为capacity时,才会去放开写限制
if (c == capacity)
signalNotFull();
return x;
}
实际出队操作
private E dequeue() {
// 头节点指向下一个节点,原头节点清空
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
LinkedBlockingQueue 删除元素
public boolean remove(Object o) {
// 空元素返回false
if (o == null) return false;
// 获取双锁
fullyLock();
try {
// 遍历所有元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// 查找第一一样的元素
if (o.equals(p.item)) {
// 删除
unlink(p, trail);
return true;
}
}
return false;
} finally {
// 释放双锁
fullyUnlock();
}
}
实际删除
void unlink(Node<E> p, Node<E> trail) {
// 删除元素
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
// 原来是满的,就放开写限制
if (count.getAndDecrement() == capacity)
notFull.signal();
}
LinkedBlockingQueue 是否包含
public boolean contains(Object o) {
// 空元素返回false
if (o == null) return false;
// 获取双锁
fullyLock();
try {
// 遍历元素
for (Node<E> p = head.next; p != null; p = p.next)
// 查找一样的元素
if (o.equals(p.item))
return true;
return false;
} finally {
// 释放双锁
fullyUnlock();
}
}
LinkedBlockingQueue 清空
public void clear() {
// 获取双锁
fullyLock();
try {
// 遍历链表
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
// 清空所有元素
h.next = h;
p.item = null;
}
head = last;
// 原来是满的,就放开写限制
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
// 释放双锁
fullyUnlock();
}
}