什么是有界优先队列?
有界优先队列(BoundedPriorityQueue
)是 Spark 中实现的一种数据结构,用于高效地在分布式环境下对数据进行部分排序或选择前 N 个元素。
- 它的核心特性是:队列的大小是固定的,当新元素进入队列时,会依据优先级(通常由比较器决定)决定是否插入,并丢弃优先级较低的元素,从而在内存中保持高效。
-
实现方式:
BoundedPriorityQueue
底层基于java.util.PriorityQueue
(堆实现)。
有界优先队列的作用
- 高效排序:当只需要前 N 个元素时,避免全量排序,降低计算和内存消耗。
- 局部优化:在 Spark 的分区级别操作中,可用于在单个分区内选取前 N 个数据。
- 支持懒加载:结合 Spark 的迭代器机制,避免不必要的数据加载。
有界优先队列使用在哪些算子上?
-
takeOrdered
:- 获取 RDD 中的前 N 个元素,基于自然排序或自定义排序。
- 实现上会在每个分区内使用
BoundedPriorityQueue
找出前 N 个元素,然后通过驱动端合并结果。
-
top
:- 类似
takeOrdered
,但会按照降序取前 N 个元素。 - 同样使用
BoundedPriorityQueue
实现分区内排序和全局合并。
- 类似
-
aggregate
和aggregateByKey
(间接使用):- 可以通过自定义聚合函数,结合有界优先队列实现分区内的部分排序或选择。
-
combineByKey
:- 适用于键值对 RDD 的聚合操作,用于分区内或分区间高效提取前 N 个元素。
源码分析
1. BoundedPriorityQueue
的实现
核心源码片段如下:
class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A]) extends Iterable[A] {
private val underlying = new java.util.PriorityQueue[A](maxSize, ord)
def +=(elem: A): this.type = {
if (underlying.size < maxSize) {
underlying.add(elem)
} else if (ord.compare(elem, underlying.peek()) > 0) {
underlying.poll()
underlying.add(elem)
}
this
}
// 返回队列元素
override def iterator: Iterator[A] = underlying.iterator.asScala
override def size: Int = underlying.size()
}
- 构造函数:初始化一个固定大小的优先队列。
-
+=
方法:根据传入元素的优先级判断是否插入队列。优先级低的元素在队列满时会被丢弃。 - 迭代器方法:支持遍历队列元素。
2. takeOrdered
源码
takeOrdered
在每个分区内和全局分别使用 BoundedPriorityQueue
:
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
val ordReverse = ord.reverse
val bcOrd = sparkContext.broadcast(ordReverse)
this.mapPartitions { items =>
val queue = new BoundedPriorityQueue[T](num)(bcOrd.value)
items.foreach(queue += _)
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
-
分区内排序:为每个分区创建一个
BoundedPriorityQueue
,存储前 N 个元素。 -
分区间合并:通过
reduce
将各分区的队列合并。 - 最终排序:合并后的队列最终排序后返回。
举例说明
示例代码:takeOrdered
与 top
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("BoundedPriorityQueueExample").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(5, 1, 3, 9, 2, 6, 4, 8, 7), numSlices = 3)
// 使用 takeOrdered 获取前 5 个最小值
val smallest5 = rdd.takeOrdered(5)
println(s"Smallest 5: ${smallest5.mkString(", ")}")
// 使用 top 获取前 5 个最大值
val largest5 = rdd.top(5)
println(s"Largest 5: ${largest5.mkString(", ")}")
sc.stop()
输出:
Smallest 5: 1, 2, 3, 4, 5
Largest 5: 9, 8, 7, 6, 5
注意事项
-
内存占用:
BoundedPriorityQueue
在分区内保留部分数据,适合小规模排序;大规模排序可能会导致内存溢出。 -
排序方向:
takeOrdered
默认升序;top
默认降序。 -
性能影响:全局排序需要额外的
shuffle
,应避免在大规模数据上频繁使用。
总结
- 有界优先队列的作用:解决分布式环境下高效排序问题。
-
核心实现:用于
takeOrdered
、top
等算子中,通过分区内优先队列与全局合并实现。 - 优势:减少内存占用,适合提取部分数据。
- 使用场景:分布式排序和前 N 元素选取场景。