什么是数据倾斜
对于集群系统,一般缓存是分布式的,即不同节点处理一定范围的缓存数据。如果缓存数据的分散度不够,导致大量的缓存数据集中到了一台或者少数几台的服务器节点上,称为数据倾斜。
数据倾斜的表现
mapreduce执行时,大部分reduce节点执行完毕,但是只有其中少数几个reduce执行很慢(卡在reduce=99%),导致整个任务运行很慢,这是因为这几个少数节点处理的数据远远大于其他节点,数据分布极不平衡。
数据倾斜常见情形
关键词 |
情形 |
Join |
小表关联大表,小表的key比较集中 |
大表关联大表,但是分桶字段比较集中 |
|
Group by |
某个值的记录过多 |
Count distinct |
某个值的记录过多 |
以上三者都会导致少数几个reduce节点分配到的数据过多。
数据倾斜的解决方法
- Group by倾斜
方法1:set hive.map.aggr=true;--在mapper进行aggregation(聚合)。该参数默认就是为true的。
所以为什么在mapper端聚合可以解决数据倾斜呢?(待解决)
方法2:set hive.groupby.skewindata=true。这样就不会将相同的key分发给一个reduce了,而是随机分发,在reduce端做聚合,然后再启动一轮MR(为什么?),用聚合后的结果再计算结果。这样做,就和set hive.map.aggr=true做类似的事情了,只不过将聚合放在reduce端了,而且还要额外启动一轮mapreduce,效果不明显,不推荐使用。
- Count distinct倾斜
方法:改写SQL,比如以下改写:
select a,count(distinct b) as c from T group by a;
改写成:
select a,count(1) from (select a ,b from T group by a,b) X group by a
- Join倾斜
解决方法1:参数设置
用于关联的字段key数据分布不平衡。使用参数:
set hive.optimize.skewjoin=true;
将这种特殊的key先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。
还有要告诉hive如何判断特殊值,根据hive.skewjoin.key设置的数量hive可以知道,比如默认值是100000,那么超过100000条记录的值就是特殊值。
解决方法2:特殊值分开处理
可以通过改写SQL,将比较集中的key单独拿出来:
数据集1(key比较集中) union all 数据集2(不含数据集1中的key)
解决方法3:随机数分配法
select
a.*,
b.*
from
(
select *, cast(rand() * 10 as int) as r_id from logs
)
a
join
(
select *, r_id from items lateral view explode(range_list(1, 10)) rl as r_id
)
b
on
a.item_id = b.item_id
and a.r_id = b.r_id
解决方法4:使用map-side join
set hive.auto.convert.join=true;--适用于小表关联大表的情况
典型的业务场景
- 因空值过多导致的数据倾斜
空值也可以是一个key,因此也可能导致数据倾斜。
- 因关联的字段有多个数据类型导致的数据倾斜
表A的key是int,表B的key既有int又有string(这样真的存在么?),导致表B所有string类型的key分配到一个reducer中去。解决方法:将表B的key统一转换成int。
- 使用map-side join解决数据倾斜
如果我们能在mapper端执行关联计算,就不会发生reducer倾斜。