CDP中的Hive3系列之Hive性能调优

这是CDP中Apache Hive3用户指南系列之一,之前的文章请参考<CDP的Hive3系列之Hive Metastore介绍>,<CDP中的Hive3系列之Apache Hive3的特性>,<CDP中的Hive3系列之启动Apache Hive3>,<CDP中的Hive3系列之Hive3使用指南>,<CDP中的Hive3系列之管理Hive3>,<CDP中的Hive3系列之管理Hive的工作负载>,<CDP中的Hive3系列之配置Apache Hive3>,<CDP中的Hive3系列之保护Hive3>和<CDP中的Hive3系列之Hive性能调优>.

1    数据迁移到 Apache Hive

要将数据从 RDBMS(例如 MySQL)迁移到 Hive,您应该考虑使用带有 Teradata 连接器的 CDP 中的 Apache Sqoop。Apache Sqoop 客户端基于 CLI 的工具在关系数据库和 HDFS 或云对象存储(包括 Amazon S3 和 Microsoft ADLS)之间批量传输数据。

需要进行提取、转换和加载 (ETL) 过程的遗留系统数据源通常驻留在文件系统或对象存储中。您还可以以分隔文本(默认)或 SequenceFile 格式导入数据,然后将数据转换为 Hive 推荐的 ORC 格式。通常,为了在 Hive 中查询数据,ORC 是首选格式,因为 ORC 提供了性能增强。

1.1   Sqoop Teradata 连接器结合使用

CDP 不支持使用 Hadoopjar命令(Java API)的 Sqoop 导出。Teradata 的连接器文档包括使用此 API 的说明。据报道,CDP 用户将不受支持的 API 命令(例如 )误认为是 -forcestage受支持的 Sqoop 命令,例如 –-staging-force。Cloudera 仅支持通过使用由 Teradata 提供支持的 Cloudera 连接器中记录的命令使用 Sqoop 。Cloudera 不支持将 Sqoop 与 Hadoop jar命令一起使用,例如 Teradata Connector for Hadoop 教程中所述的命令。

1.2   Cloudera 网站上的 Apache Sqoop 文档

要访问最新的 Sqoop 文档,请转到Sqoop 文档 1.4.7.7.1.6.0

原文链接:https://docs.cloudera.com/cdp-private-cloud-base/latest/migrating-data-into-hive/topics/hive_data_migration.html

2    设置 Sqoop

Cloudera Runtime 包括 Sqoop 客户端,用于将数据从不同数据源批量导入和导出到 Hive。您将学习如何在 CDP 中安装 RDBMS 连接器和 Sqoop 客户端。

1)     在 Cloudera Manager 的 Clusters 中,从选项菜单中选择 Add Service。

2)     选择 Sqoop 客户端并单击继续。

3)     分别根据 Sqoop 导入或导出的源或目标的数据源选择 JDBC 数据库驱动程序。

4)     /var/lib/sqoop在 Sqoop 节点上安装 JDBC 数据库驱动程序。

不要安装,/opt/cloudera/parcels/CDH因为升级修改了这个目录。

·       MySQL:将 MySQL 驱动下载 https://dev.mysql.com/downloads/connector/j/ 到 /var/lib/sqoop,然后运行tar -xvzf mysql-connector-java-<version>.tar.gz.

·       Oracle:从 下载驱动程序 https://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html 并将其放入/var/lib/sqoop.

·       PostgreSQL:从下载驱动程序 http://jdbc.postgresql.org/download.html并将其放入 /var/lib/sqoop.

5)     在 Cloudera Manager 中,单击 操作>部署客户端配置。

设置 Sqoop 客户端后,您可以使用以下连接字符串输入Sqoop 命令,具体取决于您的数据源。

·       MySQL 语法:

jdbc:mysql://<HOST>:<PORT>/<DATABASE_NAME>

例子:

jdbc:mysql://my_mysql_server_hostname:3306/my_database_name

·       Oracle语法:

jdbc:oracle:thin:@<HOST>:<PORT>:<DATABASE_NAME>

例子:

