Spark 数据倾斜的解决方案
Shuffle(聚合) 时导致的数据倾斜
当 Shuffle 时出现了数据倾斜,我们一般的问题排查步骤
① 查看 WEB-UI
页面,查看各个 Job
的 Stage
中 Task
的执行情况,是否有明显执行时间过长的情况
② 如果任务报错,查看对应的日志异常堆栈信息,是否有内存溢出的情况
③ 抽样查看倾斜的 key
val result = rdd
// withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复
// fraction:抽取的数据比例
// seed:表示一个种子,根据这个seed随机抽取
// 抽取 50% 的数据
.sample(true, 0.5)
.map((_, 1))
.reduceByKey(_ + _)
// 查看倾斜 key 的 Top3
.take(3)
第 1 种解决方案:使用 Hive ETL 对数据进行预处理
场景说明
导致数据倾斜的是 Hive
表,如果在 Hive
表中的数据本身分布不是均匀的(比如某个 key
对应的数据有 100w,其他的 key
对应的数据却只有几十条),此时我们需要通过 Spark
对数据做频繁的分析,那么就会导致数据倾斜的发生
方案说明
此时我们需要评估下,是否可以通过 Hive
做数据的预处理( 在 ETL 的时候或者预先和其他表 Join ),在接下来的 Spark 作业中,由于已经做过预处理了,我们就不需要使用原先的操作
方案的优缺点
① 优点
把 Spark 的倾斜转移到了 Hive 的 ETL 阶段
② 缺点
会导致 Hive 的 ETL 阶段发生倾斜,请查考 Hive SQL 优化
第 2 种解决方案:过滤少数导致倾斜的 Key
场景说明
少数的 key
导致了数据倾斜
方案说明
如果倾斜的 key 是无用的数据或者过滤掉倾斜的 key
对应的数据不影响结果,此时可以考虑直接将倾斜的 key 过滤掉
inputRdd.filter(_.equals("xxx"))
方案的优缺点
① 优点
直接避免了倾斜 key 的发生
② 缺点
这种场景比较少
第 3 种解决方案:提高 shuffle 并行度
场景说明
需要对倾斜的 key 做处理,此时提高并行度是优先的方案
方案说明
在执行 shuffle 算子时,直接对该算子增加并行度,该种设置的优先级最高。将原本执行倾斜 key 的 task 的数量增加,从而提高并行度,减少计算的时间
val result = rdd
.map((_, 1))
// 将并行度提高至 500
.reduceByKey(_ + _,500)
方案的优缺点
① 优点
有效缓减和减轻数据倾斜的影响
② 缺点
没有彻底根除数据倾斜,只是缓解而已。可能会出现极端情况:无论怎样增加 task 的数量,最终倾斜的 key 仍然被分配到某个 task
一般结合其他方案使用
第 4 种解决方案:两阶段聚合(局部聚合 + 全局聚合)
方案说明
局部聚合:先给每个 key 添加一个随机数,此时 key 就会发生变化
# 添加随机数前
(hello,1)(hello,1)(hello,1)(hello,1)(hello,1)
# 添加随机数后
(1_hello,1)(1_hello,1)(2_hello,1)(2_hello,1)(3_hello,1)
# 局部聚合后的结果,例如执行 reduceByKey 操作
(1_hello,2)(2_hello,2)(3_hello,1)
全局聚合:将随机值去掉,然后进行全局聚合操作
# 局部聚合后的结果,例如执行 reduceByKey 操作
(1_hello,2)(2_hello,2)(3_hello,1)
# 全局聚合
(hello,5)
代码示例
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[2]").getOrCreate()
val sc: SparkContext = spark.sparkContext
// 假设倾斜的 key 为 A
val inputRdd: RDD[String] = sc.parallelize(Array(
"A", "A", "A", "A", "A", "A", "A", "A", "A", "A",
"A", "A", "A", "A", "A", "A", "A", "A", "A", "A",
"A", "A", "A", "A", "B", "B", "B", "B", "B", "B",
"B", "B", "C", "D", "E", "F", "G", "B", "B", "B"
))
val random = new Random(10)
// 局部聚合
val mapRdd: RDD[(String, Int)] = inputRdd
// 每个 key 增加随机值
.map(ele => (random.nextInt() + "_" + ele, 1))
// 聚合
.reduceByKey(_ + _, 1000)
// 全局聚合
val resultRdd: RDD[(String, Int)] = mapRdd
// 每个 key 去除随机值
.map(ele => (ele._1.split("_")(1), ele._2))
// 聚合
.reduceByKey(_ + _)
println(resultRdd.collect().toBuffer)
sc.stop()
}
方案的优缺点
① 优点
对于聚合类的 shuffle ,可以直接解决数据倾斜,或者大幅度缓减数据倾斜的问题,将 Spark 的作业性能显著提升
② 缺点
适用的场景比较少,对于 Join
产生的 shuffle
数据倾斜无法解决
Shuffle(Join) 时导致的数据倾斜
第 1 种解决方案:将 reduce join 转换为 map join
使用场景(大表关联小表)
在对 RDD 使用 Join 类操作,或者是在 Spark SQL 中使用 Join 语句时,其中的一个 RDD 或表的数据量比较小(比如几百 M 或者一两 G)
方案说明
不使用 Join 算子进行连接操作,使用 Broadcast 变量与 map 类算子实现 Join 操作,进而完全规避掉shuffle 类的操作,彻底避免数据倾斜的发生和出现
即:将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个Broadcast 变量,广播给其他 Executor 节点
代码示例
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[2]").getOrCreate()
val sc: SparkContext = spark.sparkContext
// 小表
val student: List[(String, String)] = List(
("1", "Kyle"), ("2", "Jack"), ("3", "Lucy"), ("4", "Amy")
)
// 大表
val score: RDD[(String, Int)] = sc.parallelize(List(
("1", 90), ("2", 80), ("3", 65), ("4", 77),
("5", 68), ("6", 69), ("7", 57), ("8", 99)
))
// 广播
val broadcast: Broadcast[List[(String, String)]] = sc.broadcast(student)
val result: RDD[(String, Int)] = score
.map {
case (id, score) =>
var temp = ""
// 获取广播中的数据,手动进行关联匹配
for ((k, v) <- broadcast.value) {
if (k.equals(id)) {
// 根据 id 获取学生的姓名
temp = v
}
}
(temp, score)
}
// 过滤掉没有关联到的数据
.filter(_._1.nonEmpty)
println(result.collect().toBuffer)
sc.stop()
}
方案优缺点
① 优点
对于 Join 操作导致的数据倾斜效果非常好,因为根本没有发生 shuffle ,所以也就没有发生数据倾斜
② 缺点
适用的场景比较少,只适用于一个大表和一个大表的情况。当我们将小表数据进行广播,Driver
和 Executer
都会保留一份小 RDD
的数据,如果数据过大则会出现 OOM
的情况。因此,改种情况不符合大表 Join
大表的情况。
第 2 种解决方案:采样倾斜 key 并分拆 Join 操作(少量 Key 倾斜)
方案说明
两个 RDD/Hive表
进行 Join 的时候,如果两方的数据量都比较大,那么可以检查两个 RDD/Hive表
中的 key
的分布情况
如果出现数据倾斜,是因为其中某一个 RDD/Hive表
中的少数几个 key 的数据量过大,而另一个 RDD/Hive表
中的所有 key 分布都比较均匀,此时采用该方案
实现思路
注意:rdd1 和 rdd2 的数据量都很大,但是 rdd1 是倾斜的,rdd2 是均匀的
① 对包含少量倾斜 key
的 RDD
进行采样,从而获得发生倾斜的 key
val skewedKey: Array[String] = inputRdd
.sample(false, 0.5)
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
// 按照 key 对应的数据量排序,并获取 Top3 的倾斜 key
.take(3)
.map(_._1)
② 从包含少量倾斜 key
的 RDD
中过滤出倾斜 key
的数据,形成独立的 RDD
val sc: SparkContext = spark.sparkContext
// 包含倾斜 key 的 RDD,假设它的数据量很大
val rdd1: RDD[(String, String)] = sc.parallelize(List(
("aa", "Kyle"), ("bb", "Jack"), ("cc", "Lucy"), ("aa", "Amy")
))
// 筛选出倾斜 key 对应的数据
val skewedRdd: RDD[(String, String)] = rdd1.filter(ele => skewedKey.contains(ele._1))
③ 从包含少量倾斜 key
的 RDD
中过滤出非倾斜 key
的数据,形成独立的 RDD
// 筛选出非倾斜 key 对应的数据
val notSkewedRdd: RDD[(String, String)] = rdd1.filter(ele => !skewedKey.contains(ele._1))
④ 从非倾斜的 RDD
中过滤出包含倾斜的 key
的数据,扩大 n倍
并形成独立的 RDD
// 筛选出倾斜 key 对应的数据并扩大
val expandRdd: RDD[(String, String)] = rdd2
.filter(ele => skewedKey.contains(ele._1))
.flatMap(ele => {
import scala.collection.mutable.ListBuffer
val temp: ListBuffer[(String, String)] = ListBuffer()
// 将均匀 RDD 中对应的倾斜 key 的数据扩大 100 倍
for (i <- 1 to 100) {
temp += (i + "_" + ele._1, ele._2)
}
temp
})
⑤ 将倾斜的 rdd1 中的数据和扩大后的数据进行关联
val joinRdd1: RDD[(String, String)] = skewedRdd
.map(ele => (random.nextInt(100) + "_" + ele._1, ele._2))
.join(expandRdd)
.map(ele => (ele._2._1.split("_")(1), ele._2._1))
⑥ 将 rdd1 中非倾斜的数据和 rdd2 进行关联
val joinRdd2: RDD[(String, String)] = notSkewedRdd.join(rdd2)
⑦ 将两个 join 的结果进行合并
val result: RDD[(String, String)] = joinRdd1.union(joinRdd2)
方案优缺点
① 优点
针对少部分倾斜的 key 可以使用此方案
② 缺点
如果倾斜的 key 的数量非常多,那么不适用该方案
第 3 种解决方案:使用随机前缀和扩容 RDD 进行 Join(大量 Key 倾斜)
方案说明
两个 RDD/Hive表
进行 Join 的时候,如果两方的数据量都比较大,其中一方包含多个倾斜的 key
,每个倾斜的 key
都可能对应超过 1w+
的数据,此时我们就需要使用该方案
实现思路
① 将分布比较均匀的 RDD
扩大 n
倍
// 筛选出倾斜 key 对应的数据并扩大
val expandRdd: RDD[(String, String)] = rdd1
.flatMap(ele => {
import scala.collection.mutable.ListBuffer
val temp: ListBuffer[(String, String)] = ListBuffer()
// 将均匀 RDD 中对应的倾斜 key 的数据扩大 100 倍
for (i <- 1 to 100) {
temp += (i + "_" + ele._1, ele._2)
}
temp
})
② 将发生倾斜的 RDD
扩大每条数据都打上随机值
val skewedRdd: RDD[(String, String)] = rdd2
.map(ele => (random.nextInt(100) + "_" + ele._1, ele._2))
③ 将两个 RDD
进行关联
val result: RDD[(String, String)] = expandRdd.union(skewedRdd)
方案优缺点
① 优点
效果提升显著
② 缺点
扩容后,对资源的消耗比较大