什么是数据倾斜
对于集群系统,一般缓存是分布式的,即不同节点处理一定范围的缓存数据。
如果缓存数据的分散度不够,导致大量的缓存数据集中到了一台或者少数几台的服务器节点上,称为数据倾斜。
数据倾斜的表现
mapreduce执行时,大部分reduce节点执行完毕,但是只有少数几个reduce执行很慢(卡在reduce=99%),
导致整个任务运行很慢,这是因为这几个少数节点处理的数据远远大于其他节点,数据分布极不平衡。
数据倾斜常见情形:
关键词 | 情形 |
Join | 小表关联大表,小表的key比较集中 |
大表关联大表,但是分桶字段比较集中 | |
Group by | 某个值的记录过多 |
Count distinct | 某个值的记录过多 |
以上三者都会导致少数几个reduce节点分配到的数据过多。
数据倾斜的解决方法
- Group by倾斜
解决方法1:set hive.map.aggr=true;
在mapper进行aggregation(聚合)。该参数默认就是为true的。
疑问:为什么在mapper端聚合就可以解决数据倾斜呢?
解决方法2:set hive.groupby.skewindata=true;
这样就不会将相同的key分发给一个reduce了,而是随机分发,在reduce端做聚合,然后再启动一轮MR(为什么?),
用聚合后的结果再计算结果。这样做,就和set hive.map.aggr=true做类似的事情了,只不过将聚合放在reduce端了,
而且还要额外启动一轮mapreduce,效果不明显,不推荐使用。
- Count distinct倾斜
改写SQL,比如以下改写:
select a,count(distinct b) as c from T group by a;
改写成:
select a,count(1) from (select a,b from T group by a,b) X group by a;
- Join倾斜
解决方法1:参数设置
set hive.optimize.skewjoin=true;
将这种特殊的key先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,
期望能提高计算这部分值的处理速度。还要告诉hive如何判断特殊值,根据hive.skewjoin.key设置的数量hive可以知道,
比如默认值是100000,那么超过100000条记录的值就是特殊值。
解决方法2:特殊值分开处理
可以通过改写SQL,将比较集中的key单独拿出来:
数据集1(key比较集中) union all 数据集2(不含数据集1中的key)
解决方法3:随机数分配法
select a.*,b.*
from (select *,cast(rand() * 10 as int) as r_id from logs) a
join (select *,r_id from items lateral view explode(range_list(1,10)) rl as r_id) b
on a.item_id = b.item_id
and a.r_id = b.r_id
解决方法4:使用map-side join
set hive.auto.convert.join=true;
此方法适用于小表关联大表的情况。
典型的业务场景
- 因空值过多导致的数据倾斜
空值也可以是一个key,因此也可能导致数据倾斜。
- 因关联字段有多个数据类型导致的数据倾斜
表A的key是int,表B的key既有int又有string,导致表B所有string类型的key分配到一个reducer中去。
解决方法:将表B的key同样转换成int。
Hive中应该不会存在这种类型的数据倾斜,因为Hive中的字段数据类型是确定的,不会出现一个字段有多种数据类型的情况。
- 使用map-side join解决数据倾斜
如果我们能在mapper端执行关联计算,就不会发生reducer倾斜。