jdbc:oracle:thin:@my_oracle_server_hostname:1521:my_database_name

·       PostgreSQL 语法:

jdbc:postgresql://<HOST>:<PORT>/<DATABASE_NAME>

例子:

jdbc:postgresql://my_postgres_server_hostname:5432/my_database_name

·       Netezza 语法:

jdbc:netezza://<HOST>:<PORT>/<DATABASE_NAME>

例子:

jdbc:netezza://my_netezza_server_hostname:5480/my_database_name

·       Teradata 语法:

jdbc:teradata://<HOST>/DBS_PORT=1025/DATABASE=<DATABASE_NAME>

例子:

jdbc:teradata://my_teradata_server_hostname/DBS_PORT=1025/DATABASE=my_database_name

原文链接:https://docs.cloudera.com/cdp-private-cloud-base/latest/migrating-data-into-hive/topics/cm_mc_sqoop1_client.html

3    将数据从数据库移动到 Apache Hive

您可以使用 Sqoop 将数据从关系数据库导入到 Hive 中使用。您可以将数据直接导入 Hive。假设 Sqoop 客户端服务在您的集群中可用,您可以从命令行发出导入和导出命令。

3.1  创建 Sqoop 导入命令

您可以创建一个 Sqoop 导入命令,该命令使用 Apache Sqoop 将来自不同数据源(例如不同网络上的关系数据库)的数据导入 Apache Hive。

您在 Hive 集群的命令行中输入 Sqoop 导入命令,将数据从数据源导入集群文件系统和 Hive。导入可以包括以下信息,例如:

·       数据库连接信息:数据库URI、数据库名称、连接协议,如 jdbc:mysql:

·       要导入的数据

·       用于快速数据传输的并行处理指令

·       导入数据的目的地

Sqoop 经测试可与 Connector/J 5.1 一起使用。如果您已升级到 Connector/J 8.0,并且想要使用该zeroDateTimeBehavior属性来处理 DATE 列中“0000-00-00\”的值,请zeroDateTimeBehavior=CONVERT_TO_NULL在连接字符串中明确指定 。例如,--connect jdbc:mysql://<MySQL host>/<DB>?zeroDateTimeBehavior=CONVERT_TO_NULL。

1)     创建一个导入命令,指定到 RDBMS 的 Sqoop 连接。

n  要在命令行上输入数据源的密码,请使用-P连接字符串中的 选项。

n  要指定存储密码的文件,请使用该 --password-file选项。

n  命令行密码:

sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \
<data to import> \
--username <username> \
-P

指定密码文件:

sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \
--table EMPLOYEES \
--username <username> \
--password-file ${user.home}/.password

2)     在命令中指定要导入的数据。

n  导入整个表。

n  导入列的子集。

n  使用*格式查询导入数据。

整个表:

sqoop import \
--connect jdbc:mysql://db.foo.com:3306/bar \
--table EMPLOYEES

列子集:

sqoop import 
--connect jdbc:mysql://db.foo.com:3306/bar \
--table EMPLOYEES \
--columns "employee_id,first_name,last_name,job_title"

导入最新数据的*格式查询:

sqoop import \
--connect jdbc:mysql://db.foo.com:3306/bar \
--table EMPLOYEES \
--where "start_date > '2018-01-01'"

3)     或者,在 import 语句中指定write parallelism以并行执行多个 map 任务:

n  设置映射器:如果源表有主键,则使用--num-mappers.

n  拆分方式:如果主键分布不均,请使用以下方法提供拆分键 --split-by

n  顺序:如果您没有主键或拆分键,请使用--num-mappers 1或 --autoreset-to-one-mapper在查询中顺序导入数据。

n  设置映射器:

sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \
 --table EMPLOYEES \
 --num-mappers 8 \

n  Split:

sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \
 --table EMPLOYEES \
 --split-by dept_id

n  设置映射器均匀地分割源表的主键范围。

n  Split by 使用拆分键而不是主键均匀拆分数据。

4)     指定使用 Hive 默认分隔符将数据导入 Hive --hive-import。

5)     指定数据的 Hive 目标。

