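# Writing to Hudi with Flink SQL
I have recently been working on a data lake project: a lakehouse built on Hudi, with Flink and Spark as the compute engines.

Earlier I had briefly looked into Iceberg, one of the three main data lake technologies, on the assumption that we might use it. While reading up on it online, I also found that Iceberg is generally described as the better fit for Flink, and Hudi as the better fit for Spark.

I had assumed all along that we would pick Iceberg, but it turned out that Iceberg still had many features unimplemented, and Hudi was comparatively much further along.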
## Versions
* Hudi bundle for Flink: 0.9.0-SNAPSHOT
* Hive 2.3
* Hadoop 3.1
## Building Hudi
The Hudi version used here is a SNAPSHOT that has not been released yet, so it has to be built from source.
```sh
mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Hudi ............................................... SUCCESS [ 1.948 s]
[INFO] hudi-common ........................................ SUCCESS [ 6.494 s]
[INFO] hudi-timeline-service .............................. SUCCESS [ 2.287 s]
[INFO] hudi-client ........................................ SUCCESS [ 0.054 s]
[INFO] hudi-client-common ................................. SUCCESS [ 2.930 s]
[INFO] hudi-hadoop-mr ..................................... SUCCESS [ 2.892 s]
[INFO] hudi-spark-client .................................. SUCCESS [ 6.267 s]
[INFO] hudi-sync-common ................................... SUCCESS [ 0.502 s]
[INFO] hudi-hive-sync ..................................... SUCCESS [ 2.651 s]
[INFO] hudi-spark-datasource .............................. SUCCESS [ 0.089 s]
[INFO] hudi-spark-common_2.11 ............................. SUCCESS [ 2.346 s]
[INFO] hudi-spark2_2.11 ................................... SUCCESS [ 1.436 s]
[INFO] hudi-spark_2.11 .................................... SUCCESS [ 9.377 s]
[INFO] hudi-utilities_2.11 ................................ SUCCESS [ 4.049 s]
[INFO] hudi-utilities-bundle_2.11 ......................... SUCCESS [ 12.717 s]
[INFO] hudi-cli ........................................... SUCCESS [ 4.430 s]
[INFO] hudi-java-client ................................... SUCCESS [ 0.902 s]
[INFO] hudi-flink-client .................................. SUCCESS [ 1.406 s]
[INFO] hudi-spark3_2.12 ................................... SUCCESS [ 2.199 s]
[INFO] hudi-dla-sync ...................................... SUCCESS [ 1.347 s]
[INFO] hudi-sync .......................................... SUCCESS [ 0.042 s]
[INFO] hudi-hadoop-mr-bundle .............................. SUCCESS [ 4.292 s]
[INFO] hudi-hive-sync-bundle .............................. SUCCESS [ 1.297 s]
[INFO] hudi-spark-bundle_2.11 ............................. SUCCESS [ 9.176 s]
[INFO] hudi-presto-bundle ................................. SUCCESS [ 4.972 s]
[INFO] hudi-timeline-server-bundle ........................ SUCCESS [ 4.643 s]
[INFO] hudi-hadoop-docker ................................. SUCCESS [ 0.445 s]
[INFO] hudi-hadoop-base-docker ............................ SUCCESS [ 0.204 s]
[INFO] hudi-hadoop-namenode-docker ........................ SUCCESS [ 0.053 s]
[INFO] hudi-hadoop-datanode-docker ........................ SUCCESS [ 0.045 s]
[INFO] hudi-hadoop-history-docker ......................... SUCCESS [ 0.096 s]
[INFO] hudi-hadoop-hive-docker ............................ SUCCESS [ 0.278 s]
[INFO] hudi-hadoop-sparkbase-docker ....................... SUCCESS [ 0.064 s]
[INFO] hudi-hadoop-sparkmaster-docker ..................... SUCCESS [ 0.048 s]
[INFO] hudi-hadoop-sparkworker-docker ..................... SUCCESS [ 0.048 s]
[INFO] hudi-hadoop-sparkadhoc-docker ...................... SUCCESS [ 0.047 s]
[INFO] hudi-hadoop-presto-docker .......................... SUCCESS [ 0.087 s]
[INFO] hudi-integ-test .................................... SUCCESS [ 4.581 s]
[INFO] hudi-integ-test-bundle ............................. SUCCESS [ 32.789 s]
[INFO] hudi-examples ...................................... SUCCESS [ 1.140 s]
[INFO] hudi-flink_2.11 .................................... SUCCESS [ 1.734 s]
[INFO] hudi-flink-bundle_2.11 ............................. SUCCESS [ 21.285 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:34 min
[INFO] Finished at: 2021-07-18T18:17:51+08:00
[INFO] Final Memory: 232M/1644M
[INFO] ------------------------------------------------------------------------
[WARNING] The requested profile "include-flink-sql-connector-hive" could not be activated because it does not exist.
```
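## Prerequisites
Writing to Hudi from Flink is simple: put `hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar` into Flink's `lib` directory, treat Hudi as just another Flink connector, and you can write to Hudi directly from the sql-client. On its own, though, this does not sync Hudi's metadata to Hive. To get that, specify the Hive sync options when creating the Hudi table in Flink, and the metadata of the table created in Flink is synced straight into Hive.

Configure Hive:
* Put `hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar` into Hive's `lib` directory (so that Hive can read data in Hudi format)
* Start the Hive metastore
* Start HiveServer2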
```sh
nohup hive --service metastore &
nohup hive --service hiveserver2 &
```
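The jar placement described above can be scripted. The following is a minimal sketch, not the author's procedure: the function name `install_hudi_bundles` is hypothetical, it assumes the build leaves the bundle jars somewhere under the Hudi source tree (it searches with `find` instead of relying on a fixed module path), and it assumes the Flink and Hive home directories each contain a `lib` directory.

```sh
# Sketch: copy the freshly built Hudi bundles into Flink's and Hive's lib dirs.
# install_hudi_bundles is a hypothetical helper, not part of Hudi or Flink.
install_hudi_bundles() {
  local hudi_src="$1" flink_home="$2" hive_home="$3"
  local flink_jar mr_jar

  # Locate the jars by name instead of hard-coding the module path.
  flink_jar=$(find "$hudi_src" -name 'hudi-flink-bundle_2.11-*-SNAPSHOT.jar' | head -n 1)
  mr_jar=$(find "$hudi_src" -name 'hudi-hadoop-mr-bundle-*-SNAPSHOT.jar' | head -n 1)

  if [ -z "$flink_jar" ] || [ -z "$mr_jar" ]; then
    echo "bundle jar not found under $hudi_src; did the build succeed?" >&2
    return 1
  fi

  cp "$flink_jar" "$flink_home/lib/"   # Flink side: the Hudi connector
  cp "$mr_jar" "$hive_home/lib/"       # Hive side: lets Hive read Hudi tables
}
```

It would be called as `install_hudi_bundles /path/to/hudi "$FLINK_HOME" "$HIVE_HOME"`, where all three paths depend on the local setup. Flink and Hive need a restart afterwards to pick up the new jars.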
## Flink SQL
Start a YARN session:
```sh
./bin/yarn-session.sh -d -ynm sql
```
Start the sql-client:
```sh
./bin/sql-client.sh embedded -s application_1626588183454_0001
```
The Flink SQL statements:
```sql
create table kafka_ods_user_info (
id int
,name string
,sex string
,age int
,birthday string
) with (
'connector' = 'kafka',
'topic' = 'test_topic_1',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'
);
create table ods_user_info_3(
dl_uuid string
,id int
,name string
,sex string
,age int
,birthday string
,`etl_create_time` TIMESTAMP(3) COMMENT 'ETL creation time'
,`etl_update_time` TIMESTAMP(3) COMMENT 'ETL update time'
,`partition` string
) with (
'connector' = 'hudi'
,'is_generic' = 'true'
,'path' = 'hdfs:///user/hive/warehouse/ods.db/ods_user_info_3'
,'hoodie.datasource.write.recordkey.field' = 'dl_uuid'
,'hoodie.datasource.write.partitionpath.field' = 'partition'
,'write.precombine.field' = 'etl_update_time'
,'write.tasks' = '1'
,'table.type' = 'MERGE_ON_READ'
,'compaction.tasks' = '1'
,'compaction.trigger.strategy' = 'num_or_time'
,'compaction.delta_commits' = '30'
,'compaction.delta_seconds' = '3600'
,'hive_sync.enable' = 'true'
,'hive_sync.db' = 'ods'
,'hive_sync.table' = 'ods_user_info'
,'hive_sync.file_format' = 'PARQUET'
,'hive_sync.support_timestamp' = 'true'
,'hive_sync.use_jdbc' = 'true'
,'hive_sync.jdbc_url' = 'jdbc:hive2://localhost:10000'
,'hive_sync.metastore.uris' = 'thrift://thinkpad:9083'
,'hoodie.datasource.hive_style_partition' = 'true'
,'hive_sync.partition_fields' = 'partition'
,'read.tasks' = '1'
,'read.streaming.enabled' = 'true'
,'hoodie.datasource.query.type' = 'snapshot'
,'read.streaming.start-commit' = '20210101000000'
,'read.streaming.check-interval' = '30'
,'hoodie.datasource.merge.type' = 'payload_combine'
,'read.utc-timezone' = 'false'
);
insert into ods_user_info_3
select cast(id as string) dl_uuid
,id
,name
,sex
,age
,birthday
,now() etl_create_time
,now() etl_update_time
,date_format(now(), 'yyyy/MM/dd') -- only support partition format
from kafka_ods_user_info;
```
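The `compaction.*` options in the table definition above combine two thresholds. As a rough illustration of what `num_or_time` means, compaction becomes due once either the number of delta commits or the time elapsed since the last compaction reaches its threshold. This is a sketch of the decision rule only, not Hudi's code, and `should_compact` and its arguments are hypothetical.

```sh
# Sketch of the 'num_or_time' rule with the thresholds from the DDL above.
# should_compact is a hypothetical helper for illustration only.
DELTA_COMMITS=30     # 'compaction.delta_commits'
DELTA_SECONDS=3600   # 'compaction.delta_seconds'

should_compact() {
  local commits_since_last="$1" seconds_since_last="$2"
  # Succeeds if either threshold has been reached.
  [ "$commits_since_last" -ge "$DELTA_COMMITS" ] ||
    [ "$seconds_since_last" -ge "$DELTA_SECONDS" ]
}

# Only 5 delta commits, but more than an hour has passed: compaction is due.
should_compact 5 4000 && echo "compaction due"
```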
## Writing test data
```sh
echo "$message" | /opt/kafka2.2/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic_1
```
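The command above assumes `$message` already holds a record. Since the Kafka source table uses the `csv` format with the columns `id,name,sex,age,birthday`, a record generator could look like the sketch below. The `gen_messages` helper is hypothetical, and the value pattern (`zhangsan_<id>`, `male_<id>`, `18`, `2020-01-01`) is inferred from the rows in the Hive query output in this post, not taken from the author's script.

```sh
# Sketch: print CSV records matching the kafka_ods_user_info schema
# (id,name,sex,age,birthday). gen_messages is a hypothetical helper.
gen_messages() {
  local from="$1" to="$2" id
  id="$from"
  while [ "$id" -le "$to" ]; do
    echo "${id},zhangsan_${id},male_${id},18,2020-01-01"
    id=$((id + 1))
  done
}

# Usage: pipe the records into the same console producer, e.g.
#   gen_messages 1 5000 | /opt/kafka2.2/bin/kafka-console-producer.sh \
#     --broker-list localhost:9092 --topic test_topic_1
```

Because the Hudi record key is derived from `id`, sending the same id twice exercises the upsert path instead of adding a second row.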
## Viewing the data in HDFS
```sh
hadoop fs -ls /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/
Found 4 items
-rw-r--r-- 1 wuxu supergroup 115647 2021-07-18 18:09 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/.baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_20210718175258.log.1_0-1-0
-rw-r--r-- 1 wuxu supergroup 93 2021-07-18 17:05 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/.hoodie_partition_metadata
-rw-r--r-- 1 wuxu supergroup 436892 2021-07-18 17:20 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/9a29cbb2-b78c-4f32-a71e-36f975617ed0_0-1-0_20210718171958.parquet
-rw-r--r-- 1 wuxu supergroup 461463 2021-07-18 17:53 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718175258.parquet
```
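The names in the listing encode Hudi metadata. A base file is named `<fileId>_<writeToken>_<instantTime>.parquet`, and a log file is named `.<fileId>_<baseInstantTime>.log.<version>_<writeToken>`. The sketch below splits a name using plain shell parameter expansion; `describe_hudi_file` is a hypothetical helper written for illustration, not a Hudi tool.

```sh
# Sketch: classify a file from a Hudi partition directory and extract
# the file id and instant time from its name.
describe_hudi_file() {
  local name
  name=$(basename "$1")
  case "$name" in
    .hoodie_partition_metadata)
      echo "partition metadata" ;;
    .*.log.*)
      local rest="${name#.}"        # drop the leading dot
      local file_id="${rest%%_*}"   # text before the first underscore
      local tail="${rest#*_}"       # <baseInstantTime>.log.<version>_<writeToken>
      echo "log file_id=$file_id base_instant=${tail%%.log.*}" ;;
    *.parquet)
      local stem="${name%.parquet}"
      # file id is before the first underscore, instant time after the last
      echo "base file_id=${stem%%_*} instant=${stem##*_}" ;;
    *)
      echo "unknown" ;;
  esac
}

describe_hudi_file "9a29cbb2-b78c-4f32-a71e-36f975617ed0_0-1-0_20210718171958.parquet"
```

Read this way, the log file in the listing belongs to the same file group as the second parquet file (both carry the file id `baf45a5e-...` and the instant `20210718175258`), which is how a MERGE_ON_READ table pairs a base file with its pending updates.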
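## Querying the data in Hive
For a MERGE_ON_READ table, Hive sync registers two tables derived from `hive_sync.table`: `ods_user_info_ro` (read-optimized) and `ods_user_info_rt` (real-time). The query below reads the `_rt` table.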
```sh
hive> select * from ods_user_info_rt limit 10;
OK
_hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name dl_uuid id name sex age birthday etl_create_time etl_update_time partition
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
20210718171958 20210718171958_0_1 4970 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4970 4970 zhangsan_4970 male_4970 18 2020-01-01 1626599814075 1626599814075 2021-07-18
20210718171958 20210718171958_0_2 4850 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4850 4850 zhangsan_4850 male_4850 18 2020-01-01 1626599551180 1626599551180 2021-07-18
20210718171958 20210718171958_0_3 4971 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4971 4971 zhangsan_4971 male_4971 18 2020-01-01 1626599816273 1626599816273 2021-07-18
20210718171958 20210718171958_0_4 4727 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4727 4727 zhangsan_4727 male_4727 18 2020-01-01 1626599281780 1626599281780 2021-07-18
20210718171958 20210718171958_0_5 4848 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4848 4848 zhangsan_4848 male_4848 18 2020-01-01 1626599546853 1626599546853 2021-07-18
20210718171958 20210718171958_0_6 4969 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4969 4969 zhangsan_4969 male_4969 18 2020-01-01 1626599811859 1626599811859 2021-07-18
20210718171958 20210718171958_0_7 4728 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4728 4728 zhangsan_4728 male_4728 18 2020-01-01 1626599284072 1626599284072 2021-07-18
20210718171958 20210718171958_0_8 4849 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4849 4849 zhangsan_4849 male_4849 18 2020-01-01 1626599548969 1626599548969 2021-07-18
20210718171958 20210718171958_0_9 4729 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4729 4729 zhangsan_4729 male_4729 18 2020-01-01 1626599286234 1626599286234 2021-07-18
20210718171958 20210718171958_0_10 4840 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4840 4840 zhangsan_4840 male_4840 18 2020-01-01 1626599529532 1626599529532 2021-07-18
Time taken: 1.46 seconds, Fetched: 10 row(s)
```
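In the output above, `etl_create_time` and `etl_update_time` come back as epoch milliseconds, not formatted timestamps. To read them, a value can be converted on the command line. This sketch assumes GNU `date` (the `-d @<seconds>` form is not available in BSD/macOS `date`), and `ms_to_utc` is a hypothetical helper.

```sh
# Sketch: convert epoch milliseconds to a UTC timestamp (GNU date assumed).
ms_to_utc() {
  # Integer division drops the millisecond part.
  date -u -d "@$(( $1 / 1000 ))" '+%Y-%m-%d %H:%M:%S'
}

ms_to_utc 1626599814075   # etl_create_time of the first row above
```

For the first row this gives `2021-07-18 09:16:54` in UTC, which is 17:16:54 at the `+08:00` offset seen in the build log, shortly before the `20210718171958` commit that contains the row.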
References: