java并发编程笔记--PriorityBlockingQueue实现

    PriorityBlockingQueue可以理解为线程安全的PriorityQueue,其实现原理与PriorityQueue类似,在此基础上实现了BlockingQueue接口,能够作为阻塞队列使用,由于PriorityBlockingQueue是*队列,因而使用put方法并不会阻塞,offer方法不会返回false。PriorityBlockingQueue也是基于最小二叉堆实现,对于堆数组中索引为k的节点,其父节点为(k-1)/2,其左右子节点分别为2k+1,2k+2。PriorityBlockingQueue使用ReentrantLock来控制所有公用操作的线程同步,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。

类图

java并发编程笔记--PriorityBlockingQueue实现

PriorityBlockingQueue的继承层次如上图:

  • Collection:所有集合的基类,定义集合的基本操作API;
  • AbstractCollection:实现公用的集合操作;
  • Iterable:赋予集合迭代器的能力;
  • Queue:定义队列的基本操作API,比如:offer、poll等;
  • AbstractQueue:实现公用的队列操作;
  • BlockingQueue:定义阻塞队列的基本操作API,比如:put、take等;
  • PriorityBlockingQueue:阻塞队列的实现类,继承AbstractQueue类,并实现BlockingQueue接口;

实现

主要成员变量

/**
 * 存放最小二叉堆的数组
 */
private transient Object[] queue;

/**
 * 优先队列中包含元素个数
 */
private transient int size;

/**
 * 比较器,用于定制元素比较规则;
 */
private transient Comparator<? super E> comparator;

/**
 * 用于同步队列操作的锁
 */
private final ReentrantLock lock;

/**
 * 当队列为空时,用于阻塞出队操作
 */
private final Condition notEmpty;

/**
 * 自旋锁标识字段,通过CAS操作进行比较更新;
 * 用于动态扩容操作;值为1时,表示加锁;为0时,标识未加锁;
 */
private transient volatile int allocationSpinLock;

可以看到PriorityBlockingQueue的成员变量和PriorityQueue的成员变量相差不大,多了3个用于控制线程安全的属性:lock,notEmpty,allocationSpinLock,分别用于队列公有操作、出队阻塞、动态扩容。

初始化

PriorityBlockingQueue的初始化同PriorityQueue的类似,多了对lock、notEmpty的初始化。

public PriorityBlockingQueue() {
    // 默认容量为11
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true;  // true if must screen for nulls
    // 如果传入集合是有序集,则无须进行堆有序化
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    // 如果传入集合是PriorityBlockingQueue类型,则不进行堆有序化
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    // 执行堆有序化
    if (heapify)
        heapify();
}

上浮、下沉操作

PriorityBlockingQueue也是基于下沉、上浮操作来实现元素的入队和出队操作的,实现代码与PriorityQueue的实现完全相同,只是为了保证线程安全,上层方法调用时需要放在加锁的环境下执行。

  • siftUpComparable:上浮操作,通过默认的Comparable接口进行元素比较;队列中的元素必须实现Comparable接口;
  • siftUpUsingComparator:上浮操作,通过Comparator进行元素比较;
  • siftDownComparable:下沉操作,通过默认的Comparable接口进行元素比较;队列中的元素必须实现Comparable接口;
  • siftDownUsingComparator:下沉操作,通过Comparator进行元素比较;

入队操作

PriorityBlockingQueue的入队操作包括4个方法:

  • add:队列满时抛出异常;由于为*队列,因而不会抛出异常;代码实现直接调用offer方法;
  • offer:队列满时返回false;由于为*队列,因而不会返回false;
  • offer带超时参数:队列满时阻塞等待直至超时或者数组有空出位置;由于为*队列,因而不会返回false、超时、阻塞;代码实现直接调用offer方法;
  • put:队列满时阻塞;由于为*队列,因而不会阻塞;代码实现直接调用offer方法;
/**
 * 入队操作实现
 * 由于是*队列,永远不会返回false;
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    
    // 通过CAS操作动态扩容
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    
    // 将新增元素上浮到合适位置    
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        
        // 唤醒阻塞线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

/**
 * 由于是*队列,永远不会被阻塞
 */
public void put(E e) {
    offer(e); // never need to block
}

/**
 * 由于是*队列,因而不存在阻塞、超时和返回false的情况;
 */
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block
}

public boolean add(E e) {
    return offer(e);
}

出队操作

PriorityBlockingQueue的出队操作包括4个方法,都是通过调用dequeue()方法实现:

  • dequeue:出队操作的具体实现,为保证线程安全,上层调用方法需要加锁;
  • take:队列为空时,阻塞直至有元素添加到队列中;
  • poll:队列为空时,直接返回null,不会阻塞;
  • poll带超时参数:队列为空时,阻塞直至超时或者有元素添加到队列中;
  • drainTo:批量从队首弹出元素到指定集合中,不会阻塞;
