Hive数据倾斜问题、优化问题、架构问题总结

Hive

以下复习内容包括架构、与MySql数据库的比较、四种By、小数据问题、两种常见数据倾斜问题和讲解、Hive的简单优化;
Hive的架构

Hive都是用别人来存东西,自己一点都不存,只负责翻译HQL成MR程序;
Hive数据倾斜问题、优化问题、架构问题总结

//客户端Client:
  CLI(command-line interface)、JDBC/ODBC(jdbc访问hive)、WEBUI(浏览器访问hive)

//Sql Parser和Physical Plan
  把HQL语句翻译成对应的MR任务

//元数据Meta Store
  元数据包括:表所属数据库(默认是default)、表的拥有者、表名及表的注释、字段及字段的注释、列/分区字段、表的类型(是否是外部表)、表数据所在目录(Hive表和HDFS的路径的映射)等,而表里面具体的内容则在HDFS里;
  元数据默认存储在自带的derby数据库(小巧但是很多缺点,比如不支持并发连接,可以理解为轻量级的MySQL数据库)中,**一般都采用MySQL存储Metastore**(即换成用MySQL来存元数据,不用Derby)。
  用户输入指令后,客户端根据建表时的映射关系,找到对应的表文件位置;
      
//结合Hadoop:
  使用HDFS进行存储,使用MapReduce进行计算;
      
