Hive SQL基本上适用大数据领域离线数据处理的大部分场景。Hive SQL的优化也是我们必须掌握的技能,而且,面试一定会问。那么,我希望面试者能答出其中的80%优化点,在这个问题上才算过关。
Hive优化目标
在有限的资源下,执行效率更高
常见问题
数据倾斜
map数设置
reduce数设置
其他
Hive执行
HQL --> Job --> Map/Reduce
执行计划
explain [extended] hql
样例
select col,count(1) from test2 group by col;
explain select col,count(1) from test2 group by col;
Hive表优化
分区
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
静态分区
动态分区
分桶
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
数据
相同数据尽量聚集在一起
Hive Job优化
并行化执行
每个查询被hive转化成多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间
set hive.exec.parallel= true;
set hive.exec.parallel.thread.numbe=8;
本地化执行
job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
job的reduce数必须为0或者1
set hive.exec.mode.local.auto=true;
当一个job满足如下条件才能真正使用本地模式:
job合并输入小文件
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
合并文件数由mapred.max.split.size限制的大小决定
job合并输出小文件
set hive.merge.smallfiles.avgsize=256000000;当输出文件平均小于该值,启动新job合并文件
set hive.merge.size.per.task=64000000;合并之后的文件大小
JVM重利用
set mapred.job.reuse.jvm.num.tasks=20;
JVM重利用可以使得JOB长时间保留slot,直到作业结束,这在对于有较多任务和较多小文件的任务是非常有意义的,减少执行时间。当然这个值不能设置过大,因为有些作业会有reduce任务,如果reduce任务没有完成,则map任务占用的slot不能释放,其他的作业可能就需要等待。
压缩数据
set hive.exec.compress.output=true;
set mapred.output.compreession.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapred.output.compression.type=BLOCK;
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
中间压缩就是处理hive查询的多个job之间的数据,对于中间压缩,最好选择一个节省cpu耗时的压缩方式
hive查询最终的输出也可以压缩
Hive Map优化
set mapred.map.tasks =10; 无效
(1)默认map个数
default_num=total_size/block_size;
(2)期望大小
goal_num=mapred.map.tasks;
(3)设置处理的文件大小
split_size=max(mapred.min.split.size,block_size);
split_num=total_size/split_size;
(4)计算的map个数
compute_map_num=min(split_num,max(default_num,goal_num))
经过以上的分析,在设置map个数的时候,可以简答的总结为以下几点:
增大mapred.min.split.size的值
如果想增加map个数,则设置mapred.map.tasks为一个较大的值
如果想减小map个数,则设置mapred.min.split.size为一个较大的值
情况1:输入文件size巨大,但不是小文件
情况2:输入文件数量巨大,且都是小文件,就是单个文件的size小于blockSize。这种情况通过增大mapred.min.split.size不可行,需要使用combineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量。
map端聚合
set hive.map.aggr=true;
推测执行
mapred.map.tasks.apeculative.execution
Hive Shuffle优化
Map端
io.sort.mb
io.sort.spill.percent
min.num.spill.for.combine
io.sort.factor
io.sort.record.percent
Reduce端
mapred.reduce.parallel.copies
mapred.reduce.copy.backoff
io.sort.factor
mapred.job.shuffle.input.buffer.percent
mapred.job.shuffle.input.buffer.percent
mapred.job.shuffle.input.buffer.percent
Hive Reduce优化
需要reduce操作的查询
group by,join,distribute by,cluster by...
order by比较特殊,只需要一个reduce
sum,count,distinct...
聚合函数
高级查询
推测执行
mapred.reduce.tasks.speculative.execution
hive.mapred.reduce.tasks.speculative.execution
Reduce优化
numRTasks = min[maxReducers,input.size/perReducer]
maxReducers=hive.exec.reducers.max
perReducer = hive.exec.reducers.bytes.per.reducer
hive.exec.reducers.max 默认 :999
hive.exec.reducers.bytes.per.reducer 默认:1G
set mapred.reduce.tasks=10;直接设置
计算公式
Hive查询操作优化
join优化
关联操作中有一张表非常小
不等值的链接操作
set hive.auto.current.join=true;
hive.mapjoin.smalltable.filesize默认值是25mb
select
/*+mapjoin(A)*/
f.a,f.b from A t join B f on (f.a=t.a)hive.optimize.skewjoin=true;如果是Join过程出现倾斜,应该设置为true
set hive.skewjoin.key=100000; 这个是join的键对应的记录条数超过这个值则会进行优化
mapjoin
简单总结下,mapjoin的使用场景:
Bucket join
两个表以相同方式划分桶
两个表的桶个数是倍数关系
crete table order(cid int,price float) clustered by(cid) into 32 buckets;
crete table customer(id int,first string) clustered by(id) into 32 buckets;
select price from order t join customer s on t.cid=s.id
join 优化前
select m.cid,u.id from order m join customer u on m.cid=u.id where m.dt='2013-12-12';
join优化后
select m.cid,u.id from (select cid from order where dt='2013-12-12')m join customer u on m.cid=u.id;
group by 优化
hive.groupby.skewindata=true;如果是group by 过程出现倾斜 应该设置为true
set hive.groupby.mapaggr.checkinterval=100000;--这个是group的键对应的记录条数超过这个值则会进行优化
count distinct 优化
优化前
select count(distinct id) from tablename
优化后
select count(1) from (select distinct id from tablename) tmp;
select count(1) from (select id from tablename group by id) tmp;
优化前
select a,sum(b),count(distinct c),count(distinct d) from test group by a
优化后
select a,sum(b) as b,count(c) as c,count(d) as d from(select a,0 as b,c,null as d from test group by a,c union all select a,0 as b,null as c,d from test group by a,d union all select a,b,null as c,null as d from test)tmp1 group by a;