RangePartitioner 实现简记

摘要:

  1.背景

  2.rangeBounds 上边界数组源码走读

  3.RangePartitioner的sketch 源码走读

  4.determineBounds 源码走读

  5.关于RangePartitioner和sortByKey实验

内容:  

1.背景:这是一个填之前Spark RDD 核心总结这篇博文中RangePartitioner留下的坑,没想到又发现一个坑(XORShiftRandom:生成随机数的一个算法,有时间再来总结)

RangePartitioner 是Spark Partitioner 中的一种分区方式,在排序算子(sortByKey)中使用;相比HashPartitioner,RangePartitioner分区会尽量保证每个分区中数据量的均匀

2.rangeBounds 上边界数组源码走读

rangeBounds是一个Array,保存着每个分区的上界(upper bounds)值;

一般是过采样抽样大小的3倍来保证采样样本是基本平衡的;

然后调用sketch(rdd.map(_._1), sampleSizePerPartition) 方法进行抽样,下文会详细说明;

如果一个分区抽样的样本数比平均抽样的样本数还多,会调用rdd.sample再次对不平衡样本进行采样。

之后调用determineBounds(candidates, partitions)来返回分区对用的rangeBounds,下文也会详细介绍这个方法

// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, partitions)
}
}
}

  

3.RangePartitioner的sketch 源码走读

  

下面代码跟到了RangePartitioner这个伴生对象,其主要包括如下两个方法:

sketch(rdd.map(_._1), sampleSizePerPartition) 这个方法会返回抽样的总数和一个元素为(分区id,分区总数,以及抽样到的所有Key)的三元组的Array,其中使用到了水塘抽样算法,可以查看蓄水池(Reservoir_sampling)抽样算法简记

private[spark] object RangePartitioner {

  /**
* Sketches the input RDD via reservoir sampling on each partition.
*
* @param rdd the input RDD to sketch
* @param sampleSizePerPartition max sample size per partition
* @return (total number of items, an array of (partitionId, number of items, sample))
*/
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}

4.determineBounds 源码走读:

determineBounds(candidates, partitions)这个方法返回实际Key对应的分区上界值,其中candidates包含Key和Key所占的比例(weight)

/**
* Determines the bounds for range partitioning from candidates with weights indicating how many
* items each represents. Usually this is 1 over the probability used to sample this candidate.
*
* @param candidates unordered candidates with weights
* @param partitions number of partitions
* @return selected bounds
*/
def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
val ordered = candidates.sortBy(_._1)
val numCandidates = ordered.size
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
}
i += 1
}
bounds.toArray
}

  

5.关于RangePartitioner和sortByKey实验

RangePartitioner在SortByKey中的应用:

返回的就是一个以RangePartitioner作为分区函数的ShuffledRDD

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

以下是做的有关RangePartition和SortByKey的实验:

RangePartitioner 实现简记

自己实现的sortByKey

RangePartitioner 实现简记

  

上一篇:boost信号量 boost::interprocess::interprocess_semaphore的用法


下一篇:PDF文件可以转换成txt文档吗