PriorityBlockingQueue 分析

PriorityBlockingQueue 分析

PriorityBlockingQueue是PriorityQueue的线程安全版本,基本的功能和PriorityQueue是一样的,强烈建议看看 PriorityQueue分析 之后在看这里的源码是怎么写的。这样便于理解。

看看它是不是PriorityQueue的外套(在里面包含一个PriorityQueue,对PriorityBlockingQueue的操作都是基于PriorityQueue的操作,只不过在操作的时候添加锁了。)下面就来验证验证。

1. 属性分析

这里的属性和PriorityQueue的差不多,多了一个锁,并且多了一个和锁对应的Condition,还有一个allocationSpinLockPriorityQueue,有的属性看名字就很明了,有的属性并不是很清楚。在下面会有对他们的解释。

private static final long serialVersionUID = 5595510919245408276L;

/**
 * 默认大小
 */
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
 * The maximum size of array to allocate.
 */
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
 * 小头堆实现的数组,这个数组和PriorityQueue的功能操作基本都是一样的
 */
private transient Object[] queue;

/**
 * 大小
 */
private transient int size;

/**
 * 指定的comparator
 */
private transient Comparator<? super E> comparator;

/**
 * 所有public 操作的共用锁
 */
private final ReentrantLock lock;

/**
 * 当空的时候阻塞的Condition
 */
private final Condition notEmpty;

/**
 * 用于分配的自旋锁,需要配合cas操作
 */
private transient volatile int allocationSpinLock;

/**
 * 一个普普通通的PriorityQueue,在序列化的时候用。
 */
private PriorityQueue<E> q;

2. 构造方法分析

构造方法可以看出,可以传入初始容量和一个比较器。并且在构造方法里面会创建lock和Condition,初始化数组。

问题?

  1. 这里的锁只有一把。为啥就一把,不能像LinkBlockingQueue一样,有两个。

    这里不能用两个,因为put操作,会引起小头堆里面元素的变动,这个变动可能设计到头结点,所以,不能分开,在LinkBlockingQueue没有变动,只是单纯的一个取,一个放。

  public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

3. 几个重要的方法

put

注意,这里的put不会阻塞,因为底层的数组最大是Integer.MAX,当元素不够的话,就直接扩容,还是不够就直接报错(OutOfMemoryError)。

这里的扩容也是在put之前操作的,不是在之后操作。

可以看到,这里大体的代码逻辑和PriorityQueue差不多,只是多了一个获取锁和释放锁,唤醒等待的过程。

忘了在 PriorityQueue分析中说了,这是一个*的堆,堆中最多存放元素的个数为(Integer.MAX)。这里也是一样的,在扩容章节会有分析,如果不清楚PriorityQueue,建议去看看,回头再来看这个就很清晰明了了。

offer操作里面,在非空检查之后,就开始获取锁了。这个需要注意。

在利用Comparator比较的时候,组装堆的操作基本都是一致的,就是比较的部分是不相等的,其余都差不多,这里用siftUpComparable分析

public void put(E e) {
        offer(e); // never need to block
    }  

public boolean offer(E e) {
     // 为空检查
        if (e == null)
            throw new NullPointerException();
       // 获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
 
        int n; // 元素数量,
      int cap; // 数组长度
        Object[] array; // 数组引用。
         // 当元素的数量大于等于 数组容量的时候,说明需要扩容了。注意,这里没有负载因子。
        while ((n = size) >= (cap = (array = queue).length))
           // 放在扩容章节说
            tryGrow(array, cap);
        try {
           // 如果与比较器就用,没有就用默认的顺序
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
             // 唤醒等待的poll线程。
            notEmpty.signal();
        } finally {
           // 释放锁。
            lock.unlock();
        }
        return true;
    }

siftUpComparable

不出所料,这里和PriorityQueue里面的操作一模一样。这里就不用看了

 private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

poll

带等待时间的方法,会导致等待,本质就是调用Condition的awaitNanos方法

还是一样的操作,上来先上锁,在做操作。最后释放锁

  public E poll() {
      // 先上锁,注意这里的锁只有一把。
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

dequeue

可以看到这里的操作和PriorityQueue基本差不多,没有啥子可说的。下面要说一个take方法

  private E dequeue() {
        // 如果size-1小于0,说明堆里面没有元素了,直接返回一个null,
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
               // 这个就和PriorityQueue的操作都是一样。没啥可说的。建议去看看PriorityQueue
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

take方法

take方法是BlockingQueue里面的接口,这个接口会同步等待, 当出队的时候没有元素的时候 take线程就等待。这个方法没有什么可说的。别的大体都是一样的

  public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
               //等待
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

4. 扩容操作

 private void tryGrow(Object[] array, int oldCap) {
       // 扩容的操作是在put上锁之后的,一上来就释放锁,这是怎么想的。
       // 现在释放了锁,就说明这个时刻可能会有线程来操作。那就没有同步机制?
        lock.unlock(); 
        Object[] newArray = null;
        // 这里居然是利用cas来做同步操作的。
       // 只有一个线程能成功,并且去扩容。
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
               // allocationSpinLock 变为0,为了下一次的扩容操作,但要是这个瞬间有线程获取到锁了,还会new出一个array出来,重新计算一下。
                allocationSpinLock = 0;
            }
        }
     // 如果说newArray为null,就说明在上面操作完之后,有线程正在计算,正在操作。,当前线程就等等。
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
       // 重新获取锁
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
      //拷贝就好了
    }

这里扩容还是比较有意思的。

上来先释放锁,释放锁之后,通过cas操作将allocationSpinLockOffset变为1,作为计算新数组的同步机制。在计算完之后,将allocationSpinLockOffset重新变为0,如果newArray为null说明这个时刻有线程在计算newArray(因为上一个线程已经将allocationSpinLockOffset变为0,所以,就会有线程获取到锁。)。重新获取锁,做数组拷贝,这个线程就一直入队,一直到结束的时候释放锁。

问题?

  1. 一上来释放锁之后,那么之前堵塞的在lock的线程中一个线程就会运行,判断容量太小,都会来扩容。都会走到扩容的方法里面。那么扩容的里面就没有同步机制吗?

    allocationSpinLockOffset 就是他的同步机制,使用cas将他变为1,如果一个线程成功了之后,别的线程就不会进来了,同一时刻只有一个线程在操作。

  2. allocationSpinLockOffset 设置为1失败的线程会怎么做

    代码下面会有判断,如果newArray为null就会让出cpu的使用权。

  3. 如果计算完之后,a线程扩容完成之后,b线程获取到了锁,b线程会再次扩容吗?

    不会,因为后面有判断queue是否等于传递进来的array,当a线程操作完之后,就会一直操作到put操作结束,这时候b线程来操作,发现queue已经不是之前传递进来的array了。所以,就不会进去了。

画个图说明一下,这里挺有意思的。

需要注意,释放锁之后只有一个线程能通过,那对于扩容来说,只有可能有两个线程来操作,想通这个,下面的就很好理解了。
PriorityBlockingQueue 分析

关于PriorityBlockingQueue的分析就分析到这里了。 如有不正确的地方,欢迎指出。谢谢。

上一篇:LeetCode_剑指Offer 41. 数据流的中位数(优先队列 / 大顶堆、小顶堆)


下一篇:Java PriorityQueue优先队列详解(源码+图文步骤解析)