Spark中实现的一种数据结构BoundedPriorityQueue

什么是有界优先队列?

有界优先队列(BoundedPriorityQueue)是 Spark 中实现的一种数据结构,用于高效地在分布式环境下对数据进行部分排序或选择前 N 个元素。

  • 它的核心特性是:队列的大小是固定的,当新元素进入队列时,会依据优先级(通常由比较器决定)决定是否插入,并丢弃优先级较低的元素,从而在内存中保持高效。
  • 实现方式BoundedPriorityQueue 底层基于 java.util.PriorityQueue(堆实现)。

有界优先队列的作用

  1. 高效排序:当只需要前 N 个元素时,避免全量排序,降低计算和内存消耗。
  2. 局部优化:在 Spark 的分区级别操作中,可用于在单个分区内选取前 N 个数据。
  3. 支持懒加载:结合 Spark 的迭代器机制,避免不必要的数据加载。

有界优先队列使用在哪些算子上?

  1. takeOrdered

    • 获取 RDD 中的前 N 个元素,基于自然排序或自定义排序。
    • 实现上会在每个分区内使用 BoundedPriorityQueue 找出前 N 个元素,然后通过驱动端合并结果。
  2. top

    • 类似 takeOrdered,但会按照降序取前 N 个元素。
    • 同样使用 BoundedPriorityQueue 实现分区内排序和全局合并。
  3. aggregateaggregateByKey(间接使用)

    • 可以通过自定义聚合函数,结合有界优先队列实现分区内的部分排序或选择。
  4. combineByKey

    • 适用于键值对 RDD 的聚合操作,用于分区内或分区间高效提取前 N 个元素。

源码分析

1. BoundedPriorityQueue 的实现

核心源码片段如下:

class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A]) extends Iterable[A] {
  private val underlying = new java.util.PriorityQueue[A](maxSize, ord)

  def +=(elem: A): this.type = {
    if (underlying.size < maxSize) {
      underlying.add(elem)
    } else if (ord.compare(elem, underlying.peek()) > 0) {
      underlying.poll()
      underlying.add(elem)
    }
    this
  }

  // 返回队列元素
  override def iterator: Iterator[A] = underlying.iterator.asScala
  override def size: Int = underlying.size()
}
  • 构造函数:初始化一个固定大小的优先队列。
  • += 方法:根据传入元素的优先级判断是否插入队列。优先级低的元素在队列满时会被丢弃。
  • 迭代器方法:支持遍历队列元素。

2. takeOrdered 源码

takeOrdered 在每个分区内和全局分别使用 BoundedPriorityQueue

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
  val ordReverse = ord.reverse
  val bcOrd = sparkContext.broadcast(ordReverse)

  this.mapPartitions { items =>
    val queue = new BoundedPriorityQueue[T](num)(bcOrd.value)
    items.foreach(queue += _)
    Iterator.single(queue)
  }.reduce { (queue1, queue2) =>
    queue1 ++= queue2
    queue1
  }.toArray.sorted(ord)
}
  1. 分区内排序:为每个分区创建一个 BoundedPriorityQueue,存储前 N 个元素。
  2. 分区间合并:通过 reduce 将各分区的队列合并。
  3. 最终排序:合并后的队列最终排序后返回。

举例说明

示例代码:takeOrderedtop
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("BoundedPriorityQueueExample").setMaster("local[*]")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(Seq(5, 1, 3, 9, 2, 6, 4, 8, 7), numSlices = 3)

// 使用 takeOrdered 获取前 5 个最小值
val smallest5 = rdd.takeOrdered(5)
println(s"Smallest 5: ${smallest5.mkString(", ")}")

// 使用 top 获取前 5 个最大值
val largest5 = rdd.top(5)
println(s"Largest 5: ${largest5.mkString(", ")}")

sc.stop()
输出:
Smallest 5: 1, 2, 3, 4, 5
Largest 5: 9, 8, 7, 6, 5

注意事项

  1. 内存占用BoundedPriorityQueue 在分区内保留部分数据,适合小规模排序;大规模排序可能会导致内存溢出。
  2. 排序方向takeOrdered 默认升序;top 默认降序。
  3. 性能影响:全局排序需要额外的 shuffle,应避免在大规模数据上频繁使用。

总结

  • 有界优先队列的作用:解决分布式环境下高效排序问题。
  • 核心实现:用于 takeOrderedtop 等算子中,通过分区内优先队列与全局合并实现。
  • 优势:减少内存占用,适合提取部分数据。
  • 使用场景:分布式排序和前 N 元素选取场景。
上一篇:单片机UART协议相关知识


下一篇:leetcode 找不同-输入:s = "", t = "y" 输出:"y" 提示: