使用FLINK SQL从savepoint恢复hudi作业 (flink 1.13)

Flink从1.13版本开始支持在SQL Client从savepoint恢复作业。flink-savepoint介绍

接下来我们从Flink SQL Client构建一个mysql cdc数据经kafka入hudi数据湖的例子。整体流程如下:

使用FLINK SQL从savepoint恢复hudi作业 (flink 1.13)

在上述第二步中,我们通过手工停止kafka→hudi的Flink任务,然后在Flink SQL Client从savepoint进行恢复。

下述工作类似于Flink SQL Client实战CDC数据入湖只是本文的flink版本为1.13.1,可参考其完成本文验证。

环境依赖

hadoop 3.2.0

zookeeper 3.6.3

kafka 2.8.0

mysql 5.7.35

flink 1.13.1-scala_2.12

flink cdc 1.4

hudi 0.10.0-SNAPSHOT (最新master分支编译 [2021/10/08])

datafaker 0.7.6

各组件说明见Flink SQL Client实战CDC数据入湖

组件说明、安装以及使用可直接在上述文章页面右上角搜索框使用关键词进行搜索。

操作指南

使用datafaker将测试数据导入mysql

  1. 在数据库中新建stu8表
mysql -u root -p

create database test;
use test;
create table stu8 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  score decimal(4,2) not null comment '成绩',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
  ) engine=InnoDB default charset=utf8;
  1. 新建meta.txt文件,文件内容为:
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
  1. 生成1000000条数据并写入到mysql中的test.stu8表(将数据设置尽量大,让写入hudi的任务能够不断进行)
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu8 1000000 --meta meta.txt 

hudi、flink-mysql-cdc、flink-kafka相关jar包下载

本文提供编译好的hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar,如果你想自己编译hudi那么直接clone master分支进行编译即可。(注意指定hadoop版本)

将jar包下载到flink的lib目录下

cd flink-1.13.1/lib
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/flink/flink-sql-client-savepoint-example/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.1/flink-sql-connector-kafka_2.12-1.13.1.jar

如果你在启动以及运行flink任务中遇到缺少某些类问题,请下载相关jar包并放置到flink-1.13.1/lib目录下,本实验在操作过程中遇到的缺少的包如下(点击可下载):

在yarn上启动flink session集群

首先确保已经配置好HADOOP_CLASSPATH,对于开源版本hadoop3.2.0,可通过如下方式设置:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/client/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/tools/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/etc/hadoop/*

flink需要开启checkpoint,和配置savepoint目录,修改flink-conf.yaml配置文件

execution.checkpointing.interval: 150000ms
state.backend: rocksdb
state.checkpoints.dir: hdfs://hadoop:9000/flink-chk
state.backend.rocksdb.localdir: /tmp/rocksdb
state.savepoints.dir: hdfs://hadoop:9000/flink-1.13-savepoints

启动flink session集群

cd flink-1.13.1
bin/yarn-session.sh -s 4 -jm 2048 -tm 2048 -nm flink-hudi-test -d

启动flink sql client

cd flink-1.13.1
bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar shell

flink读取mysql binlog并写入kafka

创建mysql源表

create table stu8_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'test',
  'table-name' = 'stu8'
);

创建kafka目标表

create table stu8_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'kafka'
  ,'topic' = 'cdc_mysql_test_stu8_sink'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'format' = 'debezium-json'
);

创建任务将mysql binlog日志写入kafka

insert into stu8_binlog_sink_kafka
select * from stu8_binlog;

flink读取kafka数据并写入hudi数据湖

创建kafka源表

create table stu8_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_test_stu8_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
  );

创建hudi目标表

 create table stu8_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
  );

创建任务将kafka数据写入到hudi中

insert into stu8_binlog_sink_hudi
select * from  stu8_binlog_source_kafka;

待任务运行一段时间后,我们手动保存hudi作业并停止任务

bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001

得到该任务的具体savepoint路径:

[root@hadoop flink]# bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001
2021-10-07 22:55:51,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-10-07 22:55:51,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Suspending job "0128b183276022367e15b017cb682d61" with a savepoint.
2021-10-07 22:55:51,317 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/data/flink-1.13.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-10-07 22:55:51,364 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at hadoop/192.168.241.128:8032
2021-10-07 22:55:51,504 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-10-07 22:55:51,506 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-10-07 22:55:51,567 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hadoop:34681 of application 'application_1633660054258_0001'.
Savepoint completed. Path: hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb

从savepoint恢复任务:(在Flink SQL Client执行)

 SET execution.savepoint.path=hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb
 
 insert into stu8_binlog_sink_hudi
select * from  stu8_binlog_source_kafka;

可以看到该任务从上述检查点恢复:

使用FLINK SQL从savepoint恢复hudi作业 (flink 1.13)

最新hudi基础入门、实战经验、进阶提升、经典案例,请参考:

hudi | 从大数据到人工智能

使用FLINK SQL从savepoint恢复hudi作业 (flink 1.13)
使用FLINK SQL从savepoint恢复hudi作业 (flink 1.13)

上一篇:【笔记6】用pandas实现条目数据格式的推荐算法 (基于物品的协同)


下一篇:【笔记5】用pandas实现矩阵数据格式的推荐算法 (基于物品的协同)