n  如果您认为 Hive 中不存在该表,请命名数据库和表,并使用 --create-hive-table 选项。

n  如果要将导入的数据插入到现有的 Hive 外部表中,请命名数据库和表,但不要使用 --create-hive-table 选项。

此命令将 MySQL EMPLOYEES 表导入到仓库中命名的新 Hive 表中。

sqoop import --connect jdbc:mysql://db.foo.com:3306/corp \
--table EMPLOYEES \
--hive-import \
--create-hive-table \
--hive-database 'mydb' \
--hive-table 'newtable'

此命令将 MySQL EMPLOYEES 表导入到 HDFS 中的外部表。

sqoop import --connect jdbc:mysql://db.foo.com:3306/corp \
--table EMPLOYEES \
--hive-import \
--hive-database 'mydb' \
--hive-table 'myexternaltable'

如上所示,在单独的行(推荐)中指定用单引号括起来的数据库和表名称。或者在一行中指定数据库和表名,并用反引号将数据库和表名括起来。

--hive-table `mydb`.`myexternaltable`

由于 Hive-16907 错误修复,Hive 拒绝 SQL 查询中的 `db.table`。表名中不再允许使用点 (.)。您需要更改使用此类引用的查询,以防止 Hive 将整个 db.table 字符串解释为表名。

3.2   RDBMS 数据导入 Hive

您可以测试Apache Sqoop 导入命令,然后执行该命令将关系数据库表导入Apache Hive。

您可以在 Hive 集群的命令行中输入 Sqoop import 命令,以将数据从数据源导入 Hive。您可以在实际执行之前测试导入语句。

·       Apache Sqoop 客户端服务可用。

·       Hive Metastore 和 Hive 服务可用。

1)     或者,在执行前使用 eval 选项测试导入命令。

sqoop eval --connect jdbc:mysql://db.foo.com/bar \
--query "SELECT * FROM employees LIMIT 10"

select 语句的输出出现,其中列出了 RDBMS 员工表中的 10 行数据。

2)     执行 Sqoop 导入命令,指定到 RDBMS 的 Sqoop 连接、要导入的数据以及目标 Hive 表名称。

此命令将 MySQL EMPLOYEES 表导入到仓库中命名的新 Hive 表中。

sqoop import --connect jdbc:mysql://db.foo.com:3306/corp \
--table EMPLOYEES \
--hive-import \
--create-hive-table \
--hive-table mydb.newtable

原文链接:https://docs.cloudera.com/cdp-private-cloud-base/latest/migrating-data-into-hive/topics/hive_moving_data_from_databases_to_hive.html

4    将数据从 HDFS 移动到 Apache Hive

在CDP Private Cloud Base 中,您可以将不同数据源的数据导入 HDFS,进行 ETL 处理,然后在 Apache Hive 中查询数据。

4.1   RDBMS 数据导入 HDFS

在CDP Private Cloud Base 中,您创建一个 Sqoop 导入命令,将数据从关系数据库导入 HDFS。

您在集群的命令行中输入 Sqoop import 命令以将数据导入 HDFS。导入命令需要包含数据库URI、数据库名称和连接协议,如jdbc:mysql:m和要导入的数据。可选地,该命令可以包括用于快速数据传输的并行处理指令、导入数据的 HDFS 目标目录、数据分隔符和其他信息。如果未指定其他位置,则使用默认 HDFS 目录。字段以逗号分隔,行以行分隔。您可以在实际执行之前测试导入语句。

·       Apache Sqoop 已安装和配置。

1)     创建一个导入命令,指定与您要导入的数据源的 Sqoop 连接。

n  如果要在命令行上为数据源输入密码,请使用-P连接字符串中的选项。

n  如果要指定存储密码的文件,请使用该 --password-file选项。

n  命令行密码:

sqoop import --connect jdbc:mysql://db.foo.com/bar \
<data to import> \
--username <username> \
-P

n  指定密码文件:

sqoop import --connect jdbc:mysql://db.foo.com/bar \
--table EMPLOYEES \
--username <username> \
--password-file ${user.home}/.password

