何谓数据倾斜?数据倾斜指的是,并行处理的数据集 中,某一部分(如Spark的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
表现为整体任务基本完成,但仍有少量子任务的reduce还在运行。
数据倾斜的原因:
1.join
一个表较小,但key集中,分发到一个或者几个reduce上的数据远高于平均值;
大表与大表关联,但分桶的判断字段0值或者空值过多,这些空值或者0值都由一个reduce处理
2.group by
分组的维度过少,每个维度的值过多,导致处理某值的reduce耗时很久
3.count distinct
特殊值过多,处理特殊值耗时
综上所述原因就是:
key值分布不均,数据本身的原因(特殊值过多),sql语句不合理,表建的不合理
解决数据倾斜的方法:
1.参数配置
hive> set hive.map.aggr=true; 设置map端聚合
hive> set hive.groupby.skewindata=true; 当数据倾斜时,进行负责均衡
2.语句优化
小表与大表join时,使用mapjoin 将小表加载到内存中。
scala> hivecon.sql("select /*MAPJOIN(tbsex)*/ b.custname,b.nianling,a.sexname from tbsex a join cust b on a.id=b.sex").show
+---------------+--------+-------+
| custname|nianling|sexname|
+---------------+--------+-------+
| hello| 100| man|
| wangwu| 47| man|
| liuqin| 56| man|
| hello| 100| man|
| wangwu| 47| man|
| liuqin| 56| man|
| nihao| 5| woman|
| mahuateng| 1001| woman|
| liuyang| 32| woman|
| nihao| 5| woman|
| zhangsan| 20| woman|
| wangwu| 85| woman|
|tianyt_touch100| 50| woman|
| mahuateng| 1001| woman|
| liuyang| 32| woman|
| zhangsan| 20| woman|
| wangwu| 85| woman|
|tianyt_touch100| 50| woman|
+---------------+--------+-------+
如果关联的key存在空值,可以过滤掉空值再进行关联也可以为空值赋一个随机值
scala> hivecon.sql("select b.custname,b.nianling,a.sexname from tbsex a join cust b on b.sex is not null and a.id=b.sex").show
+---------------+--------+-------+
| custname|nianling|sexname|
+---------------+--------+-------+
| hello| 100| man|
| wangwu| 47| man|
| liuqin| 56| man|
| hello| 100| man|
| wangwu| 47| man|
| liuqin| 56| man|
| nihao| 5| woman|
| mahuateng| 1001| woman|
| liuyang| 32| woman|
| nihao| 5| woman|
| zhangsan| 20| woman|
| wangwu| 85| woman|
|tianyt_touch100| 50| woman|
| mahuateng| 1001| woman|
| liuyang| 32| woman|
| zhangsan| 20| woman|
| wangwu| 85| woman|
|tianyt_touch100| 50| woman|
+---------------+--------+-------+
把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。 concat('hehe',rand())
count distinct 引起的数据倾斜,可以先去重后再进行统计
scala> hivecon.sql("select sex,count(distinct custname) from cust group by sex").show
+----+------------------------+
| sex|count(DISTINCT custname)|
+----+------------------------+
|null| 1|
| 1| 6|
| 0| 3|
+----+------------------------+
scala> hivecon.sql("select sex,count(1) from (select sex,custname from cust group by custname, sex) mm group by sex").show
+----+--------+
| sex|count(1)|
+----+--------+
|null| 1|
| 1| 6|
| 0| 3|
+----+--------+
3.map和reduce优化
小文件过多的时候合并小文件
hive> set hive.merge.mapfiles=true;
单个文件过大可以设置map的个数