Hive性能优化之计算Job执行优化

目录


1 Explain

1.1 功能

HiveQL是一种类SQL的语言,从编程语言规范来说是一种声明式语言,用户会根据查询需求提
交声明式的HQL查询,而Hive会根据底层计算引擎将其转化成Mapreduce/Tez/Spark的 job。大多
数情况下,用户不需要了解Hive内部是如何工作的,不过,当用户对于Hive具有越来越多的经验后,
尤其是需要在做性能优化的场景下,就要学习下Hive背后的理论知识以及底层的一些实现细节,会
让用户更加高效地使用Hive。explain命令就可以帮助用户了解一条HQL语句在底层的实现过程
explain会解析HQL语句,将整个HQL语句的实现步骤、依赖关系、实现过程都会进行解析返
回,可以帮助更好的了解一条HQL语句在底层是如何实现数据的查询及处理的过程,这样可以辅助
用户对Hive进行优化。
官网:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Explain

1.2 语法

Hive性能优化之计算Job执行优化
常用语法命令如下:
EXPLAIN [FORMATTED|EXTENDED|DEPENDENCY|AUTHORIZATION|] query
⚫ FORMATTED:对执行计划进行格式化,返回JSON格式的执行计划
⚫ EXTENDED:提供一些额外的信息,比如文件的路径信息
⚫ DEPENDENCY:以JSON格式返回查询所依赖的表和分区的列表
⚫ AUTHORIZATION:列出需要被授权的条目,包括输入与输出

1.3 组成

解析后的执行计划一般由三个部分构成,分别是:
⚫ The Abstract Syntax Tree for the query
◼ 抽象语法树:Hive使用Antlr解析生成器,可以自动地将HQL生成为抽象语法树
⚫ The dependencies between the different stages of the plan
◼ Stage依赖关系:会列出运行查询所有的依赖以及stage的数量
⚫ The description of each of the stages
◼ Stage内容:包含了非常重要的信息,比如运行时的operator和sort orders等具体的信息
4.1.4 示例1:过滤
explain select count ( * ) as cnt from tb_emp where deptno = ‘10’;
⚫ 组成
Hive性能优化之计算Job执行优化
⚫ 解释
Hive性能优化之计算Job执行优化
Hive性能优化之计算Job执行优化

1.5 示例2:分组排序

explain
select
deptno, count ( * ) as cnt
from tb_emp where salary > 2000
group by deptno order by cnt desc;

Hive性能优化之计算Job执行优化
Hive性能优化之计算Job执行优化
Hive性能优化之计算Job执行优化

2 MapReduce属性优化

2.1 本地模式

使用Hive的过程中,有一些数据量不大的表也会转换为MapReduce处理,提交到集群时,需
要申请资源,等待资源分配,启动JVM进程,再运行Task,一系列的过程比较繁琐,本身数据量并
不大,提交到YARN运行返回会导致性能较差的问题。
Hive为了解决这个问题,延用了MapReduce中的设计,提供本地计算模式,允许程序不提交
给YARN,直接在本地运行,以便于提高小数据量程序的性能。
⚫ 配置
– 开启本地模式
set hive.exec.mode.local.auto = true;
⚫ 限制条件
Hive为了避免大数据量的计算也使用本地模式导致性能差的问题,所以对本地模式做了以
下限制,如果以下任意一个条件不满足,那么即使开启了本地模式,将依旧会提交给YARN集
群运行。
◼ 处理的数据量不超过128M
◼ MapTask的个数不超过4个
◼ ReduceTask的个数不超过1个

2.2 JVM重用

