【并发编程】支持按优先级排序的*阻塞队列PriorityBlockingQueue

什么是PriorityBlockingQueue

  • PriorityBlockingQueue是一个*的基于数组的优先级阻塞队列。
  • 数组的默认长度是11,虽然指定了数组的长度,但是可以无限的扩充,直到资源消耗尽为止。
  • 每次出队都返回优先级别最高的或者最低的元素。
  • 默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。
  • PriorityBlockingQueue不能保证同优先级元素的顺序。

PriorityBlockingQueue的使用场景

  • 抢购活动,会员级别高的用户优先抢购到商品
  • 银行办理业务,vip客户插队

PriorityBlockingQueue的特点

  • 优先级高的先出队,优先级低的后出队。
  • 使用的数据结构是数组+二叉堆:默认容量11,可指定初始容量,会自动扩容,最大容量是(Integer.MAX_VALUE - 8)
  • 使用的锁是是ReentrantLock,存取是同一把锁

PriorityBlockingQueue的入队出队逻辑

  • 入队:*队列不阻塞。根据比较器进行堆化(排序)自下而上。传入比较器对象就按照比较器的顺序排序,如果比较器为 null,则按照自然顺序排序。
  • 出队:阻塞对象NotEmpty,队列为空时阻塞。优先级最高的元素在堆顶(弹出堆顶元素)。弹出前比较两个子节点再进行堆化(自上而下)。

二叉堆与数组

  • 完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
  • 二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。
  • 大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值。
  • 小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。
  • 二叉堆与数组的转换规则
// 当前节点为i
int i;
// 当前节点的父节点计算
int parent = (i -1 )/ 2 ;
// 左面的子节点计算
int leftChild = 2 * i + 1;
// 右面的子节点计算
int rightChild = 2 * i + 2;

PriorityBlockingQueue的使用方式

// 创建优先级阻塞队列 Comparator为null,自然排序
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(5);
// 自定义Comparator
PriorityBlockingQueue<Integer> queue1 = new PriorityBlockingQueue<Integer>(5, new Comparator<Integer>() {
	@Override
	public int compare(Integer o1, Integer o2) {
		return o2 - o1;
	}
});

PriorityBlockingQueue的构造方法源码分析

/**
 * 无参构造,直接调用俩个参数的构造
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

/**
 * 一个参数的构造,直接调用俩个参数的构造
 * initialCapacity:数组的大小
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

/**
 * 俩个参数的构造
 * initialCapacity:数组的大小
 * comparator:排序规则
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    // 数组长度小于0,抛异常
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    // 初始化锁
    this.lock = new ReentrantLock();
    // 初始化无数据等待的条件队列
    this.notEmpty = lock.newCondition();
    // 设置排序规则
    this.comparator = comparator;
    // 初始化数组
    this.queue = new Object[initialCapacity];
}

/**
 * 需要初始化数组的构造方法
 */
