1.Hive压缩
1.1概述
当前的大数据环境下,机器性能好,节点更多,但并不代表我们无条件直接对数据进行处理,在某些情况下,我们依旧需要对数据进行压缩处理,压缩处理能有效减少存储系统的字节读取数,提高网络带宽和磁盘空间的效率。
Hive相当于Hadoop的客户端,Hive和Hadoop都可以进行压缩操作。
Hive压缩体现在数据文件的最终存储格式是否启用压缩;
压缩使用得当会提高性能,运用不当也会降低性能。
1.2条件
why:压缩减少系统读取字节数,提高网络宽带和磁盘空间效率;
缺点:使用文件时需要解压,加重CPU负载,算法复杂度决定解压时间;
条件:空间和CPU资源充裕;
技术:
有损压缩(LOSSY COMPRESSION):有数据丢失,场景:视频数据;
无损压缩(LOSSLESS COMPRESSION):无数据丢失,场景:日志数据;
对称和非对称:
对称:压缩和解压时间一样
非对称:压缩和解压时间不相同,有多种情况:
压缩时间长,解压时间短;
压缩时间短,解压时间长;
1.3基本原则:
计算密集型作业少用压缩;
计算密集需要大量CPU资源,解压需要CPU资源,加重CPU负载;
IO密集型作业多用压缩;
CPU消耗少,IO操作多
重点记住:Gzip,Bzip2,Snappy,Zstd
2.Hive存储
2.1存储方式
OLTP:联机事务处理
OLAP:联机分析处理
2.2OLTP事务处理
传统关系型数据库的主要应用;
计算与存储分开,用户发送请求事件,事务处理从关系型数据库中查询并返回。
实时性好,请求及时处理;
问题是并发性低,连表代价大;
一般使用行式存储,要求实时性;
2.3OLAP分析处理
分布式数据库的主要应用;
将事务数据库数据复制到数据仓库,然后对数据进行查询和分析,帮助决策;
实时性要求不高;
使用列式存储,要求性能好;
2.4行式存储
查询时返回表的一行,方便插入、更新操作,数量小时性能良好;
查询指定列数据时可能会返回大量无用数据,数据量较大时,效率急剧降低,,还有就是由于每一行中,列的数据类型不一致,导致不容易获得一个极高的压缩比,也就是空间利用率不高。
2.5列式存储
查询时返回表的一列,同一列中的数据属于同一类型,压缩算法*,压缩效果显著,压缩比高,查询指定列时不会出现大量无用数据;
插入、更新操作不方便,不小量数据。
2.6存储格式
Text File:默认格式,存储方式为行存储,数据不做压缩,磁盘开销大,数据解析开销大。
仅在需要从 Hadoop 中直接提取数据,或直接从文件中加载大量数据的情况下,才建议使用纯文本格式或 CSV。
优点:易读、轻量;
缺点:读写速度慢,不支持块压缩,无法切分压缩文件
Sequence File:
Sequence 最初是为 MapReduce 设计的,因此和 MapReduce 集成很好。在 Sequence File 中,每个数据都是以一个 Key和一个 Value 进行序列化存储的,内部使用了 Hadoop 的标准的 Writable 接口实现序列化和反序列化。Sequence File 中的数据是以二进制格式存储的,这种格式所需的存储空间小于文本的格式。
Sequence File 是 Hadoop API 提供的一种支持二进制格式存储,存储方式为行存储,其具有使用方便、可分割、可压缩的特点。Sequence File 支持三种压缩选择:NONE,RECORD,BLOCK。Record 压缩率低,一般建议使用 BLOCK 压缩。
优点:
与文本文件相比更紧凑,支持块级压缩。
压缩文件内容的同时,支持将文件切分。
序列文件在 Hadoop 和其他支持 HDFS 的项目兼容很好,例如:Spark。
让我们摆脱文本文件迈出的第一步。
可以作为大量小文件的容器。
缺点:
对于具有 SQL 类型的 Hive 支持不好,需要读取和解压缩所有字段。
不存储元数据,并且对 Schema 扩展的唯一方式是在末尾添加新字段。
Map File:
Map File 是排序后的 Sequence File,Map File 由两部分组成,分别是 data 和 index。
Sequence File 的优缺点就是 Map File 的优缺点。
Avro File:
Apache Avro 是与语言无关的序列化系统,Avro 是基于行的存储格式,Avro 是一种自描述格式。
Avro 的模式( Schema) 是用 JSON 定义的, 这有助于在已经具有 JSON 库的语言中实现 Avro 数据的读写操作。 Avro
支持丰富的数据结构类型, 包括但不限于 map、 union 和 fixed 等, 这些数据结构类型在 JSON 中并不直接支持, 而是通过
Avro 的二进制格式实现。 因此, 虽然 Avro 的 Schema 定义使用了 JSON, 但其核心的数据交换格式是二进制。
优点:
Avro 是与语言无关的数据序列化系统,以 JSON 格式存储 Schema ,使其易于被任何程序读取和解释。
Avro 格式是 Hadoop 的一种基于行的存储格式,被广泛用作序列化平台。
数据本身以二进制格式存储,使其在 Avro 文件中紧凑且高效。
序列化和反序列化速度很快。
Avro 文件是可切分的、可压缩的,非常适合在 Hadoop 生态系统中进行数据存储。
缺点:
行式存储效率较低。例如:读取 15 列中的 2 列数据,基于行式存储就需要读取数百万行的 15 列。而列式存储只需要读取这 2 列即可。
列式存储因为是将同一列(类)的数据存储在一起,压缩率要比行式存储高。
RC File:
基于MR,结合行列存储优势(先水平后垂直划分数据),保证同一行数据位于同一节点;
元组重构成本低,可跳过不必要的列读取;
RC File 是由二进制键/值对组成的 flat 文件,它与 Sequence File 有许多相似之处。在数仓中执行分析时,这种面向列的存储非常有用。
优点:
基于列式存储,更好的压缩比;
利用元数据存储来支持数据类型;
支持Split。
缺点:
RC File 不支持 Schema 扩展,如果要添加新的列,则必须重写文件,这会降低操作效率。
ORC File(Hive主推):
Apache ORC(Optimized Row Columnar,优化行列)是 Apache Hadoop 生态系统面向列的开源数据存储格式,它与Hadoop 环境中的大多数计算框架兼容。ORC 代表“优化行列”,它以比 RC 更为优化的方式存储数据,提供了一种非常有效的方式来存储关系数据,然后存储 RC 文件。ORC 将原始数据的大小最多减少 75%,数据处理的速度大大提高。
优点:
比 Text File,Sequence File 和 RC File 具备更好的的性能。
列数据单独存储。
带类型的数据存储格式,使用类型专用的编码器。
轻量级索引。
缺点:
与 RC 文件一样,ORC 也是不支持列扩展的。如果要添加新的列,则必须重写文件,这会降低操作效率。
Parquet File(最通用):
Parquet File 是另一种列式存储的结构,由 Twitter + Cloudera 开源,被 Hive、SparkDrill、Impala、Pig 等支持,目前已成为 Apache 的*项目。和 ORC File 一样,Parquet 也是基于列的二进制存储格式,可以存储嵌套的数据结构。当指定要使用列进行操作时,磁盘输入/输出操效率很高。Parquet 与 Cloudera Impala 兼容很好,并做了大量优化。Parquet 还支持块压缩。与 RC 和 ORC 文件不同的是,Parquet Serdes 支持有限的 Schema 扩展。在 Parquet 中,可以在结构的末尾添加新列。
优点:
和 ORC 文件一样,它非常适合进行压缩,具有出色的查询性能,尤其是从特定列查询数据时,效率很高。
缺点:
与 RC 和 ORC 一样,Parquet 也具有压缩和查询性能方面的优点,与非列文件格式相比,写入速度通常较慢。
3.Hive优化
3.1EXPLAIN 执行计划
通过查看explain执行计划可用了解到SQL语句的执行过程是怎样的,进而对SQL语句进行优化。
3.2SQL优化
3.2.1RBO优化
Rule-Based Optimization,简称 RBO:基于规则优化的优化器,是一种经验式、启发式的优化思路,优化规则都已经预先定义好了,只需要将 SQL 往这些规则上套即可。简单的说,RBO 就像是一个经验丰富的老司机,基本优化套路全都知道。例如谓词下推、列裁剪、常量折叠等。
谓词下推:
将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据。在文件格式使用 Parquet 或 ORC 时,甚至可能整块跳过不相关的文件。
如select * from t1 join t2 where t1.id<100;
优化为:select * from
(select * from t1 where t1.id<100) T1 join
(select * from t2 where t2.id<100) T2
using(id);
解决了t2.i>100那部分的数据连接,因为t1.id<100,t2.id大于100的部分根本连上。
列裁剪(投影下推):
表示扫描数据源的时候,只读取那些与查询相关的字段。
如 select t1.id,1+2+t1.value as v
from t1,t2
where t1.id=t2.id
and t2.id<100;
优化为:
select t1.id,1+2+t1.value as v
from t1,(select t2.id from t2) t2
where t1.id=t2.id
and t2.id<100;
常量折叠:
如 select t1.id,1+2+t1.value as v
from t1,t2
where t1.id=t2.id
and t2.id<100;
优化为:
select t1.id,3+t1.value as v
from t1,(select t2.id from t2) t2
where t1.id=t2.id
and t2.id<100;
3.2.2CBO 优化
CBO(Cost-Based Optimization)意为基于代价优化的策略,它需要计算所有可能执行计划的代价,并挑选出代价最小的执行计划。
RBO 就像一个老司机,基本优化套路全都知道。然而世界上有一种东西叫做——不按套路出牌。
3.2.3JOIN优化
Map Join:顾名思义,就是在 Map 阶段进行表之间的连接。而不需要进入到 Reduce 阶段才进行连接。这样就节省了在 Shuffle 阶段时要进行的大量数据传输。从而起到了优化作业的作用。
实现原理:Map Join 会把小表全部读入内存中,在 Map 阶段直接拿另外一个表的数据和内存中的表数据做匹配,由于在 Map 阶段进行了 JOIN 操作,底层不需要经过 Shuffle,这样就不会由于数据倾斜导致某个 Reduce 上落数据太多而失败,但是需要占用内存空间存放小表数据。
Reduce Join(Shuffle Join):
Map 端的主要工作:为来自不同表或文件的数据,进行打标签以区别不同来源的记录。然后用 JOIN 字段作为 Key,其余部分数据和新加的标志作为 Value,最后进行输出。
Reduce 端的主要工作:在 Reduce 端以 JOIN 字段作为 Key 完成分组,接下来只需要在每一个分组中,将那些来源于不同 Map 端的文件的记录进行合并即可。
Bucket Map Join :
大表对小表应该使用 Map Join 来进行优化,但是如果是大表(相较于 Reduce Join 大表的数据更大)对大表(相较于 Reduce Join 大表的数据更大),如果进行 Shuffle,那就非常怕,第一个慢不用说,第二个容易出异常。此时就可以使用 Bucket Join 来进行优化,而 Bucket Join 又分为:
Bucket Map Join
Sort Merge Bucket Join(SMB Join)
具体使用流程如下:
将两张大表的数据构建分桶
数据按照分桶的规则拆分到不同的文件中
分桶规则 = MapReduce 分区的规则 = Key 的 Hash 取余
Key = 分桶的字段
只需要实现桶与桶的 JOIN 即可,减少了比较次数
分桶本质:底层 MapReduce 的分区,桶的个数 = Reduce 个数 = 文件个数。
SMB Join:
SMB Join 是基于 Bucket Map Join 的有序 Bucket,可实现在 Map 端完成 JOIN 操作,只要桶内的下一条不是,就不用再比较了,有效地减少或避免 Shuffle 的数据量。
SMB Join 的条件和 Map Join 类似但又不同:
所有要 JOIN 的表必须分桶,如果表不是 Bucket 的,则只是做普通 JOIN
大表的 Bucket 数 = 大表的 Bucket 数
Bucket 列 == JOIN 列 == SORT 列
必须是应用在 Bucket Map Join 的场景中
3.2.4分区/分桶
大数据学习10之Hive高级-****博客
3.3数据倾斜
3.3.1定义
数据倾斜,即单个节点任务所处理的数据量远大于同类型任务所处理的数据量,导致该节点成为整个作业的瓶颈,这是分布式系统不可能避免的问题。
MapReduce 模型中,数据倾斜问题是很常见的,因为同一个 Key 的 Values,在进GroupByKey、CountByKey、ReduceByKey、Join 等操作时一定是分配到某一个节点上的一个 Task 中进行处理的,如果某个 Key 对应的数据量特别大的话,就会造成某个节点的堵塞甚至宕机的情况。在并行计算的作业中,整个作业的进度是由运行时间最长的那个 Task 决定的,在出现数据倾斜时,整个作业的运行将会非常缓慢,甚至会发生 OOM 异常。
3.3.2原因
读取的大文件不可分割;
任务需要处理大量相同的键;
3.3.3解决
压缩引发的数据倾斜:
采用支持文件分割的压缩算法;
单表聚合数据倾斜优化:
加盐去盐操作;
JOIN数据倾斜优化:
Map Join:在map阶段进行表的连接操作;
map阶段将小表读入内存,顺序扫描大表完成join
适用于小表join大表,小表足够小
解决方案:在小表 JOIN 大表时如果产生数据倾斜,那么 Map JOIN 可以直接规避掉 Shuffle 阶段(默认开启)。
大小表 Join:
采样倾斜key 并拆分join
适用于join时出现数据倾斜
将存在倾斜的表,根据抽样结果,拆分为倾斜 Key(Skew 表) 和没有倾斜 Key(Normal 表)的两个数据集。
将 Skew 表的 Key 全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集做笛卡尔乘积(即将数据量扩大 N 倍,得到 New 表)。
打散的 Skew 表 JOIN 扩容的 New 表,普通表 JOIN 未扩容的 Old 表,最后将这两部分结果 UNION ALL。
业务无关数据引发的数据倾斜:
实际业务中有些大量的 NULL 值(空 Key)或者一些无意义的数据参与到计算作业中,这些数据可能来自业务为了上报或因数据规范将某类数据进行归一化变成空值或空字符等形式,这些与业务无关的数据导致在进行分组聚合或者在执行表连接时发生数据倾斜。对于这类问题引发的数据倾斜,在计算过程中排除含有这类“异常”数据即可。
3.4聚合优化
3.4.1GROUP BY
例如在 student 表中,假设 age 有数据倾斜,可以通过开启 hive.groupby.skewindata 将作业拆解成两个作业,第一个作业会尽可能将 Map 的数据平均分发到不同的 Reduce,每个 Reduce 实现预聚合,这样的处理结果是相同的 Key 会被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个作业在第一个作业处理的数据基础上,按 Key 分发到 Reduce中,这个过程可以保证相同的 Key 会被分发到同一个 Reduce,完成最终聚合。
3.4.2ORDER BY
早排序,避免全局排序;
3.4.3COUNT(DISTINCT)
3.5资源优化
3.5.1向量化查询
多核并发执行操作;
3.5.2存储优化
HDFS小文件合并
3.5.3YARN 优化
配置每个容器请求被分配的最小内存。如果容器请求的资源小于该值,会以 1024MB 进行分配;如果 NodeManager 可被分配的内存小于该值,则该 NodeManager 将会被 ResouceManager 给关闭。默认值 1024MB。
3.5.4并行执行
Hive 在实现 HQL 计算运行时,会解析为多个 Stage,有时候 Stage 彼此之间有依赖关系,只能挨个执行。但有些时候,很多 Stage 之间是没有依赖关系的,例如 UNION 语句,JOIN 语句等等,这些 Stage 没有依赖关系,但是 Hive 依旧会挨个执行每个 Stage,这样会导致性能非常的差。我们可以通过修改参数,开启并行执行,当多个 Stage 之间没有依赖关系时,允许多个 Stage 并行执行,提高性能。
需要注意的是,在共享集群中,如果 Job 中并行阶段增多,那么集群利用率就会增加。系统资源比较空闲的时候才会有优势,如果系统资源比较吃紧,没资源意味着并行也无法启动。
3.5.5 JVM 重用
当一个 Task 运行结束以后,JVM 不会进行释放,而是继续供下一个 Task 运行,直到运行了 N 个 Task 以后,就会释放。
3.6Job优化
3.6.1 Map 优化
减少或增加MapTask数量,即改变块(不是HDFS的块)的大小,
增大 改minSize参数
减小 改maxSize参数
3.6.2Reduce 优化
3.6.3Shuffle 优化
目的:压缩中间数据,从而减少磁盘操作以及减少网络传输数据量。参考Hive压缩/存储
3.7其它优化
3.7.1Fetch 模式
当我们执行一个简单的 HQL 语句时,例如 SELECT * FROM emp; ,发现数据可以很快的返回,这其实涉及到 Fetch抓取的概念。相关配置为:
该配置默认值为 more ,如果设置为 none ,那么每次执行 HQL 都会执行 MapReduce 程序。
3.7.2多重模式
3.7.3关联优化
当一个程序中有一些操作彼此之间有关联性的时候,是可以在一个 MapReduce 中完成的,但是 Hive 会不智能的选择使用两个 MapReduce 来完成这两个操作。
3.7.4本地模式
3.7.5严格模式
分区表不允许全局查询了;
3.7.6推测执行
老师叫班长去办公室拿作业,但班长去了十分钟还没回来,所以老师决定再派副班长去拿。
推测执行(Speculative Execution)是指在集群环境下运行 MapReduce,可能是程序 Bug,负载不均或者其他的一些问题,导致在一个 Job 下的多个 Task 速度不一致,比如有的任务已经完成,但是有些任务可能只跑了 10%,根据木桶原理,这些任务将成为整个 Job 的短板,如果集群启动了推测执行,这时为了最大限度的提高短板,Hadoop 会为该 Task 启动备份任务,让 Speculative Task 与原始 Task 同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果,并且在运行完成后 Kill 掉另外一个任务。