@R星校长
Hive Lateral View、视图与索引
Hive Lateral View
- Lateral View 用于和 UDTF 函数(explode、split)结合来使用。
- 首先通过 UDTF 函数拆分成多行,再将多行结果组合成一个支持别名的虚拟表。
- 主要解决在 select 使用 UDTF 做查询过程中,查询只能包含单个 UDTF,不能包含其他字段、以及多个 UDTF 的问题
- 语法:
LATERAL VIEW udtf(expression) tableAlias AS columnAlias (',' columnAlias)
hive> select explode(likes) from person;
OK
col
lol
book
movie
......
hive> select id,explode(likes) from person;
FAILED: SemanticException [Error 10081]: UDTF's are not supported outside the SELECT clause, nor nested in expressions
select id,name,myCol1,myCol2,myCol3 from person
LATERAL VIEW explode(likes) myTable1 AS myCol1
LATERAL VIEW explode(address) myTable2 AS myCol2, myCol3;
例:
统计 person 表*有多少种爱好、多少个城市?
select count(distinct(myCol1)), count(distinct(myCol2)) from person
LATERAL VIEW explode(likes) myTable1 AS myCol1
LATERAL VIEW explode(address) myTable2 AS myCol2, myCol3;
Hive视图
和关系型数据库中的普通视图一样,hive 也支持视图
特点:
- 不支持物化视图
- 只能查询,不能做加载数据操作
- 视图的创建,只是保存一份元数据,查询视图时才执行对应的子查询
- view 定义中若包含了
ORDER BY/LIMIT
语句,当查询视图时也进行ORDER BY/LIMIT
语句操作,view 当中定义的优先级更高 - view 支持迭代视图
mysql 中支持视图删除:
CREATE VIEW v_users AS SELECT * FROM myusers;
DELETE FROM v_users WHERE id = '1316403900579872';
View 语法
创建视图:
CREATE VIEW [IF NOT EXISTS] [db_name.]view_name
[(column_name [COMMENT column_comment], ...) ]
[COMMENT view_comment]
[TBLPROPERTIES (property_name = property_value, ...)]
AS SELECT ... ;
hive>create view v_psn as select * from person;
hive> show tables;
...
v_psn
...
查询视图:
select colums from view;
在对应元数据库中的 TBLS 中多出一条记录:
hive>create view v_psn2 as select * from person order by id desc;
hive> select * from v_psn2 order by id; #和视图排序一致一个job
Query ID = root_20200302193830_80bcc248-fafc-44e7-b1b9-7cdbe0117e91
Total jobs = 2 #不一致两个job
Launching Job 1 out of 2
number of mappers: 1; number of reducers: 1
order by 不建议使用,reduce 为 1 时,如果大量数据都需要加载到内存中进行排序,很可能将内存塞满。
删除视图:
DROP VIEW [IF EXISTS] [db_name.]view_name;
drop view v_psn;
Hive 索引
目的:优化查询以及检索性能
创建索引:
create index t1_index on table person(name)
as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild
in table t1_index_table;
as:指定索引器;
in table:指定索引表,若不指定默认生成在 default__person_t1_index__
表中
create index t1_index on table person2(name)
as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild;
索引表中没有索引数据:
hive> select * from t1_index_table;
OK
t1_index_table.name t1_index_table._bucketname t1_index_table._offsets
Time taken: 0.074 seconds
查询索引show index on person;
重建索引(建立索引之后必须重建索引才能生效)ALTER INDEX t1_index ON person REBUILD;
重建完毕之后,再次查询有索引数据:select * from t1_index_table;
删除索引DROP INDEX IF EXISTS t1_index ON person;
hive> select * from person where name='小明1';
OK
person.id person.name person.likes person.address
1 小明1 ["lol","book","movie"] {"beijing":"xisanqi","shanghai":"pudong"}
Time taken: 0.108 seconds, Fetched: 1 row(s)
hive> DROP INDEX IF EXISTS t1_index ON person;
OK
Time taken: 0.202 seconds
hive> select * from person where name='小明1';
OK
person.id person.name person.likes person.address
1 小明1 ["lol","book","movie"] {"beijing":"xisanqi","shanghai":"pudong"}
Time taken: 0.081 seconds, Fetched: 1 row(s)
由于使用索引需要查询两张表,当数据量少的时候,效率反而低了。
Hive 运行方式
- 命令行方式 cli:控制台模式 !!
- 脚本运行方式(实际生产环境中用最多)!!!
- JDBC方式:hiveserver2 !!!
- web GUI接口 (hwi、hue等)
命令行方式 cli:控制台模式
与 hdfs 交互:(了解)
执行执行 dfs 命令
hive>dfs –ls /;
hive>dfs -cat /user/hive_remote/warehouse/person/person01.txt;
与Linux交互
!开头
!pwd;
!ls /root;
Hive 脚本运行方式:
[root@node4 ~]# hive --service cli --help
usage: hive
-d,--define <key=value> 定义变量
--database <databasename> Specify the database to use
-e <quoted-query-string> 从命令行中输入sql预计
-f <filename> 从文件中执行sql语句。
-H,--help Print help information
--hiveconf <property=value> Use value for given property
--hivevar <key=value> Variable subsitution to apply to hive
commands. e.g. --hivevar A=B
-i <filename> 初始化的sql文件
-S,--silent 静默模式(不显示ok和Time taken提示信息)
-v,--verbose Verbose mode (echo executed SQL to the
console)
hive -e "select * from person"
hive -e "select * from person">he.log
hive -S -e "select * from person">hse.log
hive -f file
[root@node4 ~]# vim hive.sh
hive -e "select * from person"
[root@node4 ~]# chmod +x hive.sh
[root@node4 ~]# ./hive.sh
[root@node4 ~]# vim hivef.sh
select * from person
[root@node4 ~]# chmod +x hive.sh
[root@node4 ~]# hive -f hivef.sh
hive -i init.sql
(进入hive命令模式)
[root@node4 ~]# vim init.sql
select * from person;
[root@node4 ~]# hive -i init.sql
hive> source file
(在hive cli中运行)
hive> source init.sql;
Web GUI 接口 (hwi、hue等)
web 界面安装:
- 下载源码包
apache-hive-*-src.tar.gz
- 打开 PowerShell
- 进入到指定目录
cd D:\devsoft\src\apache-hive-1.2.1-src\hwi\web
- 打包:
jar -cvf hive-hwi.war *
- 将
hive-hwi.war
放在 node3 的$HIVE_HOME/lib/
- 复制
tools.jar
(在 jdk 的 lib 目录下)到 node3 的$HIVE_HOME/lib
下
[root@node3 ~]# cd /opt/hive-1.2.1/lib/
[root@node3 lib]# pwd
/opt/hive-1.2.1/lib
[root@node3 lib]# cp /usr/java/jdk1.8.0_221-amd64/lib/tools.jar ./
- 修改
hive
配置文件hive-site.xml
添加以下配置内容:
<property>
<name>hive.hwi.listen.host</name>
<value>0.0.0.0</value>
</property>
<property>
<name>hive.hwi.listen.port</name>
<value>9999</value>
</property>
<property>
<name>hive.hwi.war.file</name>
<value>lib/hive-hwi.war</value>
</property>
修改前记得备份一下该配置文件。
-
Ctrl+C
关闭
[root@node3 ~]# hive --service metastore
Starting Hive Metastore Server
- 启动 hwi 服务( 端口号 9999 )
hive --service hwi
- 浏览器通过以下链接来访问
http://node3:9999/hwi/
User 输入 test 或者 root(随便写),都可以验证成功。 - 查看数据库、表相关信息
- 查看
Hive权限
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization
Introduction
Note that this documentation is referring to Authorization which is verifying if a user has permission to perform a certain action, and not about Authentication (verifying the identity of the user). Strong authentication for tools like the Hive command line is provided through the use of Kerberos. There are additional authentication options for users of HiveServer2.
导言
请注意,此文档是指正在验证用户是否有权执行某一操作的授权,而不是关于身份验证(验证用户的身份)。通过使用 Kerberos 为 Hive 命令行等工具提供强大的身份验证。为 HiveServer2 的用户提供了额外的身份验证选项。
Transform 功能见 https://www.cnblogs.com/aquastone/p/hive-transform.html
普通用户不能登录的问题演示:
- 先将启动的所有 hive 服务器以及客户端都关闭。
- 启动:
Node3:hive --service metastore
Node4:hiveserver2
- Node2:beeline 连接:
beeline -u jdbc:hive2://node4:10000/default -n gtjin -p 123
[root@node2 ~]# beeline -u jdbc:hive2://node4:10000/default -n gtjin -p 123
Connecting to jdbc:hive2://node4:10000/default
Error: Failed to open new session: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.security.AccessControlException: Permission denied: user=gtjin, access=EXECUTE, inode="/tmp":root:supergroup:drwx------
beeline -u jdbc:hive2://node4:10000/default -n root -p 123
root 可以正常访问。
原因:这是由于 hadoop 的安全验证导致的。
解决普通用户不能登录的问题:
- 修改四台虚拟机上
hdfs-site.xml
, 添加如下配置:
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
- 修改 node3 上的
hive-site.xml
在 hive 服务端(node3)修改配置文件hive-site.xml
添加以下配置内容(建议 hwi 的相关配置删除):
<property>
<name>hive.security.authorization.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>false</value>
</property>
<property>
<name>hive.users.in.admin.role</name>
<value>root</value>
</property>
<property>
<name>hive.security.authorization.manager</name>
<value>org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory</value>
</property>
<property>
<name>hive.security.authenticator.manager</name> <value>org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator</value>
</property>
关闭 hive 服务器和客户端,重启 hadoop 集群。[root@node2 ~]# beeline -u jdbc:hive2://node4:10000/default -n gtjin -p 123
便可以正常访问了。
但是该方式普通用户登录之后,也可以创建角色。
CREATE ROLE role_name; -- 创建角色
DROP ROLE role_name; -- 删除角色
SET ROLE (role_name|ALL|NONE); -- 设置角色
SHOW CURRENT ROLES; -- 查看当前具有的角色
SHOW ROLES; -- 查看所有存在的角色
切换如下方式:
关闭 hive 服务器端和客户端。重新在 node3 上执行 hiveserver2 命令,node2 上通过 beeline 访问即可。普通用户只能看自己具有的角色了。
root用户登录后,创建角色失败,需要通过命令:set role admin
,切换角色。
create role sxt;
对比MYSQL授权:
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
授角色:grant admin to role sxt with admin option;
查看角色 sxt 拥有的角色列表:show role grant role sxt;
查看 admin 角色被授予的列表:show principals admin;
撤销角色:
revoke admin from role sxt;#从角色sxt撤销admin角色
revoke admin from user gtjin;#从用户gtjin撤销admin角色
删除角色:drop role test;
权限列表:
- ALL - Gives users all privileges
- ALTER - Allows users to modify the metadata of an object
- UPDATE - Allows users to modify the physical data of an object
- CREATE - Allows users to create objects. For a database, this means users can create tables, and for a table, this means users can create partitions
- DROP - Allows users to drop objects
- INDEX - Allows users to create indexes on an object (Note: this is not currently implemented)
- LOCK - Allows users to lock or unlock tables when concurrency is enabled
- SELECT - Allows users to access data for objects
- SHOW_DATABASE - Allows users to view available databases
链接:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=45876173#Hivedeprecatedauthorizationmode/LegacyMode-Grant/RevokePrivileges
授予权限:
grant select on person to user gtjin with grant option;
切换 gtjin 登录,查询 person 表可以,别的表无权限。
#grant select on person to role sxt with grant option;
查看权限:
show grant user gtjin on all;
show grant user gtjin on table person;
撤销权限:
revoke select on person from user gtjin;
root 登录,且切换 admin 角色后才能删除
Hive 优化
关闭 hive 服务器和客户端,node3:hive --service metastore
Node4:hive
Fetch 抓取
set hive.fetch.task.conversion=none/more(默认值);
默认做了优化,以下两种情况都不经过 mr,改为 none 后,将走 mr。
以下 SQL 不会转为 Mapreduce 来执行
- select 仅查询本表字段
select id,name from person;
- where 仅对本表字段做条件过滤
select id,name,age from person where age =32;
Sql 分析:
explain select * from person;
explain select count(*) from person;
explain extended select count(*) from person;(了解)
本地运行模式
set hive.exec.mode.local.auto;#默认false
hive> select count(*) from person;
Time taken: 24.026 seconds, Fetched: 1 row(s)
hive>set hive.exec.mode.local.auto=true;#开启本地模式
hive>select count(*) from person;
Time taken: 3.747 seconds, Fetched: 1 row(s)
开发和测试阶段使用本地模式,优点快,缺点是http://node3:8088/cluster
看不到。对于小数据集 hive 通过本地模式在单机上处理任务,执行时间可以明显被缩短。
设置 local mr 的最大输入数据量,当输入数据量小于这个值是采用 local mr 的方式,默认 134217728 也就是 128M 。若大于该配置仍会以集群方式来运行!
hive.exec.mode.local.auto.inputbytes.max=134217728
设置 local mr 的最大输入文件个数,当输入文件个数小于这个值是采用 local mr 方式
hive.exec.mode.local.auto.input.files.max=4 #默认4
并行模式:
hive> set hive.exec.parallel; #默认为false
hive>select t1.ct,t2.ct from (select count(id) ct from person) t1,(select count(name) ct from person) t2;
看执行过程,Launching Job 1 out of 5执行完,才执行Launching Job 2 out of 5
hive>set hive.exec.parallel=true;
hive>select t1.ct,t2.ct from (select count(id) ct from person) t1,(select count(name) ct from person) t2
Launching Job 1 out of 5和Launching Job 2 out of 5 并行执行。但时间有可能并没有减少,因为需要两套资源,目前还是使用一套。资源充足的情况下,肯定并行更快。
hive> set hive.exec.parallel.thread.number;
hive.exec.parallel.thread.number=8
并行进程默认是8个进程同时进行。
严格与非严格模式
- 参数设置
hive> set hive.mapred.mode=strict;严格模式
hive> set hive.mapred.mode=nonstrict;非严格模式(默认模式)
- 查询限制:
1、对于分区表,必须添加 where 对于分区字段的条件过滤;
hive> select * from person5;
FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "person5" Table "person5"
hive> select * from person5 where age=10; #可以查询
2、order by 语句必须包含 limit 输出限制;
hive> select * from person order by id desc;
FAILED: SemanticException 1:30 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'id'
hive> select * from person order by id desc limit 10;
使用order by语句必须使用limit语句。
3、限制执行笛卡尔积的查询。
(1) 严格模式下避免出现笛卡尔积。
Hive 排序
- order by - 对于查询结果做全排序,只允许有一个 reduce 处理
(当数据量较大时,应慎用。严格模式下,必须结合 limit 来使用) - sort by - 对于单个 reduce 的数据进行排序
- distribute by - 分区排序,经常和 Sort By 结合使用
- cluster by - 相当于 sort By + distribute By
(Cluster By 不能通过 asc、desc 的方式指定排序规则;
可通过 distribute by column sort by column asc|desc 的方式)
select * from person sort by id desc;
分区剪裁、列裁剪
- 尽可能早的过滤掉尽可能多的数据,避免大量数据流入外层 sql
- 分区剪裁
- 分区在 hive 上本质是目录,分区剪裁可以高效的过滤掉大部分数据。
- 尽量使用分区剪裁
- 列裁剪
- 只获取需要的列的数据,减少数据输入。
- 少用 select *
select id,name
From (select id,name from person) tmp
Hive - JVM 重用
适用场景:
1、小文件个数过多
2、task 个数过多
通过 set mapred.job.reuse.jvm.num.tasks=n;
来设置
(n 为 task 插槽个数)默认为 1。
优点:JVM 重用使得 JVM 实例在同一个 Job 中重复使用多次,减少进程启动和销毁时间和频繁申请资源的系统开销。
缺点:设置开启之后,task 插槽会一直占用资源,不论是否有 task 运行,直到所有的 task 即整个 job 全部执行完成时,才会释放所有的 task 插槽资源!
推测执行
根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务和原始任务同时处理“同一份”数据,并最终选择最先成功完成任务的计算结果作为最终结果。
set hive.mapred.reduce.tasks.speculative.execution=true;#开启推测
表优化
小表与大表 join
Hive Join
官网搜索:join->LanguageManual Joins
join_table:
table_reference [INNER] JOIN table_factor [join_condition]
| table_reference {LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition
| table_reference LEFT SEMI JOIN table_reference join_condition
| table_reference CROSS JOIN table_reference [join_condition] (as of Hive 0.10)
table_reference:
table_factor
| join_table
table_factor:
tbl_name [alias]
| table_subquery alias
| ( table_references )
join_condition:
ON expression
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
is converted into a single map/reduce job as only key1 column for b is involved in the join. On the other hand
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
is converted into two map/reduce jobs because key1 column from b is used in the first join condition and key2 column from b is used in the second one. The first map/reduce job joins a with b and the results are then joined with c in the second map/reduce job.
https://blog.csdn.net/sofuzi/article/details/81265402
reduce side join
reduce side join 是一种最简单的 join 方式,其主要思想如下:
在 map 阶段,map 函数同时读取两个文件 File1 和 File2 ,为了区分两种来源的 key/value 数据对,对每条数据打一个标签> (tag),比如:tag=1 表示来自文件 File1,tag=2 表示来自文件 File2。即:map 阶段的主要任务是对不同文件中的数据打标签。> 在 reduce 阶段,reduce 函数获取 key 相同的来自 File1 和 File2 文件的 value list,
然后对于同一个 key,对 File1 和 File2 中的数据进行 join(笛卡尔乘积)。即:reduce 阶段进行实际的连接操作。
map side join
之所以存在 reduce side join,是因为在 map 阶段不能获取所有需要的 join 字段,即:同一个 key 对应的字段可能位于不同 map 中。Reduce side join 是非常低效的,因为 shuffle 阶段要进行大量的数据传输。 Map side join 是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多 份,让每个 map task 内存中存在一份(比如存放到 hash table 中),然后只扫描大表:对于大表中的每一条记录 key/value,在 hash table 中查找是否有相同的 key 的记录,如果有,则连接后输出即可。
Join 计算时,将小表(驱动表)放在 join 的左边
Map Join:在 Map 端完成 Join
两种实现方式:
1、SQL 方式,在 SQL 语句中添加 MapJoin 标记(mapjoin hint)
语法:
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value
FROM smallTable JOIN bigTable ON smallTable.key = bigTable.key;
2、开启自动的 MapJoin
大小判断标准,表数据的大小而不是行数。(通常行少的数据一般是小表)hive.ignore.mapjoin.hint;
默认为 true,如果自动和手动冲突了,手动的配置失效,以自动配置为准。
将 key 相对分散,并数据量小的表放在 join 的左边,这样可以有效减少内存溢出发生的几率;还可以使用 map join 让小的维度表(1000 条一下的记录数据)先进内存,在 map 端完成 reduce。
测试发现新版的 hive 已经对小表 join 大表和大表 join 小表进行了优化,小表放在左边和右边已经没有明显的区别。
map join原理分析:
在 Map 端完成 Join。如果不指定 MapJoin 或者不符合 MapJoin 的条件,那么 Hive 解析器会将 Join 操作转换成 CommonJoin,在 reduce 阶段完成 join。容易发生倾斜。可以用 MapJoin 将小表全部加装到内存,在 map 端完成 join,避免 reducer 处理。
两种实现方式:
1、开启自动的 MapJoin
通过修改以下配置启用自动的 mapjoin:
set hive.auto.convert.join = true;
(该参数为 true 时,Hive 自动对左边的表统计量,如果是小表就加入内存,即对小表使用 Map join)
2、SQL 方式,在 SQL 语句中添加 MapJoin 标记(mapjoin hint)
语法:
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value
FROM smallTable JOIN bigTable ON smallTable.key = bigTable.key;
相关配置参数:
-
hive.mapjoin.smalltable.filesize;
#默认25M
(大表小表判断的阈值,如果表的大小小于该值则会被加载到内存中运行) -
hive.ignore.mapjoin.hint;
(默认值:true;是否忽略 mapjoin hint 即 mapjoin 标记) -
hive.auto.convert.join.noconditionaltask;
(默认值:true;将普通的 join 转化为普通的 mapjoin 时,是否将多个 mapjoin 转化为一个 mapjoin) -
hive.auto.convert.join.noconditionaltask.size;
(将多个 mapjoin 转化为一个 mapjoin 时,其表的最大值)
hive.groupby.skewindata
避免数据倾斜,一个 MR 变 2 个:MR->MR , 先做一次数据合并,有时候一个 MR 都即将 100% 的时候卡着不动了,分两个完成。
大表 join 大表
- 空 key 过滤:
(1) 有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。Where xxx is not null
(2) 一般在 ETL 数据清洗时便会对空值进行了处理,所以该条一般情况下意义不大,但是面试的时候可以说。最终目的是 reduce 均衡,防止数据倾斜。 - 空 key 转换:
(1) 有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上
Map-Side 聚合
- 默认情况下,Map 阶段相同 key 发送到一个 reduce,当某个 key 的数据过大时就会发生数据倾斜。
- 并不是所有的聚合都需要在 reduce 端完成,可以先在 map 端进行聚合,最后再在 reduce 端聚合。如同 combiner
- 通过设置以下参数开启在 Map 端的聚合 100000 90000
set hive.map.aggr=true;
hive.groupby.mapaggr.checkinterval:
map端group by执行聚合时处理的多少行数据(默认:100000)
hive.map.aggr.hash.min.reduction:
进行聚合的最小比例(预先对100000条数据做聚合,若聚合之后的数据量/100000的值大于该配置0.5,则不会聚合)
hive.map.aggr.hash.percentmemory:
map端聚合使用的内存的最大值
hive.map.aggr.hash.force.flush.memory.threshold:
map端做聚合操作是hash表的最大可用内存,大于该值则会触发flush
hive.groupby.skewindata
是否对GroupBy产生的数据倾斜做优化,默认为false
count(distinct) 去重统计
- 数据量小的时候无所谓,数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个 Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换
每个 reduce 任务处理的数据量,默认值 256MB
set hive.exec.reducers.bytes.per.reducer=256000000
select count(distinct imei) from jizhan;
转换为
Select count(imei) from (select imei from jizhan group by imei) tmp
虽然会多一个Job来完成,但在数据量大的情况下,这个绝对值得。
笛卡尔积
尽量避免笛卡尔积,即避免 join 的时候不加 on 条件,或者无效的 on 条件。Hive 只使用 1 个 reduce 来完成笛卡尔积,所以效率特别低。
合适设置 Map 与 Reduce 数量
合理设置 Map Task 数量
- 通常情况下,作业会通过 input 的目录产生一个或者多个 map 任务。
- 主要决定因素:input 的文件总个数,input 的文件大小,集群设置的文件块大小。block 块,split_size , 文件个数 split 切片数量决定 map task 数量
- 是不是 map 数越多越好?
- 答:不是。如果一个任务有很多小文件,则每个小文件都会被当成一个 split 切片,用一个 map 任务来完成,执行真实的业务逻辑运算的时间远远小于 map 任务的启动和初始化的时间,就会造成很大的资源浪费。另外,同时可执行的 map 数也是受限的。如何优化,答案当然是减少 map 的数量,比如通过合并小文件减少 map 数量,见
合并小文件
。
- 答:不是。如果一个任务有很多小文件,则每个小文件都会被当成一个 split 切片,用一个 map 任务来完成,执行真实的业务逻辑运算的时间远远小于 map 任务的启动和初始化的时间,就会造成很大的资源浪费。另外,同时可执行的 map 数也是受限的。如何优化,答案当然是减少 map 的数量,比如通过合并小文件减少 map 数量,见
- 是不是保证每个 map 处理接近 128M 的文件块,就高枕无忧了?
- 答:不一定,比如一个 128MB(或者接近该值)的文件,默认情况会用一个 map 去完成,但是这个文件可能只有很少的小字段,却又几千万的记录,如果 map 处理的逻辑比较复杂,用一个 map 任务去做,肯定比较耗时。如何解决?当然是增加 map 的个数。见
复杂文件增加 map 数
。
- 答:不一定,比如一个 128MB(或者接近该值)的文件,默认情况会用一个 map 去完成,但是这个文件可能只有很少的小字段,却又几千万的记录,如果 map 处理的逻辑比较复杂,用一个 map 任务去做,肯定比较耗时。如何解决?当然是增加 map 的个数。见
合并小文件
小文件数目多,容易在文件存储端造成压力,给 hdfs 造成压力,影响效率
设置合并属性
是否合并 map 输出文件:hive.merge.mapfiles=true
是否合并 reduce 输出文件:hive.merge.mapredfiles=true;
合并文件的大小:hive.merge.size.per.task=256*1000*1000
CombineHiveInputFormat
具有对小文件进行合并的功能(系统默认的格式)
set hive.input.format=
org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
复杂文件增加 map 数
对比设置 split 逻辑切块的大小(minSize,maxSize)set mapred.max.split.size=256000000
(默认值)splitSize = Math.max(minSize,Math.min(maxSize,blockSize))
一个 split 的最大值,即每个 map 处理文件的最大值。让该值小于 blocksize 就可以增加 map 的个数。
注意:set mapred.min.split.size=1;
(默认值)
扩展 Map 数量相关的参数mapred.min.split.size.per.node
一个节点上 split 的最小值mapred.min.split.size.per.rack
一个机架上 split 的最小值
合理设置 Reduce 数
- 方式一:强制指定 reduce 任务的数量
set mapred.reduce.tasks=5;
Sql1
set mapred.reduce.tasks=3;
Sql2
- 方式二:
(1) 设置每个 reduce 处理的数据量默认值约等于 256M
set hive.exec.reducers.bytes.per.reducer=256000000;
(2) 每个任务最大的 reduce 数,默认 1009。
set hive.exec.reducers.max=1009;
(3) 计算 reduce 数 2560 * 1024 *1024
num = min(hive.exec.reducers.max,
总输入数据量/hive.exec.reducers.bytes.per.reducer)
总结:reduce 个数并不是越多越好,过多的启动和初始化 reduce 也会消耗时间和资源;另外过多的 reduce 会生成很多个结果文件,同样产生了小文件的问题。
hive—high Avaliable
hive 的搭建方式有三种,分别是
1、Local/Embedded Metastore Database (Derby)
2、Remote Metastore Database
3、Remote Metastore Server
一般情况下,我们在学习的时候直接使用hive––service metastore的方式启动服务端,使用 hive 的方式直接访问登录客户端,除了这种方式之外,hive 提供了 hiveserver2 的服务端启动方式,提供了 beeline 和 jdbc 的支持,并且官网也提出,一般在生产环境中,使用 hiveserver2 的方式比较多,如图:
使用 hiveserver2 的优点如下:
1、在应用端不需要部署 hadoop 和 hive 的客户端
2、hiveserver2 不用直接将 hdfs 和 metastore 暴露给用户
3、有 HA 机制,解决单点故障的问题以及应用端的并发和负载问题
4、jdbc 的连接方式,可以使用任何语言,方便与应用进行数据交互
本文档主要介绍如何进行 hive 的 HA 的搭建:
如何进行搭建,参照之前 hadoop 的 HA,使用 zookeeper 完成 HA
1、环境如下:
Node1 | Node2 | Node3 | Node4 | |
---|---|---|---|---|
Namenode | 1 | 1 | ||
Journalnode | 1 | 1 | 1 | |
Datanode | 1 | 1 | 1 | |
Zkfc | 1 | 1 | ||
zookeeper | 1 | 1 | 1 | |
resourcemanager | 1 | 1 | ||
nodemanager | 1 | 1 | 1 | |
Hiveserver2 | 1 | 1 | ||
beeline | 1 |
2、node2:hive-site.xml
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive_ha/warehouse</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://node1:3306/hive_ha?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>hive.server2.support.dynamic.service.discovery</name>
<value>true</value>
</property>
<property>
<name>hive.server2.zookeeper.namespace</name>
<value>hiveserver2_zk</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>node2:2181,node3:2181,node4:2181</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>node2</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10001</value>
</property>
3、node3:hive-site.xml
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive_ha/warehouse</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://node1:3306/hive_ha?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>hive.server2.support.dynamic.service.discovery</name>
<value>true</value>
</property>
<property>
<name>hive.server2.zookeeper.namespace</name>
<value>hiveserver2_zk</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>node2:2181,node3:2181,node4:2181</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>node3</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10001</value>
</property>
4、node2 和 node3 启动 hive:hiveserver2
5、使用 jdbc 或者 beeline(node4) 两种方式进行访问
1) beeline
!connect jdbc:hive2://node2,node3,node4/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk root 123
2)Jdbc
1. package cn.gtjin;
2. import java.sql.*;
3. public class HiveJdbcClient2 {
4. private static String driverName = "org.apache.hive.jdbc.HiveDriver";
5. public static void main(String[] args) throws SQLException {
6. try {
7. Class.forName(driverName);
8. } catch (ClassNotFoundException e) {
9. e.printStackTrace();
10. }
11. Connection conn = DriverManager.getConnection("jdbc:hive2://node2,node3,node4/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk", "root", "");
12. Statement stmt = conn.createStatement();
13. String sql = "select * from person";
14. ResultSet res = stmt.executeQuery(sql);
15. while (res.next()) {
16. System.out.println(res.getString(1));
17. }
18. }
19. }