public PriorityBlockingQueue(Collection<? extends E> c) {
    // 初始化锁
    this.lock = new ReentrantLock();
    // 初始化无数据等待的条件队列
    this.notEmpty = lock.newCondition();
    // 一个标记:如果不知道堆的顺序,则为true
    boolean heapify = true; // true if not known to be in heap order
    // 一个标记:如果必须筛选空值,则为true
    boolean screen = true;  // true if must screen for nulls
    // 判断集合是否实现了SortedSet(一个可自动为元素排序的集合)
    if (c instanceof SortedSet<?>) {
        // 数据强转
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        // 设置排序规则
        this.comparator = (Comparator<? super E>) ss.comparator();
        // 标记已经知道如何排序了
        heapify = false;
    }
    // 传入的集合是当前这种类型
    else if (c instanceof PriorityBlockingQueue<?>) {
        // 强转!
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        // 设置排序规则
        this.comparator = (Comparator<? super E>) pq.comparator();
        // 不筛选空的值:由于PriorityBlockingQueue本身不容许空的值,所以传入的集合中肯定没有空值
        screen = false;
        // 如果类型相同:说明元素已经是有序的
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    // 元素转数组
    Object[] a = c.toArray();
    // 得到数组的长度
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    // 如果集合c转化为数组不是Object数组,就将其转化为Object数组
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    // 数组中可能有空值,进行筛选。
    // 数组只有一个元素的时候(不存在顺序),或者排序器存在的时候才会进行筛选
    if (screen && (n == 1 || this.comparator != null)) {
        // 原理数组中的每个元素
        for (int i = 0; i < n; ++i)
            // 元素为null,抛异常!
            if (a[i] == null)
                throw new NullPointerException();
    }
    // 将数组赋值到PriorityBlockingQueue的实际数组中
    this.queue = a;
    // 赋值数组长度
    this.size = n;
    // 堆的顺序可能不对的时候,进行数组建堆
    if (heapify)
        // 数组建堆:基于堆的排序算法
        heapify();
}

PriorityBlockingQueue的入队方法源码分析

/**
 * PriorityBlockingQueue的入队方法
 */
public void put(E e) {
    // 不需要阻塞,直接调用offer方法
    offer(e); // never need to block
}

/**
 * 有返回值的入队方法
 */
public boolean offer(E e) {
    // 入队元素为null,抛异常
    if (e == null)
        throw new NullPointerException();
    // 得到当前队列的锁
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    // 定义俩个变量:n是size的长度 cap是数组的长度
    int n, cap;
    // 定义临时一个数组
    Object[] array;
    // size的长度大于数组的长度
    while ((n = size) >= (cap = (array = queue).length))
        // 尝试扩容
        tryGrow(array, cap);
    try {
        // 得到排序器
        Comparator<? super E> cmp = comparator;
        // 将当前节点放在应该放的二叉堆上的位置。这里进行了排序!
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        // 长度加一
        size = n + 1;
        // 消费者线程条件队列转同步队列
        notEmpty.signal();
    } finally {
        // 唤醒真正的消费者线程
        lock.unlock();
    }
    // 返回入队成功
    return true;
}

/**
 * 扩容方法
 */
private void tryGrow(Object[] array, int oldCap) {
    // 释放锁:防止阻塞出队的操作
    lock.unlock(); // must release and then re-acquire main lock
    // 定义新的数组
    Object[] newArray = null;
    // 通过CAS的方式去获取锁,执行下面的代码
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 新的长度:长度小于64每次+2,大于64每次变为1.5倍
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            // 扩容后超过最大容量
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                // 旧的长度加一
                int minCap = oldCap + 1;
                // 旧的长度加一就超过了最大的长度,抛异常!
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                // 直接设置为最大的长度
                newCap = MAX_ARRAY_SIZE;
            }
            // 新的长度大于旧的长度并且没有其他线程改原有的长度
            if (newCap > oldCap && queue == array)
                // 新数组赋值
                newArray = new Object[newCap];
        } finally {
            // 扩容完成,交给其他线程处理
            allocationSpinLock = 0;
        }
    }
    // 另一个线程正在分配,释放时间片
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 加锁
    lock.lock();
    // 新数组不为空(获取过扩容机制)并且数组没有改变
    if (newArray != null && queue == array) {
        // 新数组直接执行
        queue = newArray;
        // 复制数据
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

PriorityBlockingQueue的出队方法源码分析

/**
 * PriorityBlockingQueue的出队方法
 */
public E take() throws InterruptedException {
    // 获取当前队列的锁对象
    final ReentrantLock lock = this.lock;
    // 获取锁操作:优先响应中断
    lock.lockInterruptibly();
    // 定义结果变量
    E result;
    try {
        // 出队的数据为null,进行等待
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        // 释放锁,唤醒下一个线程
        lock.unlock();
    }
    // 返回出队的元素
    return result;
}

/**
 * 出队
 */
private E dequeue() {
    // 长度减一
    int n = size - 1;
    // 长度小于0,直接返回null,说明没有元素了
    if (n < 0)
        return null;
    else {
        // 定义新的数组
        Object[] array = queue;
        // 得到第一个元素(二叉堆的最*节点)
        E result = (E) array[0];
        // 得到最后一个节点
        E x = (E) array[n];
        // 最后一个节点变为null
        array[n] = null;
        // 得到排序器
        Comparator<? super E> cmp = comparator;
        // 将最后一个节点放在应该放的二叉堆上的位置。线放到头结点,然后进行排序
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        // 长度变化
        size = n;
        // 返回出队的具体元素
        return result;
    }
}

结束语

  • 获取更多本文的前置知识文章,以及新的有价值的文章,让我们一起成为架构师!
  • 关注公众号,可以让你对MySQL有非常深入的了解
  • 关注公众号,每天持续高效的了解并发编程!
  • 关注公众号,后续持续高效的了解spring源码!
  • 这个公众号,无广告!!!每日更新!!!
    【并发编程】支持按优先级排序的*阻塞队列PriorityBlockingQueue
上一篇:初见 | 数据结构 | ODT


下一篇:【模拟】AcWing 155. 内存分配