//驱动器:Driver(记住)
(1)解析器(SQL Parser):将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误。
(2)编译器(Physical Plan):将AST编译生成逻辑执行计划。
(3)优化器(Query Optimizer):对逻辑执行计划进行优化,比如大小表的Join问题。
(4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。(驱动器将SQL命令转化为对应的MapReduce程序,生成相应jar包,再结合之前文件的位置,运行MapReduce程序)      

HIVE和数据库比较(面试)

Hive不是数据库!他只是框架!!只是用法比较像,因为95%以上SQL语句都封装了MR程序,其实没什么可比性,只是语句很像。

1、查询语言
	由于SQL被广泛的应用在数据仓库中,因此,专门针对Hive的特性设计了类SQL的查询语言HQL。熟悉SQL开发的开发者可以很方便的使用Hive进行开发。

2、数据更新
	由于Hive是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive中不建议对历史数据的改写,所有的数据都是在加载的时候确定好的。而数
据库中的数据通常是需要经常进行修改的,因此可以使用 INSERT INTO … VALUES 添加数据,使用 UPDATE … SET修改数据,虽然HQL也可以但是这样会慢,原理是先
下载修改后上传。

3、执行延迟
	Hive 在查询数据的时候,由于没有索引,需要扫描整个表(如果没有分区分桶,则都是暴力扫描,复杂度都是ALL),因此延迟较高。
	另外一个导致 Hive 执行延迟高的因素是 MapReduce框架。由于MapReduce 本身具有较高的延迟,因此在利用MapReduce 执行Hive查询时,也会有较高的延
迟。相对的,数据库的执行延迟较低。当然,这个低是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。

4、数据规模
	由于Hive建立在集群上并可以利用MapReduce进行并行计算,因此可以支持很大规模的数据;对应的,数据库可以支持的数据规模较小,能用MySQL算的就不要用
Hive了。Hive可以部署在三个框架上( Apache Hadoop MapReduce, Apache Tez or Apache Spark)

Order by、Sort by、Distribute by、Cluster by比较:

//order by:全局排序,只有一个Reducer;(ASC、DESC)

//sort by:区内排序;单独用时会**随机**分区;sort by会根据数据量的大小启动一到多个reducer来干活,并且,它会在进入reduce之前为每个reducer都产生一个排序文件。这样的好处是提高了全局排序的效率。(每个reduce方法内排序)

//distribute by:根据具体对象进行分区,一般和sort by连用;对这个函数进行测试时,要分配多几个reduce进行处理,否则无法看到distribute by的效果;
(distribute by 控制map结果的分发,它会将具有相同字段的map输出分发到一个reduce节点上做处理);

比如:set mapreduce.jon.reduce =4;

distribute by会根据reduce的数目,结合by后面的列,做一个hash值处理,然后进行分区;
	因为distribute  by  通常和sort  by 一起用,所以当distribute by 遇上 sort by时,distribute by要放在前面,这个不难理解,因为要先通过
distribute by 将待处理的数据从map端做分发,这样,sort by 这个擅长局部排序的才能去放开的干活。不然要是没有distribute  by的分发,那么sort  by 将
要处理全部的数据,即全局排序,这不是sort  by的活,这样做只能拖慢集群工作效率。

//cluster by:当distribute by和sort by连用且后面的列相同时,等价于用cluster by;(默认升序排列,不能指定ASC或是DESC);

分区和分桶:

  • Hive的分区使用HDFS的子目录功能实现。每一个子目录包含了分区对应的列名和每一列的值(列名可能不止一个)
  • 分桶表是将数据集分解成更容易管理的若*分的另一个计数:进行hash,并用hash结果除以桶的个数做取余运算的方式来分桶,保证了每个桶中都有数据,但每个桶中的数据条数不一定相等。
分桶随机分割数据库,分区是非随机分割数据库。因为分桶是按照列的哈希函数进行分割的,相对比较平均;而分区是按照列的值来进行分割的,容易造成数据倾斜。
分桶是对应不同的文件(细粒度),分区是对应不同的文件夹(粗粒度)。桶是更为细粒度的数据范围划分,分桶的比分区获得更高的查询处理效率,使取样更高效。

UDF和UDTF:
自定义 UDF:继承 UDF,重写 evaluate 方法,return结果;

自定义 UDTF:继承自 GenericUDTF,重写 3 个方法:initialize(自定义输出的列名和类型),process(将结果返回 forward(result)),close。

//如果要用UDTF函数返回多行数据,经常性做法使用for循环的方式去循环调用 forward(result),因为每次 forward(result)都是一行数据输出。

Hive一些简单优化:

小文件解决方案

(1)在 Map 执行前合并小文件,减少 Map 数:CombineHiveInputFormat 具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat 没有对小文件合并功能。

(2)merge

// 输出合并小文件
SET hive.merge.mapfiles = true; -- 默认 true,在 map-only 任务结束时合并小文件

SET hive.merge.mapredfiles = true; -- 默认 false,在 map-reduce 任务结束时合并小文件

SET hive.merge.size.per.task = 268435456; -- 默认 256M

SET hive.merge.smallfiles.avgsize = 16777216; -- 当输出文件的平均大小小于 16m 该值时,启动一个独立的 map-reduce 任务进行文件 merge

(3)开启 JVM重用

set mapreduce.job.jvm.numtasks=10

简单理解JVM重用:Hadoop的默认配置通常是派生JVM来执行map和reduce任务的,这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job任务包含有成百上千个task任务的情况,JVM重用可以使得JVM实例在同一个job中重新使用N次。所以解决了小文件过大造成JVM启动耗费过多资源的问题。

缺点:被占用的task插槽会被已经占用,知道任务完成,所以先完成task的插槽虽然处于空闲状态,但并不能使用,产生了资源浪费;

Hive的索引使用

0.8.0版本后增加了一个bitmap索引,适用于值较少的列;一般hive是不用索引的;
Hive中的索引和MySql等数据库的索引也是不同的,是没有主键的,而且数据发生变动后要手动重建;

开启中间压缩

设置 map 端输出、中间结果压缩。(不完全是解决数据倾斜的问题,但是减少了 IO 读写和网络传输,能提高很多效率)

合理设置 Map

mapred.min.split.size: 指的是数据的最小分割单元大小;min 的默认值是 1B
mapred.max.split.size: 指的是数据的最大分割单元大小;max 的默认值是 256MB
通过调整 max 可以起到调整 map 数的作用,减小 max 可以增加 map 数,增大 max 可以减少 map 数。
需要提醒的是,直接调整 mapred.map.tasks 这个参数是没有效果的。

合理设置 Reduce

Reduce 个数并不是越多越好
(1)过多的启动和初始化 Reduce 也会消耗时间和资源;
(2)另外,有多少个 Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置 Reduce 个数的时候也需要考虑这两个原则:处理大数据量利用合适的 Reduce 数;使单个 Reduce 任务处理数据量大小要合适;

数据倾斜问题的解决:

什么是数据倾斜?简单理解,就是reducer阶段中,不同的reduce task内的数据量差别很大。

常见容易出现数据倾斜的操作?

数据倾斜可能会发生在group过程和join过程。

//发生在group过程:
当按照类型进行group by的时候,会将相同的group by字段的reduce任务需要的数据拉取到同一个节点进行聚合,而当其中每一组的数据量过大时,会出现其他组的
计算已经完成而这里还没计算完成,其他节点的一直等待这个节点的任务执行完成,所以会看到一直map 100% reduce 99%的情况。简单理解就是某些key字段的数据
量过于庞大;
    //解决方法:一、开启数据倾斜时负载均衡
set hive.groupby.skewindata=true;
思想:就是先随机分发并处理,再按照 key group by 来分发处理。
操作:当选项设定为 true,生成的查询计划会有两个 MRJob。
	第一个 MRJob 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 GroupBy Key 有
可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;
	第二个 MRJob 再根据预处理的数据结果按照 GroupBy Key 分布到 Reduce 中(这个过程可以保证相同的原始GroupBy Key 被分布到同一个 Reduce 中),最
后完成最终的聚合操作。
	它使计算变成了两个 mapreduce,先在第一个中在 shuffle 过程 partition 时随机给 key 打标记,使每个 key随机均匀分布到各个 reduce 上计
算,但是这样只能完成部分计算,因为相同 key 没有分配到相同 reduce 上。所以需要第二次的 mapreduce,这次就回归正常 shuffle,但是数据分布不均匀的问题
在第一次 mapreduce 已经有了很大的改善,因此基本解决数据倾斜。因为大量计算已经在第一次 mr 中随机分布到各个节点完成。
    
    //解决方法:二:在Map端做预聚合,提前把数据不均匀的问题做一个优化;
    

发生在join过程:

如何去理解join过程的数据倾斜现象呢?
    //这里先考虑大小表Join的问题;
    这里我推荐一篇博客给大家做一个参考:https://blog.csdn.net/leying521/article/details/93178185?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522162713261816780357264700%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=162713261816780357264700&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_v2~rank_v29-1-93178185.pc_search_result_cache&utm_term=hive%E6%95%B0%E6%8D%AE%E5%80%BE%E6%96%9C&spm=1018.2226.3001.4187

	总结下来就是这样一句话:把重复关联键少的表放在join前面做关联可以提高join的效率。写在关联左侧的表每有1条重复的关联键时底层就会多1次运算处理。
	所以在《Hive编程指南》书中108面,提高了一个解决方法:map-side JOIN;这个过程的思路和之前MapReduce中的mapJoin的思路类似:把小表放在内存中,然后大
表的每条记录再去内存中做检测,最终完成关联查询。但Hive中的小表Join大表的解释和MapReduce中的不尽相同;
	简单理解,两个表做Join的时候,关联键相同的记录会放入一个value list中,同时保证join左边的表的记录在value list的前面,而join右边的表的记录在
value list的后面。 
这里可以举一个例子:
     假设A表id=3有2条记录,B表id=3有5条记录。首先读取v[0]发现是A表的记录,用了1次读取操作。然后再读取v[1]发现依然是A表的记录,累计用了2次读取操
作。然后读取v[2]发现是B表的记录,此时v[0]和v[2]可以直接关联输出,累计用了3次操作。接下来v[0]可以依次和v[3]~v[6]进行关联输出,累计用了7次操作。接
下来v[1]再依次和v[2]~v[6]进行关联输出,累计用了12次操作。 
     把上面的例子调过来,假设A表id=3有5条记录,B表id=3有2条记录。先读取v[0]发现是A表的记录,用了1次读取操作。然后再读取v[1]发现依然是A表的记录,
 累计用了2次读取操作。以此类推,读取到v[4]发现依然是A表的记录,累计用了5次读取操作。接下来读取v[5],发现是B表的记录,此时v[0]和v[5]可以直接关联输
出,累计用了6次操作。然后v[0]和v[6]进行关联输出,累计用了7次操作。然后v[1]分别与v[5]、v[6]关联输出,累计用了9次操作。V[2] 分别与v[5]、v[6]关
联输出,累计用了11次操作。以此类推,最后v[4] 分别与v[5]、v[6]关联输出,累计用了15次操作。 
     总结下来,把A表放在前面会少3次操作;
     如果A表是3条,B表是1千万条,那差别就更大了;如果B表在前,所有的B表的记录会被先读取,然后去和A一个一个的关联,这样B表的记录相当于读取了1千万
 次,而如果是A表在前就只用读取3次(后面的关联读取二者关联输出的次数是相同的)。
     //解决方法:小表 Join 大表而不是大表 Join 小表;
    

空值分布问题:

比如:在生产环境经常会有大量空值数据进入到一个 reducer 中去,导致数据倾斜。(比如其中一张表的多是空值或者0比较多,容易shuffle给一个reducer,造
成运行慢。)
这里提一下,为什么会分配个一个reducer,因为Map方法出来之后会经历一个getPartition方法,往往是按照hash值/分区数来的,很明显空值和0值得hash值
一样,所以会被分到一个分区,而一个分区就对应一个Reducer;
//解决办法:
自定义分区,将为空的 key 转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分到多个 Reducer。
注意:对于异常值如果不需要的话,最好是提前在 where 条件里过滤掉,这样可以使计算量大大减少。
上一篇:控制reduce端缓冲大小以避免OOM


下一篇:通俗易懂解释《什么是数据倾斜?》