Hive:select count(distinct)优化以及hive.groupby.skewindata
原文链接:https://juejin.cn/post/6926536667877048333
问题引入
数据分析师小A接到需求,需要统计当日各个省份20岁以下的日活跃用户数(去重统计user_id,即UV)
现有一个Hive表存储着用户行为数据:user_behaviour_trace_info
列 | 描述 |
---|---|
user_id | 用户id |
nickname | 昵称 |
age | 年龄 |
province | 省份 |
url | 访问地址 |
access_time | 访问时间 |
device_id | 用户手机设备id |
小A很顺其自然的写出这段SQL:
select
province,
count(distinct user_id) as uv
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20
group by
province
复制代码
立马提交SQL开始执行任务,一顿操作猛如虎,一看时长十点五(小时)
心想不愧是用户行为数据,数据量居然这么大?那让我们看看任务各个Task的执行耗时:
以下三个JobHistory截图属于另一个select count(distinct)数据倾斜任务,具有代表意义
我们可以观察到
- 任务整体耗时:10小时11分钟:
- Map Task 平均耗时:1分16秒
- Reduce Task 平均耗时:1分59秒
任务执行时间长,MR Task 平均耗时短,极有可能是出现了数据倾斜!
那我们继续看看Map Task的执行情况,按Map Task耗时倒序排序
Map Task最长耗时为2分49秒,而且整体看起来运行耗时相差不大,问题不在Map阶段
接下来看看Reduce阶段,按Reduce Task耗时倒序排序
好家伙,有一个Reduce Task执行了10个小时,另个一执行了近2小时,其余Reduce Task的执行时间很短。
说好了大家一起干活,最终却只有我一个人扛下了所有?
那么,问题出在哪里?
我们先要弄明白 Hive 是如何执行这段 SQL 的
Hive SQL 最终要转化成 MapReducer 任务,在逻辑上可以细分为三个阶段:
- Map阶段:将 group by 字段作为 key,聚合函数中的列作为 Value,输出键值对
- Shuffle阶段:对 Map 阶段输出的键值对 Key 进行 Hash,按照Hash值将键值对发送至各个 Reducer 中(相同的 Key 会分配给同一个 Reducer)
- Reduce阶段:执行聚合操作
简而言之:SQL 中的 Group By 字段会决定某条数据最终落在哪一个 Reducer 上处理。
下文将 group by 的字段称之为 group_by_column
那么,对于刚刚那段SQL,group_by_column 是 province,同一个 province 的数据会分配给同一个 Reducer,在 Reduce 阶段,对 user_id 进行去重统计
然鹅,我国共有34个省级行政区域,一个 Reducer 处理一个省的数据,最多也只能有 34 个 Reducer 同时处理数据
当然,多个省份还可能落在同一个 Reducer 中
如何优雅的解决问题?
我们先来分析一下最初那段SQL的本质:
- Map + Shuffle阶段:按 province 将数据分发给 Reducer
- Reduce阶段:对同一个 province 的 user_id 先去重,再计数
Reduce 阶段任务最重,执行了去重和计数两个操作:
- 去重:在 province 内,对 user_id 去重
- 计数:统计 province 内 user_id 的个数
SQL任务慢的原因是:同一个 province 的数据全部由同一个 Reducer 处理
思考一下,不难发现:
- 可以通过 group by 实现快速去重
- 计数操作可以由多个Task分别计数,最终再汇总结果
那么优化思路就不言而喻了:将去重和计数两个操作分开,并且用多个Task同时计数,最终再汇总所有Task的计数数据
hive.groupby.skewindata参数
其实 Hive 早就考虑到这个场景,并且贴心的提供了 hive.groupby.skewindata 参数。
当 hive.groupby.skewindata = true 时,Hive 会将原来的一个 MaReduce 阶段转化成两个 MapReduce 阶段:
- 一阶段MapReduce:随机打散数据,打散后进行局部聚合(数据去重 + 多Task局部计数)
- 二阶段MapReduce:对一阶段的局部聚合结果进行最终聚合(最终汇总计数)
这样的描述看起来有点云里雾里,那不妨让我们自己通过手动优化来更加深入的理解这个参数。手动优化的思路和原理和 hive.groupby.skewindata = true 是一致的
第一步:数据去重
我们先实现第一步,在每个 province 中,对 user_id 进行去重
SQL很简单,但有一些需要注意的点:
- 去重性能:group by 的去重性能要比 select distinct 要好,所以使用 group by 去重
- 数据过滤:因为要计算的 uv 指标有条件,所以需要对数据进行过滤
- null值:因为 count(distinc user_id) 不会计算 user_id 为 null 的数据,所以在去重时需要过滤 null 值
那么我们可以写出这段SQL
select
province,
user_id,
cast(rand() * 1000 as int) as random_key -- 随机数,作用稍后解释~
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20 -- uv统计条件
and user_id is not null -- count(distinc)不统计null值
group by -- 对数据进行去重
province,
user_id
复制代码
聪明的同学已经看出来了,这里除了对数据进行去重外,还多了一个随机数字段。这个随机数字段是用来做什么的呢,继续往下看你就知道了~
第二步:打散数据,计算局部聚合结果
数据去重完毕后,只需要统计每个 province 的 user_id 个数就能得到对应 province 的 uv 指标!
由上文提到,group_by_column 决定了数据怎么分发给 Reducer
同一个 group_by_column 的数据会分配给同一个 Reducer
那么我们该如何让多个 Reducer 同时计算某个 province 的 user_id 个数呢?这里就可以使用去重阶段“多出来”的 随机数 random_key !
select
province,
random_key,
sum(1) as partial_uv -- 对 user_id 进行计数,是局部聚合结果
from (
select -- 子查询是第一步SQL:数据去重
province,
user_id,
cast(rand() * 1000 as int) as random_key
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20
and user_id is not null
group by
province,
user_id
) t1
group by -- 对随机数也进行 group by,让多个 Reducer 一起统计数据
province,
random_key
复制代码
使用组合键 "province + random_key" 进行 group by,同一个 province 的数据会随机分发给多个 Reducer
每个 Reducer 对 user_id 进行计数,获得局部聚合结果
任务执行过程如下:
第一步 + 第二步 就相当于是 hive.groupby.skewindata = true 时的一阶段Mapreduce
第三步:最终聚合
在第二步中,我们已经将同一个 province 的 user_id 分成多个部分,并且统计出了每个部分的 user_id 数量(partial_uv)
那么接下来,我们只要对局部聚合结果进行简单的相加就可以了
最终SQL如下:
select
province,
sum(partial_aggregation) as uv -- 最终聚合结果就是 count(distinct user_id)
from (
select -- 第二步SQL:打散数据,计算局部聚合结果
province,
random_key,
sum(1) as partial_uv
from ( -- 第一步SQL:数据去重
select
province,
user_id,
cast(rand() * 1000 as int) as random_key
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20
and user_id is not null
group by
province,
user_id
) t1
group by
province,
random_key
) t2
group by
province
复制代码
进阶:如何优化多列 count(distinct)
hive.groupby.skewindata 对 count(distinct) 的优化是有限制的,当 hive.groupby.skewindata = true 时,SQL只能对一个列进行 count(distinct),否则会抛出异常:
Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: DISTINCT on different columns not supported with skew in data
复制代码
其实这很容易理解,在刚刚的手动优化过程中,我们能够很容易发现,这个方法不能同时对多个列进行 去重+计数 得出各自的 count(distinct) 值
主要原因:无法在某一个维度里,同时对多个列进行去重
count(distinct)优化方案不能直接套用在计算多列的情况上,但可以采用分治的思想,对每个列单独计算 count(distinct),然后再将结果进行合并
案例
现在有个需求,需要按省份分别去重统计当日 user_id 和 device_id 的去重数量,要求用户年龄为20岁以下
优化前SQL:
select
province,
count(distinct user_id) as uv,
count(distinct device_id) as dv
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20
group by
province
复制代码
优化方案
- 第一步:单独计算 uv、dv:Job1、Job2
- 第二步:合并计算结果:Job3
值得注意的是,最外层 SELECT 使用 COALESCE() 是因为:在单独计算某个 count(distinct) 时,可能因为添加了统计条件(年龄小于20岁),而导致 province 没有对应的取值,left join 时指标为 null
虽然写起来比原SQL要麻烦些,但效率吊打原SQL不知道多少倍
笔者曾经将一个9小时耗时的任务,通过该方法优化至15分钟~