JVM正常指代一个Java进程,Hadoop默认使用派生的JVM来执行map-reducer,如果一个
MapReduce程序中有100个Map,10个Reduce,Hadoop默认会为每个Task启动一个JVM来运行,
那么就会启动100个JVM来运行MapTask,在JVM启动时内存开销大,尤其是Job大数据量情况,如
果单个Task数据量比较小,也会申请JVM资源,这就导致了资源紧张及浪费的情况。
为了解决上述问题,MapReduce中提供了JVM重用机制来解决,JVM重用可以使得JVM实例在
同一个job中重新使用N次,当一个Task运行结束以后,JVM不会进行释放,而是继续供下一个Task
运行,直到运行了N个Task以后,就会释放,N的值可以在Hadoop的mapred-site.xml文件中进行
配置,通常在10-20之间。
⚫ 配置
– Hadoop3 之前的配置,在 mapred-site.xml 中添加以下参数
– Hadoop3 中已不再支持该选项
mapreduce.job.jvm.numtasks=10

2.3 并行执行

Hive在实现HQL计算运行时,会解析为多个Stage,有时候Stage彼此之间有依赖关系,只能挨
个执行,但是在一些别的场景下,很多的Stage之间是没有依赖关系的,例如Union语句,Join语句
等等,这些Stage没有依赖关系,但是Hive依旧默认挨个执行每个Stage,这样会导致性能非常差,
我们可以通过修改参数,开启并行执行,当多个Stage之间没有依赖关系时,允许多个Stage并行执
行,提高性能。
⚫ 配置
– 开启 Stage 并行化,默认为 false
SET hive.exec.parallel= true;
– 指定并行化线程数,默认为 8
.SET hive.exec.parallel.thread.number=16;
⚫ 注意:线程数越多,程序运行速度越快,但同样更消耗CPU资源

3 Join优化

3.1 Hive中的Join方案

表的Join是数据分析处理过程中必不可少的操作,Hive同样支持Join的语法,Hive Join的底层
还是通过MapReduce来实现的,Hive实现Join时,为了提高MapReduce的性能,提供了多种Join
方案来实现,例如适合小表Join大表的Map Join,大表Join大表的Reduce Join,以及大表Join的优
化方案Bucket Join等。

3.2 Map Join

⚫ 应用场景
适合于小表join大表或者小表Join小表
⚫ 原理
Hive性能优化之计算Job执行优化
◼ 将小的那份数据给每个MapTask的内存都放一份完整的数据,大的数据每个部分都可以与
小数据的完整数据进行join
◼ 底层不需要经过shuffle,需要占用内存空间存放小的数据文件
⚫ 使用
◼ 尽量使用Map Join来实现Join过程
◼ Hive中默认自动开启了Map Join
– 默认已经开启了 Map Join
hive.auto.convert.join= true
◼ Hive中判断哪张表是小表及限制
LEFT OUTER JOIN的左表必须是大表
RIGHT OUTER JOIN的右表必须是大表
INNER JOIN左表或右表均可以作为大表
FULL OUTER JOIN不能使用MAPJOIN
MAPJOIN支持小表为子查询
使用MAPJOIN时需要引用小表或是子查询时,需要引用别名
在MAPJOIN中,可以使用不等值连接或者使用OR连接多个条件
在MAPJOIN中最多支持指定6张小表,否则报语法错误
◼ Hive中小表的大小限制
– 2.0 版本之前的控制属性
hive.mapjoin.smalltable.filesize=25M
– 2.0 版本开始由以下参数控制
hive.auto.convert.join.noconditionaltask.size=512000000

3.3 Reduce Join

⚫ 应用场景
适合于大表Join大表
⚫ 原理
Hive性能优化之计算Job执行优化
◼ 将两张表的数据在shuffle阶段利用shuffle的分组来将数据按照关联字段进行合并
◼ 必须经过shuffle,利用Shuffle过程中的分组来实现关联
⚫ 使用
◼ Hive会自动判断是否满足Map Join,如果不满足Map Join,则自动执行Reduce Join

3.4 Bucket Join

