总览
bulk_insert
用于快速导入快照数据到hudi。
基本特性
bulk_insert可以减少数据序列化以及合并操作,于此同时,该数据写入方式会跳过数据去重
,所以用户需要保证数据的唯一性。
bulk_insert在批量写入模式中是更加有效率的。默认情况下,批量执行模式按照分区路径对输入记录进行排序,并将这些记录写入Hudi,该方式可以避免频繁切换文件句柄导致的写性能下降。
bulk_insert的并行度有write.tasks
参数指定,并行度会影响小文件的数量。理论上来说,bulk_insert的并行度就是bucket的数量(特别是,当每个bucket写到最大文件大小时,它将转到新的文件句柄。最后,文件的数量将大于参数write.bucket.assign.tasks
指定的数量 )
可选配置参数
参数名称 | 是否必须 | 默认值 | 参数说明 |
---|---|---|---|
write.operation |
true |
upsert |
设置为 bulk_insert 以开启bulk_insert功能 |
write.tasks |
false |
4 |
bulk_insert 并行度, the number of files >= write.bucket_assign.tasks
|
write.bulk_insert.shuffle_by_partition |
false |
true |
写入前是否根据分区字段进行数据重分布。 启用此选项将减少小文件的数量,但可能存在数据倾斜的风险 |
write.bulk_insert.sort_by_partition |
false |
true |
写入前是否根据分区字段对数据进行排序。 启用此选项将在写任务写多个分区时减少小文件的数量 |
write.sort.memory |
false |
128 |
排序算子的可用托管内存。 默认为 128 MB |
Flink SQL实践
使用datafaker生成100000条数据,放到mysql数据库中的stu4表。
数据生成方式以及Flink SQL使用方法见Flink SQL Client实战CDC数据入湖
使用bulk_insert方式写入到hudi中。
Flink SQL client 创建myql数据源
create table stu4(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8',
'username' = 'root',
'password' = 'Pass-123-root',
'table-name' = 'stu4'
);
创建hudi表
create table stu4_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:9000/tmp/stu4_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'bulk_insert',
'write.precombine.field' = 'school'
);
mysql数据插入到hudi中
insert into stu4_sink_hudi select * from stu4;
执行时间
Index bootstrap
基本特性
该方式用于快照数据+增量数据的导入。如果快照数据已经通过bulk_insert导入到hudi,那么用户就可以近实时插入增量数据并且通过index bootstrap功能来确保数据不会重复。
温馨提示:
如果你觉得这个过程特别耗时,那么你在写快照数据的时候可以多设置计算资源,然后在插入增量数据时减少计算资源。
可选配置参数
参数名称 | 是否必须 | 默认值 | 参数说明 |
---|---|---|---|
index.bootstrap.enabled |
true |
false |
当启用index bootstrap功能时,会将Hudi表中的剩余记录一次性加载到Flink状态中 |
index.partition.regex |
false |
* |
优化参数,设置正则表达式来过滤分区。 默认情况下,所有分区都被加载到flink状态 |
使用方法
- CREATE TABLE创建一条与Hudi表对应的语句。 注意这个table.type配置必须正确。
- 设置
index.bootstrap.enabled = true
来启用index bootstrap功能 - 在flink-conf.yaml文件中设置Flink checkpoint的容错机制,设置配置项
execution.checkpointing.tolerable-failed-checkpoints = n
(取决于Flink checkpoint执行时间) - 等待直到第一个checkpoint成功,表明index bootstrap完成。
- 在index bootstrap完成后,用户可以退出并保存savepoint(或直接使用外部 checkpoint)。
- 重启任务,并且设置
index.bootstrap.enable
为false
温馨提示:
- 索引引导是一个阻塞过程,因此在索引引导期间无法完成checkpoint。
- index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。
- index bootstrap是并发执行的。用户可以在日志文件中通过
finish loading the index under partition
以及Load record form file
观察index bootstrap的进度。 - 第一个成功的checkpoint表明index bootstrap已完成。 从checkpoint恢复时,不需要再次加载索引。
Flink SQL实践
前提条件:
- 已有50w条数据已写入kafka,使用bulk_insert的方式将其导入hudi表。
- 再通过创建任务消费最新kafka数据,并开启index bootstrap特性。
如果未将数据导入kafka可使用Flink SQL Client实战CDC数据入湖文章提供的方法将数据导入kafka。
创建bulk_insert任务:
create table stu3_binlog_source_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string
) with (
'connector' = 'kafka',
'topic' = 'cdc_mysql_stu3_sink_test',
'properties.bootstrap.servers' = 'hadoop1:9092',
'format' = 'debezium-json',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup_20210929_4'
);
create table stu3_binlog_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'bulk_insert',
'write.precombine.field' = 'school'
);
insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka;
创建开启index bootstrap特性、离线压缩任务。
create table stu3_binlog_source_kafka_1(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string
) with (
'connector' = 'kafka',
'topic' = 'cdc_mysql_stu3_sink_test',
'properties.bootstrap.servers' = 'hadoop1:9092',
'format' = 'debezium-json',
'scan.startup.mode' = 'latest-offset',
'properties.group.id' = 'testGroup_20210929_4'
);
create table stu3_binlog_sink_hudi_1(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'upsert',
'write.tasks' = '4',
'write.precombine.field' = 'school',
'compaction.async.enabled' = 'false',
'index.bootstrap.enabled' = 'true'
);
insert into stu3_binlog_sink_hudi_1 select * from stu3_binlog_source_kafka_1;
提交离线压缩任务:
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.12-0.9.0.jar --path hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4
创建bulk_insert任务:
Changelog Mode
基本特性
Hudi可以保留消息的所有中间变化(I / -U / U / D),然后通过flink的状态计算消费,从而拥有一个接近实时的数据仓库ETL管道(增量计算)。 Hudi MOR表以行的形式存储消息,支持保留所有更改日志(格式级集成)。 所有的更新日志记录可以使用Flink流阅读器。
可选配置参数
参数名称 | 是否必须 | 默认值 | 参数说明 |
---|---|---|---|
changelog.enabled |
false |
false |
它在默认情况下是关闭的,为了拥有upsert语义,只有合并的消息被确保保留,中间的更改可以被合并。 设置为true以支持使用所有更改 |
温馨提示:
不管格式是否存储了中间更改日志消息,批处理(快照)读取仍然合并所有中间更改。
在设置changelog.enable为true时,更新日志记录的保留只是最大的努力: 异步压缩任务将更新日志记录合并到一条记录中,因此如果流源不及时消费,则压缩后只能读取每个key的合并记录。 解决方案是通过调整压缩策略,比如压缩选项:compress.delta_commits和compression.delta_seconds,为读取器保留一些缓冲时间。
Insert Mode
基本特性
默认情况下,Hudi对插入模式采用小文件策略:MOR将增量记录追加到日志文件中,COW合并基本parquet文件(增量数据集将被重复数据删除)。 这种策略会导致性能下降。
如果要禁止文件合并行为,可将write.insert.deduplicate设置为false,则跳过重复数据删除。 每次刷新行为直接写入一个新的 parquet文件(MOR表也直接写入parquet文件)。
可选配置参数
参数名称 | 是否必须 | 默认值 | 参数说明 |
---|---|---|---|
write.insert.deduplicate |
false |
true |
“插入模式”默认启用重复数据删除功能。 关闭此选项后,每次刷新行为直接写入一个新的 parquet文件 |