数据准备
union
先来说说数据准备阶段的 union 和 sample。union在我们日常的开发中,union 非常常见,它常常用于把两个类型一致、但来源不同的 RDD 进行合并,从而构成一个统一的、更大的分布式数据集。例如,在某个数据分析场景中,一份数据源来自远端数据库,而另一份数据源来自本地文件系统,要将两份数据进行合并,我们就需要用到 union 这个操作。具体怎么使用呢?我来举个例子。给定两个 RDD:rdd1 和 rdd2,调用 rdd1.union(rdd2) 或是 rdd1 union rdd2,其结果都是两个 RDD 的并集,具体代码如下:
// T:数据类型
val rdd1: RDD[T] = _
val rdd2: RDD[T] = _
val rdd = rdd1.union(rdd2)
// 或者rdd1 union rdd2
特别强调的是,union 操作能够成立的前提,就是参与合并的两个 RDD 的类型必须完全一致。也就是说,RDD[String]只能与 RDD[String]合并到一起,却无法与除 RDD[String]以外的任何 RDD 类型(如 RDD[Int]、甚至是 RDD[UserDefinedClass])做合并。对于多个类型一致的 RDD,我们可以通过连续调用 union 把所有数据集合并在一起。例如,给定类型一致的 3 个 RDD:rdd1、rdd2 和 rdd3,我们可以使用如下代码把它们合并在一起。
// T:数据类型
val rdd1: RDD[T] = _
val rdd2: RDD[T] = _
val rdd3: RDD[T] = _
val rdd = (rdd1.union(rdd2)).union(rdd3)
// 或者 val rdd = rdd1 union rdd2 union rdd3
union 的典型使用场景,是把多份“小数据”,合并为一份“大数据”,从而充分利用 Spark 分布式引擎的并行计算优势。与之相反,在一般的数据探索场景中,我们往往只需要对一份数据的子集有基本的了解即可。例如,对于一份体量在 TB 级别的数据集,我们只想随机提取其部分数据,然后计算这部分子集的统计值(均值、方差等)。那么,面对这类把“大数据”变成 “小数据”的计算需求,Spark 又如何进行支持呢?这就要说到 RDD 的 sample 算子了。
sample
sample 算子用于对 RDD 做随机采样,从而把一个较大的数据集变为一份“小数据”。相较其他算子,sample 的参数比较多,分别是 withReplacement、fraction 和 seed。因此,要在 RDD 之上完成数据采样,你需要使用如下的方式来调用 sample 算子:sample(withReplacement, fraction, seed)。
其中,withReplacement 的类型是 Boolean,它的含义是“采样是否有放回”,如果这个参数的值是 true,那么采样结果中可能会包含重复的数据记录,相反,如果该值为 false,那么采样结果不存在重复记录。
fraction 参数最好理解,它的类型是 Double,值域为 0 到 1,其含义是采样比例,也就是结果集与原数据集的尺寸比例。
seed 参数是可选的,它的类型是 Long,也就是长整型,用于控制每次采样的结果是否一致。光说不练假把式,我们还是结合一些示例,这样才能更好地理解 sample 算子的用法。
// 生成0到99的整型数组
val arr = (0 until 100).toArray
// 使用parallelize生成RDD
val rdd = sc.parallelize(arr)
// 不带seed,每次采样结果都不同
rdd.sample(false, 0.1).collect
// 结果集:Array(11, 13, 14, 39, 43, 63, 73, 78, 83, 88, 89, 90)
rdd.sample(false, 0.1).collect
// 结果集:Array(6, 9, 10, 11, 17, 36, 44, 53, 73, 74, 79, 97, 99)
// 带seed,每次采样结果都一样
rdd.sample(false, 0.1, 123).collect
// 结果集:Array(3, 11, 26, 59, 82, 89, 96, 99)
rdd.sample(false, 0.1, 123).collect
// 结果集:Array(3, 11, 26, 59, 82, 89, 96, 99)
// 有放回采样,采样结果可能包含重复值
rdd.sample(true, 0.1, 456).collect
// 结果集:Array(7, 11, 11, 23, 26, 26, 33, 41, 57, 74, 96)
rdd.sample(true, 0.1, 456).collect
// 结果集:Array(7, 11, 11, 23, 26, 26, 33, 41, 57, 74, 96)
数据预处理
先理解并行度这个概念。所谓并行度,它实际上就是 RDD 的数据分区数量。还记得吗?RDD 的 partitions 属性,记录正是 RDD 的所有数据分区。因此,RDD 的并行度与其 partitions 属性相一致。开发者可以使用 repartition 算子随意调整(提升或降低)RDD 的并行度,而 coalesce 算子则只能用于降低 RDD 并行度。显然,在数据分布的调整方面,repartition 灵活度更高、应用场景更多,我们先对它进行介绍,之后再去看看 coalesce 有什么用武之地。
repartition
一旦给定了 RDD,我们就可以通过调用 repartition(n) 来随意调整 RDD 并行度。其中参数 n 的类型是 Int,也就是整型,因此,我们可以把任意整数传递给 repartition。按照惯例,咱们还是结合示例熟悉一下 repartition 的用法。
// 生成0到99的整型数组
val arr = (0 until 100).toArray
// 使用parallelize生成RDD
val rdd = sc.parallelize(arr)
rdd.partitions.length
// 4
val rdd1 = rdd.repartition(2)
rdd1.partitions.length
// 2
val rdd2 = rdd.repartition(8)
rdd2.partitions.length
// 8
通过数组创建用于实验的 RDD,从这段代码里可以看到,该 RDD 的默认并行度是 4。在我们分别用 2 和 8 来调整 RDD 的并行度之后,通过计算 RDD partitions 属性的长度,我们发现新 RDD 的并行度分别被相应地调整为 2 和 8。看到这里,你可能还有疑问:“我们为什么需要调整 RDD 的并行度呢?2 和 8 看上去也没什么实质性的区别呀”。我们介绍过,每个 RDD 的数据分区,都对应着一个分布式 Task,而每个 Task 都需要一个 CPU 线程去执行。
RDD 的并行度,很大程度上决定了分布式系统中 CPU 的使用效率,进而还会影响分布式系统并行计算的执行效率。并行度过高或是过低,都会降低 CPU 利用率,从而白白浪费掉宝贵的分布式计算资源,因此,合理有效地设置 RDD 并行度,至关重要。这时你可能会追问:“既然如此,那么我该如何合理地设置 RDD 的并行度呢?”坦白地说,这个问题并没有固定的答案,它取决于系统可用资源、分布式数据集大小,甚至还与执行内存有关。不过,结合经验来说,把并行度设置为可用 CPU 的 2 到 3 倍,往往是个不错的开始。例如,可分配给 Spark 作业的 Executors 个数为 N,每个 Executors 配置的 CPU 个数为 C,那么推荐设置的并行度坐落在 NC2 到 NC3 这个范围之间。
尽管 repartition 非常灵活,你可以用它随意地调整 RDD 并行度,但是你也需要注意,这个算子有个致命的弊端,那就是它会引入 Shuffle。我们知道,由于 Shuffle 在计算的过程中,会消耗所有类型的硬件资源,尤其是其中的磁盘 I/O 与网络 I/O,因此 Shuffle 往往是作业执行效率的瓶颈。正是出于这个原因,在做应用开发的时候,我们应当极力避免 Shuffle 的引入。但你可能会说:“如果数据重分布是刚需,而 repartition 又必定会引入 Shuffle,我该怎么办呢?”如果你想增加并行度,那我们还真的只能仰仗 repartition,Shuffle 的问题自然也就无法避免。但假设你的需求是降低并行度,这个时候,我们就可以把目光投向 repartition 的孪生兄弟:coalesce。
coalesce
在用法上,coalesce 与 repartition 一样,它也是通过指定一个 Int 类型的形参,完成对 RDD 并行度的调整,即 coalesce (n)。那两者的用法到底有什么差别呢?我们不妨结合刚刚的代码示例,来对比 coalesce 与 repartition。
// 生成0到99的整型数组
val arr = (0 until 100).toArray
// 使用parallelize生成RDD
val rdd = sc.parallelize(arr)
rdd.partitions.length
// 4
val rdd1 = rdd.repartition(2)
rdd1.partitions.length
// 2
val rdd2 = rdd.coalesce(2)
rdd2.partitions.length
// 2
在用法上,coalesce 与 repartition 可以互换,二者的效果是完全一致的。不过,如果我们去观察二者的 DAG,会发现同样的计算逻辑,却有着迥然不同的执行计划。
在 RDD 之上调用 toDebugString,Spark 可以帮我们打印出当前 RDD 的 DAG。
尽管图中的打印文本看上去有些凌乱,但你只要抓住其中的一个关键要点就可以了。这个关键要点就是,在 toDebugString 的输出文本中,每一个带数字的小括号,比如 rdd1 当中的“(2)”和“(4)”,都代表着一个执行阶段,也就是 DAG 中的 Stage。
而且,不同的 Stage 之间,会通过制表符(Tab)缩进进行区分,比如图中的“(4)”显然要比“(2)”缩进了一段距离。对于 toDebugString 的解读,你只需要掌握到这里就足够了。
学习过调度系统之后,我们已经知道,在同一个 DAG 内,不同 Stages 之间的边界是 Shuffle。因此,观察上面的打印文本,我们能够清楚地看到,repartition 会引入 Shuffle,而 coalesce 不会。那么问题来了,同样是重分布的操作,为什么 repartition 会引入 Shuffle,而 coalesce 不会呢?
原因在于,二者的工作原理有着本质的不同。给定 RDD,如果用 repartition 来调整其并行度,不论增加还是降低,对于 RDD 中的每一条数据记录,repartition 对它们的影响都是无差别的数据分发。具体来说,给定任意一条数据记录,repartition 的计算过程都是先哈希、再取模,得到的结果便是该条数据的目标分区索引。对于绝大多数的数据记录,目标分区往往坐落在另一个 Executor、甚至是另一个节点之上,因此 Shuffle 自然也就不可避免。coalesce 则不然,在降低并行度的计算中,它采取的思路是把同一个 Executor 内的不同数据分区进行合并,如此一来,数据并不需要跨 Executors、跨节点进行分发,因而自然不会引入 Shuffle。这里我还特意准备了一张示意图,更直观地为你展示 repartition 与 coalesce 的计算过程,图片文字双管齐下,相信你一定能够更加深入地理解 repartition 与 coalesce 之间的区别与联系。