/**
 * 出队操作的具体实现,仅在有锁环境下调用;
 */
private E dequeue() {
    int n = size - 1;
    
    // 队列为空返回null;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        
        // 将队尾元素提至队首位置
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        
        // 下沉队首元素到合适位置
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

/**
 * 队列为空时,阻塞;
 */
public E take() throws InterruptedException {
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 如果队列为空,则阻塞;
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

/**
 * 队列为空时,返回null;
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

/**
 * 队列为空时,阻塞直至超时或者获取到元素;
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 如果队列为空,则阻塞直至超时或者获取到元素;
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}

/**
 * 批量获取元素
 */
public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = Math.min(size, maxElements);
        
        // 循环遍历,不断弹出队首元素;
        for (int i = 0; i < n; i++) {
            c.add((E) queue[0]); // In this order, in case add() throws.
            dequeue();
        }
        return n;
    } finally {
        lock.unlock();
    }
}

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

查找&删除元素

    PriorityBlockingQueue中查找元素的效率是偏低的,由于二叉堆并没有限制左右子节点的大小规则,因而需要变量整个数组进行查找,因而效率为$O(n)$。一些优先队列的实现会对此进行优化,给每个元素添加一个索引字段用于标记元素在堆数组中的位置,比如:ScheduledThreadPoolExecutor.DelayedWorkQueue通过ScheduledFutureTask中的heapIndex来标记任务在堆数组中的位置。

/**
 * 查找元素,效率O(n)
 */
private int indexOf(Object o) {
    if (o != null) {
        Object[] array = queue;
        int n = size;
        // 遍历数组查找,效率较低
        for (int i = 0; i < n; i++)
            if (o.equals(array[i]))
                return i;
    }
    return -1;
}

/**
 * 删除指定位置的元素
 */
private void removeAt(int i) {
    Object[] array = queue;
    int n = size - 1;
    
    // 如果索引位置在队尾,则删除队尾元素
    if (n == i) // removed last element
        array[i] = null;
    else {
        // 使用队尾元素替换删除元素的位置
        E moved = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        
        // 下沉元素到合适位置
        if (cmp == null)
            siftDownComparable(i, moved, array, n);
        else
            siftDownUsingComparator(i, moved, array, n, cmp);
        
        // 如果元素未下沉,则上浮到合适位置
        if (array[i] == moved) {
            if (cmp == null)
                siftUpComparable(i, moved, array);
            else
                siftUpUsingComparator(i, moved, array, cmp);
        }
    }
    size = n;
}

/**
 * 删除队列中的特定元素
 */
public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 查找元素
        int i = indexOf(o);
        if (i == -1)
            return false;
        
        // 删除元素
        removeAt(i);
        return true;
    } finally {
        lock.unlock();
    }
}

动态扩容

    在插入元素时,如果堆数组长度不足,则需要新建一个更长的数组,拷贝现有元素到新数组中,从而实现扩容的目的。为保证动态扩容不阻塞队列元素的出队,PriorityBlockingQueue通过CAS操作实现的自旋锁来控制扩容操作:使用allocationSpinLock来标记是否加锁,值为1时,表示加锁,值为0时,表示未加锁;


// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;

// 存放allocationSpinLock属性的偏移量,方便后面CAS更新
private static final long allocationSpinLockOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = PriorityBlockingQueue.class;
        allocationSpinLockOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("allocationSpinLock"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

private void tryGrow(Object[] array, int oldCap) {
    // 扩容时,释放锁,防止阻塞出队操作
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    
    // 加锁,变更allocationSpinLock的值为1;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
                                
        try {
            // 容量小于64,则每次容量+2,否则增加一倍
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
                                  
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
           
            // 如果没有其它线程扩容,则创建新数组;
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 解锁,因为只有一个线程到此,因而不需要CAS操作;
            allocationSpinLock = 0;
        }
    }
    
    // 如果此时有其它线程已经加锁,则让出执行权,等待其它线程执行完成
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
        
    // 加锁,替换现有数组的引用,拷贝数组元素;
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

总结

    PriorityBlockingQueue可以理解为public操作都加锁的PriorityQueue,通过排他锁保证了操作的线程安全。PriorityBlockingQueue扩容时,因为增加堆数组的长度并不影响队列中元素的出队操作,因而使用自旋CAS操作实现的锁来控制扩容操作,仅在数组引用替换和拷贝元素时才加锁,从而减少了扩容对出队操作的影响。自旋锁的实现思路以及应用场景值得我们学习借鉴。

上一篇:数据结构--堆排序


下一篇:Elastic Stack学习--logstash入门