⚫ 应用场景
适合于大表Join大表
⚫ 原理
Hive性能优化之计算Job执行优化
◼ 将两张表按照相同的规则将数据划分,根据对应的规则的数据进行join,减少了比较次数,
提高了性能
⚫ 使用
◼ Bucket Join
语法:clustered by colName
参数
– 开启分桶 join
set hive.optimize.bucketmapjoin = true;
要求
分桶字段 = Join字段 ,桶的个数相等或者成倍数
◼ Sort Merge Bucket Join(SMB):基于有序的数据Join
语法:clustered by colName sorted by (colName)
参数
– 开启分桶 SMB join
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join= true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask= true;
要求
分桶字段 = Join字段 = 排序字段 ,桶的个数相等或者成倍数

4 优化器

4.1 关联优化

在使用Hive的过程中经常会遇到一些特殊的问题,例如当一个程序中如果有一些操作彼此之间
有关联性,是可以放在一个MapReduce中实现的,但是Hive会不智能的选择,Hive会使用两个
MapReduce来完成这两个操作。
例如:当我们执行以下SQL语句:
select …… from table group by id order by id desc;
该SQL语句转换为MapReduce时,我们可以有两种方案来实现:
⚫ 方案一
◼ 第一个MapReduce做group by,经过shuffle阶段对id做分组
◼ 第二个MapReduce对第一个MapReduce的结果做order by,经过shuffle阶段对id
进行排序
⚫ 方案二
◼ 因为都是对id处理,可以使用一个MapReduce的shuffle既可以做分组也可以排序
在这种场景下,Hive会默认选择用第一种方案来实现,这样会导致性能相对较差,我们可以在
Hive中开启关联优化,对有关联关系的操作进行解析时,可以尽量放在同一个MapReduce中实现。
⚫ 配置
set hive.optimize.correlation= true;

4.2 CBO优化器引擎

在使用MySQL或者Hive等工具时,我们经常会遇到一个问题,默认的优化器在底层解析一些
聚合统计类的处理的时候,底层解析的方案有时候不是最佳的方案。
例如:当前有一张表【共1000条数据】,id构建了索引,id =100值有900条,我们现在的需
求是查询所有id = 100的数据,所以SQL语句为:select * from table where id = 100;
由于id这一列构建了索引,索引默认的优化器引擎RBO,会选择先从索引中查询id = 100的值
所在的位置,再根据索引记录位置去读取对应的数据,但是这并不是最佳的执行方案。有id=100
的值有900条,占了总数据的90%,这时候是没有必要检索索引以后再检索数据的,可以直接检索
数据返回,这样的效率会更高,更节省资源,这种方式就是CBO优化器引擎会选择的方案。
使用Hive时,Hive中也支持RBO与CBO这两种引擎,默认使用的是RBO优化器引擎。
⚫ RBO
◼ rule basic optimise:基于规则的优化器
◼ 根据设定好的规则来对程序进行优化
⚫ CBO
◼ cost basic optimise:基于代价的优化器
◼ 根据不同场景所需要付出的代价来合适选择优化的方案
◼ 对数据的分布的信息【数值出现的次数,条数,分布】来综合判断用哪种处理的方案
是最佳方案
很明显CBO引擎更加智能,所以在使用Hive时,我们可以配置底层的优化器引擎为CBO引擎。
⚫ 配置
set hive.cbo.enable= true;
set hive.compute.query.using.stats= true;
set hive.stats.fetch.column.stats= true;
set hive.stats.fetch.partition.stats= true;
⚫ 要求
◼ 要想使用CBO引擎,必须构建数据的元数据【表行数、列的信息、分区的信息……】
◼ 提前获取这些信息,CBO才能基于代价选择合适的处理计划
◼ 所以CBO引擎一般搭配analyze分析优化器一起使用

4.3 Analyze分析优化器