2)     在命令中指定要导入的数据。

n  导入整个表。

n  导入列的子集。

n  使用*格式查询导入数据。

整个表:

sqoop import \
--connect jdbc:mysql://db.foo.com/bar \
--table EMPLOYEES

列子集:

sqoop import \
--connect jdbc:mysql://db.foo.com/bar \
--table EMPLOYEES \
--columns "employee_id,first_name,last_name,job_title"

导入最新数据的*格式查询:

sqoop import \
--connect jdbc:mysql://db.foo.com/bar \
--table EMPLOYEES \
--where "start_date > '2018-01-01'"

3)     使用--target-dir选项指定导入数据的目的地 。

此命令使用默认文本文件分隔符将从 MySQL EMPLOYEES 表导入的数据附加到 HDFS 目标目录中的输出文件。

sqoop import \
--connect jdbc:mysql://db.foo.com:3600/bar \
--table EMPLOYEES \
--where "id > 100000" \
--target-dir /incremental_dataset \
--append

该命令按列拆分导入的数据,并指定将数据导入到 HDFS 目标目录中的输出文件中。

sqoop import \
--connect jdbc:mysql://db.foo.com:3600/bar \
--query 'SELECT a.*, b.* \
FROM a JOIN b on (a.id == b.id) \
WHERE $CONDITIONS' \
--split-by a.id \
--target-dir /user/foo/joinresults

此命令执行一次并使用 -m 1 选项指定的单个映射任务串行导入数据:

sqoop import \
--connect jdbc:mysql://db.foo.com:3600/bar \
--query \
'SELECT a.*, b.* \
FROM a \
JOIN b on (a.id == b.id) \
WHERE $CONDITIONS' \
-m 1 \
--target-dir /user/foo/joinresults

4)     或者,在 import 语句中指定write parallelism以并行执行多个 map 任务:

n  设置映射器:如果源表有主键,则使用--num-mappers.

n  拆分方式:如果主键分布不均,请使用以下方法提供拆分键 --split-by

n  顺序:如果您没有主键或拆分键,请使用--num-mappers 1或 --autoreset-to-one-mapper在查询中顺序导入数据。

n  设置映射器:

sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \
--table EMPLOYEES \
--num-mappers 8

n  拆分:

sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \
--table EMPLOYEES \
--split-by dept_id

n  设置映射器均匀地分割源表的主键范围。

n  Split by 使用拆分键而不是主键均匀拆分数据。

5)     或者,在执行前使用 eval 选项测试导入命令。

sqoop eval --connect jdbc:mysql://db.foo.com:3306/bar \
--query "SELECT * FROM employees LIMIT 10"

将出现 select 语句的输出。

4.2   HDFS 文件转换为 ORC

在CDP Private Cloud Base 中,要使用 Hive 查询 HDFS 中的数据,您需要对数据应用架构,然后以 ORC 格式存储数据。

要将HDFS中存储的数据转换成Hive中推荐的查询格式,您可以通过创建Hive外部表的方式为HDFS数据创建一个schema,然后创建一个Hive托管的表来转换和查询ORC格式的数据。转换是一个并行的分布式动作,不需要独立的 ORC 转换工具。假设您有以下 CSV 文件,其中包含描述字段的标题行和包含以下数据的后续行:

Name,Miles_per_Gallon,Cylinders,Displacement,Horsepower,Weight_in_lbs, \
  Acceleration,Year,Origin
"chevrolet chevelle malibu",18,8,307,130,3504,12,1970-01-01,A
"buick skylark 320",15,8,350,165,3693,11.5,1970-01-01,A
"plymouth satellite",18,8,318,150,3436,11,1970-01-01,A
"amc rebel sst",16,8,304,150,3433,12,1970-01-01,A
"ford torino",17,8,302,140,3449,10.5,1970-01-01,A

您从 CSV 文件中删除了标题。

1)     创建外部表:

