前言Java的线程池中,在核心线程数已满的情况下,任务会存储在阻塞队列中,那么什么是阻塞队列呢?
阻塞队列首先是个队列,在队列的基础上,支持另外两个附加操在队列为空时,获取元素的线程会等待队列变为非空
- 在队列满时,添加元素的线程会等待队列可用
那么阻塞队列是如何实现阻塞的?
自己实现一个阻塞队列Synchronized、wait、notifyAll实现的阻塞队列
public class BlockingQueue {
// 放置元素索引
private int inputIndex;
// 取出元素索引
private int takeIndex;
// 元素数组
private String[] elements;
// 数组中元素数量
private int count;
public BlockingQueue(int capacity) {
elements = new String[capacity];
}
public Object take() throws InterruptedException {
synchronized(this) {
// 这里使用while的原因是线程被唤醒之后需要再判断一次数组是否已经有元素
while (count == 0) {
// 数组没有元素了,等待
this.wait();
}
Object e = dequeue();
this.notify();
System.out.println("take method: " + Arrays.toString(elements));
return e;
}
}
public void put(String str) throws InterruptedException {
synchronized (this) {
// 这里使用while的原因是线程被唤醒之后需要再判断一次数组元素是否有空闲位置
while (count == elements.length) {
// 数组元素满了,等待
this.wait();
}
enqueue(str);
System.out.println("put method: " + Arrays.toString(elements));
this.notify();
}
}
/**
* 入队方法
* @param e 元素
*/
private void enqueue(String e) {
elements[inputIndex] = e;
// 如果数组已满,input返回开头
if (++inputIndex == elements.length) {
inputIndex = 0;
}
count ++;
}
/**
* 出队方法
* @return
*/
private Object dequeue() {
Object e = elements[takeIndex];
elements[takeIndex] = null;
// 如果takeIndex已到数组终点,返回开头
if (++takeIndex == elements.length) {
takeIndex = 0;
}
count --;
return e;
}
}
public static void main(String[] args) {
BlockingQueue queue = new BlockingQueue(10);
// 10个线程不断放置元素
IntStream.range(0, 10).forEach(i -> {
Thread a = new Thread(() -> {
try {
queue.put("element");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
a.start();
});
// 10个线程取出元素
IntStream.range(0, 10).forEach(i -> {
Thread b = new Thread(() -> {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
b.start();
});
}
condition、await、singal实现的阻塞队列
public class BlockingQueueWithCondition {
// 放置元素索引
private int inputIndex;
// 取出元素索引
private int takeIndex;
// 元素数组
private String[] elements;
// 数组中元素数量
private int count;
ReentrantLock lock = new ReentrantLock();
Condition notEmpty = lock.newCondition();
Condition notFull = lock.newCondition();
public BlockingQueueWithCondition(int capacity) {
elements = new String[capacity];
}
public String take() throws InterruptedException {
lock.lock();
try {
// 数组没有元素了,等待
while (count == 0) {
notEmpty.await();
}
String str = elements[takeIndex];
elements[takeIndex] = null;
// 如果takeIndex已到数组终点,返回开头
if (++takeIndex == elements.length) {
takeIndex = 0;
}
notFull.signal();
System.out.println("take method: " + Arrays.toString(elements));
count--;
return str;
} finally {
lock.unlock();
}
}
public void put(String str) throws InterruptedException {
lock.lock();
try {
// 数组元素满了,等待
while (count == elements.length) {
notFull.await();
}
elements[inputIndex] = str;
// 如果inputIndex已到数组终点,返回开头
if (++inputIndex == elements.length) {
inputIndex = 0;
}
notEmpty.signal();
System.out.println("put method: " + Arrays.toString(elements));
count++;
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
BlockingQueueWithCondition queue = new BlockingQueueWithCondition(10);
// 10个线程不断放置元素
IntStream.range(0, 10).forEach(i -> {
Thread a = new Thread(() -> {
try {
queue.put("element");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
a.start();
});
// 10个线程取出元素
IntStream.range(0, 10).forEach(i -> {
Thread b = new Thread(() -> {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
b.start();
});
}
ReentrantLock、 Condition(await与signal)与synchronized、wait、notify非常相似,那么两者有什么差别呢?
- 调用wait时,首先需要确保调用了wait方法的线程已经持有了对象的锁,调用wait后,该线程会释放掉这个对象的锁,进入等待队列(wait set)
- 当调用notify时,系统会随机唤醒该对象等待队列中任意一个线程,当这个线程被唤醒后,它就会与其它线程一同竞争对象的锁
- synchronized获取锁和释放锁都是通过JVM底层来操作,无需开发者关注
- ReentrantLock获取锁和释放锁可以由开发者操作,更加灵活,调用await方法的线程会进入对象的等待队列中,调用singal方法时可以指定唤醒某个对象等待队列中的阻塞任务
JDK中提供了非常多的阻塞队列,这里只解析LinkedBlockingQueue,如果理解了上面阻塞队列的写法,可以很快理解JDK阻塞队列的源码
一些重要的参数
// 阻塞队列中的元素会包装成一个节点,有链表必有节点
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
// 阻塞队列容量,
private final int capacity;
// 阻塞队列当前元素个数
private final AtomicInteger count = new AtomicInteger();
// 阻塞队列头节点
transient Node<E> head;
// 阻塞队列尾节点
private transient Node<E> last;
// LinkedBlockingQueue使用了两把锁,存取互不排斥
// take锁
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列中无元素时,take锁会阻塞,直到被其它线程唤醒
private final Condition notEmpty = takeLock.newCondition();
// put锁
private final ReentrantLock putLock = new ReentrantLock();
// 当队列中元素已满时,put锁会阻塞,直到被其它线程唤醒
private final Condition notFull = putLock.newCondition();
put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// put锁进行加锁
putLock.lockInterruptibly();
try {
// 如果队列元素已满,阻塞在notFull条件上
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// 注意:这里是先获取出队前队列长度,再加一
c = count.getAndIncrement();
// 如果当前队列元素加一还未达队列元素上线,则再唤醒一个线程,因为可能有很多线程阻塞在notFull
// 条件上
if (c + 1 < capacity)
notFull.signal();
} finally {
// put锁解锁
putLock.unlock();
}
// 加了一个元素后,唤醒阻塞在notEmpty条件上的线程来取元素
if (c == 0)
signalNotEmpty();
}
take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// takeLock进行加锁
takeLock.lockInterruptibly();
try {
// 如果链表元素为空,阻塞在notEmpty上
while (count.get() == 0) {
notEmpty.await();
}
// 元素出队
x = dequeue();
// 注意:这里是先获取出队前队列长度,再减一
c = count.getAndDecrement();
// 如果链表中元素> 1,唤醒指定对象的等待队列中的阻塞任务
if (c > 1)
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
// 如果出队前队列长度已满,现在减了一个元素后,唤醒阻塞在notFull条件上的线程
if (c == capacity)
signalNotFull();
return x;
}
最后
最近我整理了整套《JAVA核心知识点总结》,说实话 ,作为一名Java程序员,不论你需不需要面试都应该好好看下这份资料。拿到手总是不亏的~我的不少粉丝也因此拿到腾讯字节快手等公司的Offer
进【Java进阶之路群】,找管理员获取哦-!