PriorityBlockingQueue 分析
PriorityBlockingQueue是PriorityQueue的线程安全版本,基本的功能和PriorityQueue是一样的,强烈建议看看 PriorityQueue分析 之后在看这里的源码是怎么写的。这样便于理解。
看看它是不是PriorityQueue的外套(在里面包含一个PriorityQueue,对PriorityBlockingQueue的操作都是基于PriorityQueue的操作,只不过在操作的时候添加锁了。)下面就来验证验证。
1. 属性分析
这里的属性和PriorityQueue
的差不多,多了一个锁,并且多了一个和锁对应的Condition,还有一个allocationSpinLock
和PriorityQueue
,有的属性看名字就很明了,有的属性并不是很清楚。在下面会有对他们的解释。
private static final long serialVersionUID = 5595510919245408276L;
/**
* 默认大小
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* The maximum size of array to allocate.
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 小头堆实现的数组,这个数组和PriorityQueue的功能操作基本都是一样的
*/
private transient Object[] queue;
/**
* 大小
*/
private transient int size;
/**
* 指定的comparator
*/
private transient Comparator<? super E> comparator;
/**
* 所有public 操作的共用锁
*/
private final ReentrantLock lock;
/**
* 当空的时候阻塞的Condition
*/
private final Condition notEmpty;
/**
* 用于分配的自旋锁,需要配合cas操作
*/
private transient volatile int allocationSpinLock;
/**
* 一个普普通通的PriorityQueue,在序列化的时候用。
*/
private PriorityQueue<E> q;
2. 构造方法分析
构造方法可以看出,可以传入初始容量和一个比较器。并且在构造方法里面会创建lock和Condition,初始化数组。
问题?
这里的锁只有一把。为啥就一把,不能像LinkBlockingQueue一样,有两个。
这里不能用两个,因为put操作,会引起
小头堆
里面元素的变动,这个变动可能设计到头结点,所以,不能分开,在LinkBlockingQueue没有变动,只是单纯的一个取,一个放。
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
3. 几个重要的方法
put
注意,这里的put不会阻塞,因为底层的数组最大是Integer.MAX,当元素不够的话,就直接扩容,还是不够就直接报错(OutOfMemoryError)。
这里的扩容也是在put之前操作的,不是在之后操作。
可以看到,这里大体的代码逻辑和PriorityQueue
差不多,只是多了一个获取锁和释放锁,唤醒等待的过程。
忘了在 PriorityQueue分析中说了,这是一个*的堆,堆中最多存放元素的个数为(Integer.MAX)。这里也是一样的,在扩容章节会有分析,如果不清楚PriorityQueue
,建议去看看,回头再来看这个就很清晰明了了。
offer操作里面,在非空检查之后,就开始获取锁了。这个需要注意。
在利用Comparator
比较的时候,组装堆的操作基本都是一致的,就是比较的部分是不相等的,其余都差不多,这里用siftUpComparable
分析
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
// 为空检查
if (e == null)
throw new NullPointerException();
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
int n; // 元素数量,
int cap; // 数组长度
Object[] array; // 数组引用。
// 当元素的数量大于等于 数组容量的时候,说明需要扩容了。注意,这里没有负载因子。
while ((n = size) >= (cap = (array = queue).length))
// 放在扩容章节说
tryGrow(array, cap);
try {
// 如果与比较器就用,没有就用默认的顺序
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 唤醒等待的poll线程。
notEmpty.signal();
} finally {
// 释放锁。
lock.unlock();
}
return true;
}
siftUpComparable
不出所料,这里和PriorityQueue
里面的操作一模一样。这里就不用看了
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
poll
带等待时间的方法,会导致等待,本质就是调用Condition的awaitNanos方法
还是一样的操作,上来先上锁,在做操作。最后释放锁
public E poll() {
// 先上锁,注意这里的锁只有一把。
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
dequeue
可以看到这里的操作和PriorityQueue
基本差不多,没有啥子可说的。下面要说一个take方法
private E dequeue() {
// 如果size-1小于0,说明堆里面没有元素了,直接返回一个null,
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
// 这个就和PriorityQueue的操作都是一样。没啥可说的。建议去看看PriorityQueue
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
take方法
take方法是BlockingQueue
里面的接口,这个接口会同步等待, 当出队的时候没有元素的时候 take线程就等待。这个方法没有什么可说的。别的大体都是一样的
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
//等待
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
4. 扩容操作
private void tryGrow(Object[] array, int oldCap) {
// 扩容的操作是在put上锁之后的,一上来就释放锁,这是怎么想的。
// 现在释放了锁,就说明这个时刻可能会有线程来操作。那就没有同步机制?
lock.unlock();
Object[] newArray = null;
// 这里居然是利用cas来做同步操作的。
// 只有一个线程能成功,并且去扩容。
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// allocationSpinLock 变为0,为了下一次的扩容操作,但要是这个瞬间有线程获取到锁了,还会new出一个array出来,重新计算一下。
allocationSpinLock = 0;
}
}
// 如果说newArray为null,就说明在上面操作完之后,有线程正在计算,正在操作。,当前线程就等等。
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
// 重新获取锁
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
//拷贝就好了
}
这里扩容还是比较有意思的。
上来先释放锁,释放锁之后,通过cas操作将allocationSpinLockOffset变为1,作为计算新数组的同步机制。在计算完之后,将allocationSpinLockOffset重新变为0,如果newArray为null说明这个时刻有线程在计算newArray(因为上一个线程已经将allocationSpinLockOffset变为0,所以,就会有线程获取到锁。)。重新获取锁,做数组拷贝,这个线程就一直入队,一直到结束的时候释放锁。
问题?
一上来释放锁之后,那么之前堵塞的在lock的线程中一个线程就会运行,判断容量太小,都会来扩容。都会走到扩容的方法里面。那么扩容的里面就没有同步机制吗?
allocationSpinLockOffset 就是他的同步机制,使用cas将他变为1,如果一个线程成功了之后,别的线程就不会进来了,同一时刻只有一个线程在操作。
allocationSpinLockOffset 设置为1失败的线程会怎么做
代码下面会有判断,如果newArray为null就会让出cpu的使用权。
如果计算完之后,a线程扩容完成之后,b线程获取到了锁,b线程会再次扩容吗?
不会,因为后面有判断queue是否等于传递进来的array,当a线程操作完之后,就会一直操作到put操作结束,这时候b线程来操作,发现queue已经不是之前传递进来的array了。所以,就不会进去了。
画个图说明一下,这里挺有意思的。
需要注意,释放锁之后只有一个线程能通过,那对于扩容来说,只有可能有两个线程来操作,想通这个,下面的就很好理解了。
关于PriorityBlockingQueue的分析就分析到这里了。 如有不正确的地方,欢迎指出。谢谢。