⚫ 功能
用于提前运行一个MapReduce程序将表或者分区的信息构建一些元数据【表的信息、
分区信息、列的信息】,搭配CBO引擎一起使用
⚫ 语法
– 构建分区信息元数据
ANALYZE TABLE tablename
[ PARTITION(partcol1[=val1], partcol2[=val2], …)]
COMPUTE STATISTICS [noscan];
– 构建列的元数据
ANALYZE TABLE tablename
[ PARTITION(partcol1[=val1], partcol2[=val2], …)]
COMPUTE STATISTICS FOR COLUMNS ( columns name1, columns name2…) [noscan];
– 查看元数据
DESC FORMATTED [tablename] [columnname];
⚫ 举例
◼ 构建表中分区数据的元数据信息
ANALYZE TABLE tb_login_part PARTITION(logindate) COMPUTE STATISTICS;
◼ 构建表中列的数据的元数据信息
ANALYZE TABLE tb_login_part COMPUTE STATISTICS FOR COLUMNS userid;
◼ 查看构建的列的元数据
desc formatted tb_login_part userid;
Hive性能优化之计算Job执行优化

5 谓词下推(PPD)

5.1 基本思想

谓词下推 Predicate Pushdown(PPD)的思想简单点说就是在不影响最终结果的情况下,尽
量将过滤条件提前执行。谓词下推后,过滤条件在map端执行,减少了map端的输出,降低了数据
在集群上传输的量,降低了Reduce端的数据负载,节约了集群的资源,也提升了任务的性能。

5.2 基本规则

⚫ 开启参数
– 默认自动开启谓词下推
hive.optimize.ppd= true;
⚫ 不同Join场景下的Where谓词下推测试
Hive性能优化之计算Job执行优化
⚫ 试验结论
Hive性能优化之计算Job执行优化
◼ Inner Join和Full outer Join,条件写在on后面,还是where后面,性能上面没有区别
◼ Left outer Join时 ,右侧的表写在on后面,左侧的表写在where后面,性能上有提高
◼ Right outer Join时,左侧的表写在on后面、右侧的表写在where后面,性能上有提高
◼ 如果SQL语句中出现不确定结果的函数,也无法实现下推

6 数据倾斜

6.1 数据倾斜的现象

分布式计算中最常见的,最容易遇到的问题就是数据倾斜,数据倾斜的现象是,当我们提交运
行一个程序时,我们通过监控发现,这个程序的大多数的Task都已经运行结束了,只有某一个Task
一直在运行,迟迟不能结束,导致整体的进度卡在99%或者100%,这时候我们就可以判定程序出
现了数据倾斜的问题。

6.2 数据倾斜的原因

表面上看,发生数据倾斜的原因在于这个Task运行过慢,但是仔细分析我们会发现,这个Task
运行过慢的原因在于这个Task的负载要比其他Task的负载要高,所以发生数据倾斜的直观原因在于
Task的数据分配不均衡。
那为什么会出现多个Task数据分配不均衡的情况呢?
从两方面考虑,第一:数据本身就是倾斜的,数据中某种数据出现的次数过多。第二:分区规
则导致这些相同的数据都分配给了同一个Task,导致这个Task拿到了大量的数据,而其他Task拿到
的数据比较少,所以运行起来相比较于其他Task就比较慢一些。
综上所述,产生数据倾斜的根本原因在于分区规则。Hive性能优化之计算Job执行优化

6.3 group By的数据倾斜

当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的根
据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。根本原因是因为分区规则导致的,
所以我们可以通过以下几种方案来解决group by导致的数据倾斜的问题。
⚫ 方案一:开启Map端聚合
– 开启 Map 端聚合:Combiner
hive.map.aggr= true;
◼ 通过减少Reduce的输入量,避免每个Task数据差异过大导致数据倾斜
⚫ 方案二:实现随机分区
– SQL 中避免数据倾斜,构建随机分区
select * from table distribute by rand();
◼ distribute by用于指定底层的MapReduce按照哪个字段作为Key实现分区、分组等
◼ 默认由Hive自己选择,我们可以通过distribute by自己指定,通过rank函数随机值实
现随机分区,避免数据倾斜
⚫ 方案三:自动构建随机分区并自动聚合
– 开启随机分区,走两个 MapReduce
hive.groupby.skewindata= true;
◼ 开启该参数以后,当前程序会自动通过两个MapReduce来运行
◼ 第一个MapReduce自动进行随机分区,然后实现聚合
◼ 第二个MapReduce将聚合的结果再按照业务进行处理,得到结果