CREATE EXTERNAL TABLE IF NOT EXISTS Cars(
    Name STRING, 
    Miles_per_Gallon INT,
    Cylinders INT,
    Displacement INT,
    Horsepower INT, 
    Weight_in_lbs INT,
    Acceleration DECIMAL,
    Year DATE,
    Origin CHAR(1))
COMMENT 'Data about cars from a public database'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
location '/user/<username>/visdata';

2)     创建 Hive 管理的表以将数据转换为 ORC。

CREATE TABLE IF NOT EXISTS mycars(
Name STRING, 
Miles_per_Gallon INT,
Cylinders INT,
Displacement INT,
Horsepower INT, 
Weight_in_lbs INT,
Acceleration DECIMAL,
Year DATE,
Origin CHAR(1))
COMMENT 'Data about cars from a public database'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS ORC;

3)     将外部表中的数据插入到 Hive 管理的表中。

INSERT OVERWRITE TABLE mycars SELECT * FROM cars;

4)     验证您是否将数据正确导入到 ORC 格式的表中:

hive> SELECT * FROM mycars LIMIT 3;
OK
"chevrolet chevelle malibu" 18  8   307 130 3504   12  1970-01-01 A
"buick skylark 320"  15  8   350 165 3693   12  1970-01-01 A
"plymouth satellite" 18  8   318 150 3436   11  1970-01-01 A
Time taken: 0.144 seconds, Fetched: 3 row(s)

4.3   增量更新导入的表

在CDP Private Cloud Base 中,更新导入的表涉及使用 Apache Sqoop 导入对原始表所做的增量更改,然后将更改与导入到 Apache Hive 的表合并。

在将操作数据库中的数据摄取到 Hive 后,您通常需要设置一个流程来定期将导入的表与操作数据库表同步。基表是在第一次数据摄取期间创建的 Hive 管理的表。从操作数据库系统增量更新 Hive 表涉及合并基表和更改记录以反映最新的记录集。您将增量表创建为 Hive 外部表,通常来自 HDFS 中的 CSV 数据,以存储更改记录。此外部表包含自上次数据摄取以来操作数据库中的更改(插入和更新)。一般情况下,表是分区的,只更新最新的分区,使这个过程更有效率。

您可以使用 Oozie 自动执行以增量方式更新 Hive 中数据的步骤。

·       第一次将数据摄取到 hive 中时,您将整个基表以 ORC 格式存储在 Hive 中。

·       将其从外部表移动到 Hive 管理的表后的基表定义具有以下架构:

CREATE TABLE base_table (
id STRING,
field1 STRING,
modified_date DATE)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

1)     将增量表作为外部表存储在 Hive 中,并获取比 更新的记录last_import_date,这是上次增量数据更新的日期。

自上次更新数据以来,您经常导入增量更改,然后将其合并。

·       使用--check-column获取记录

·       用于--query获取记录

sqoop import --connect jdbc:teradata://{host name}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1 --check-column modified_date --incremental lastmodified --last-value {last_import_date}
sqoop import --connect jdbc:teradata://{host name}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --target-dir /user/hive/incremental_table -m 1 --query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS’

2)     使用 Sqoop 将增量表数据移入 HDFS 后,您可以使用以下命令在其上定义外部 Hive 表

CREATE EXTERNAL TABLE incremental_table (
 id STRING,
 field1 STRING,
 modified_date DATE)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
location '/user/hive/incremental_table';

3)     使用 MERGE 命令合并数据并协调基表记录与新记录:

MERGE INTO base_table
USING incremental_table ON base.id = incremental_table.id
WHEN MATCHED THEN UPDATE SET
fieldl1=incremental_table.email,
modified_date=incremental_table.state
WHEN NOT MATCHED THEN INSERT
VALUES(incremental_table.id, incremental_table.field1, incremental_table.modified_data);

原文链接:https://docs.cloudera.com/cdp-private-cloud-base/latest/migrating-data-into-hive/topics/hive_moving_data_from_hdfs_to_hive.html

 

上一篇:11月21日云栖精选夜读:如何扛住1.8亿/秒的双11数据洪峰?阿里流计算技术全揭秘


下一篇:git笔记:通过给grunt-inline打tag看tag操作