6.4 Join的数据倾斜

实际业务需求中往往需要构建两张表的Join实现,如果两张表比较大,无法实现Map Join,
只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题,面对Join
产生的数据倾斜,我们核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现,但
往往很多的Join场景不满足Map Join的需求,那么我们可以以下几种方案来解决Join产生的数据倾
斜问题:
⚫ 方案一:提前过滤,将大数据变成小数据,实现Map Join
实现两张表的Join时,我们要尽量考虑是否可以使用Map Join来实现Join过程。有些
场景下看起来是大表Join大表,但是我们可以通过转换将大表Join大表变成大表Join小表,
来实现Map Join。
例如:现在有两张表订单表A与用户表B,需要实现查询今天所有订单的用户信息,
关联字段为userid。
A表:今天的订单,1000万条,字段:orderId,userId,produceId,price等
B表:用户信息表,100万条,字段:userid,username,age,phone等
◼ 需求:两张表关联得到今天每个订单的用户信息
◼ 实现1:直接关联,实现大表Join大表
select a. * ,b. * from A join B on a.userid = b.userid;
由于两张表比较大,无法走Map Join,只能走Reduce Join,容易产生数据倾斜。
◼ 实现2:将下了订单的用户的数据过滤出来,再Join

select a. * ,d. *
from (
-- 获取所有下订单的用户信息
select
b. *
from
-- 获取所有下订单的 userid
( select distinct a.userid from A a ) c join B b on c.userid =
b.userid ) d
join
A a on d.userid = a.userid;

◼ 100万个用户中,在今天下订单的人数可能只有一小部分,大量数据是不会
Join成功的
◼ 可以提前将订单表中的userid去重,获取所有下订单的用户id
◼ 再使用所有下订单的用户id关联用户表,得到所有下订单的用户的信息
◼ 最后再使用下订单的用户信息关联订单表
◼ 通过多次Map Join来代替Reduce Join,性能更好也可以避免数据倾斜
⚫ 方案二:使用Bucket Join
◼ 如果使用方案一来避免Reduce Join ,有些场景下依旧无法满足,例如过滤后的数据
依旧是一张大表,那么最后的Join依旧是一个Reduce Join
◼ 这种场景下,我们可以将两张表的数据构建为桶表,实现Bucket Map Join,避免数
据倾斜
⚫ 方案三:使用Skew Join
Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程,这种Join的
原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数
据倾斜的数据单独使用Map Join来实现,其他没有产生数据倾斜的数据由Reduce Join来
实现,这样就避免了Reduce Join中产生数据倾斜的问题,最终将Map Join的结果和
Reduce Join的结果进行Union合并
◼ 配置
– 开启运行过程中 skewjoin
set hive.optimize.skewjoin= true;
– 如果这个 key 的出现的次数超过这个范围
set hive.skewjoin.key=100000;
– 在编译时判断是否会产生数据倾斜
set hive.optimize.skewjoin.compiletime= true;
– 不合并,提升性能
set hive.optimize.union.remove= true;
– 如果 Hive 的底层走的是 MapReduce,必须开启这个属性,才能实现不合并
set mapreduce.input.fileinputformat.input.dir.recursive= true;
◼ 示例图
Hive性能优化之计算Job执行优化

上一篇:看一眼肉的光泽和脂肪分层就行?日本APP正在用AI给金枪鱼分级


下一篇:还没搞懂人工智能吧,要不,让图灵“亲自”给你讲讲?