本文基于Flink SQL与hudi构建准实时数仓,在Flink从kafka接入数据之后,即将所有数据存于hudi中,包括所有中间处理数据以及最终数据。文章《实时数仓|基于Flink1.11的SQL构建实时数仓探索实践 (qq.com)》描述了基于Flink SQL与kafka构建的实时数仓,本文以上述文章为基础。
在完成本文实践的同时可以同步参考上述文章。
最终结果:
背景介绍
本文电商业务为例,展示准实时数仓的数据处理流程。
组件与配置说明
Flink 1.13.3
flink cdc 2.0.2
hudi 0.10.0 (2021.12.08最新发布版本,地址:https://github.com/apache/hudi/releases/tag/release-0.10.0)
hadoop 3.2.0
zeppelin 0.10.0
mysql 5.7(开启binlog)
kafka 2.5.0
由于zeppelin的便捷性,本文全部基于zeppelin进行任务提交,如果您还不会用zeppelin,那么您可以参考:https://lrting-top.blog.csdn.net/article/details/120681666。当然,如果您不想用zeppelin,用Flink SQL Client提交也是完全没有问题的。
本实验Flink开启checkpoint,设置为60s。
在完成以下任务之前,请确保您已经
- 部署好Flink 1.13.3,并将hudi对应的Jar包已经正确打包并且放置到Flink的lib目录下,将flink cdc对应的jar包放置到lib目录下。
- 部署并启动zeppelin 0.10.0,在zeppelin的Flink interpreter上指定了FLINK_HOME以及HADOOP_CLASSPATH
- 同时还有启动hadoop、mysql、kafka
处理流程
MySQL建表与原始数据载入
首先从以下地址获取mysql建表语句以及模拟数据:
下载了上述建表语句之后,进入mysql,新建realtime_dw_demo_1,进入数据库 realtime_dw_demo_1 ,初始化数据库
mysql -u root -p
create database realtime_dw_demo_1;
use database realtime_dw_demo_1;
source realtime_table.sql
将mysql表数据同步到kafka
使用flink cdc将mysql数据同步到kafka中,以下为相关sql语句:
读取mysql源表数据
%flink.ssql
drop table if exists base_category1;
drop table if exists base_category2;
drop table if exists base_category3;
drop table if exists base_province;
drop table if exists base_region;
drop table if exists base_trademark;
drop table if exists date_info;
drop table if exists holiday_info;
drop table if exists holiday_year;
drop table if exists order_detail;
drop table if exists order_info;
drop table if exists order_status_log;
drop table if exists payment_info;
drop table if exists sku_info;
drop table if exists user_info;
---mysql table
CREATE TABLE `base_category1` (
`id` bigint NOT NULL,
`name` string NOT NULL,
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'base_category1'
);
CREATE TABLE `base_category2` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '二级分类名称',
`category1_id` bigint NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'base_category2'
);
CREATE TABLE `base_category3` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '三级分类名称',
`category2_id` bigint NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'base_category3'
);
CREATE TABLE `base_province` (
`id` int NULL COMMENT 'id',
`name` string NULL COMMENT '省名称',
`region_id` int NULL COMMENT '大区id',
`area_code` string NULL COMMENT '行政区位码',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'base_province'
);
CREATE TABLE `base_region` (
`id` int NOT NULL COMMENT '大区id',
`region_name` string NULL COMMENT '大区名称',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'base_region'
);
CREATE TABLE `base_trademark` (
`tm_id` string NULL COMMENT '品牌id',
`tm_name` string NULL COMMENT '品牌名称',
PRIMARY KEY (`tm_id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'base_trademark'
);
CREATE TABLE `date_info` (
`date_id` int NOT NULL,
`week_id` int NULL,
`week_day` int NULL,
`day` int NULL,
`month` int NULL,
`quarter` int NULL,
`year` int NULL,
`is_workday` int NULL,
`holiday_id` int NULL,
PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'date_info'
);
CREATE TABLE `holiday_info` (
`holiday_id` int NOT NULL,
`holiday_name` string NULL,
PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'holiday_info'
);
CREATE TABLE `holiday_year` (
`year_id` int NULL,
`holiday_id` int NULL,
`start_date_id` int NULL,
`end_date_id` int NULL,
PRIMARY KEY (`end_date_id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'holiday_year'
);
CREATE TABLE `order_detail` (
`id` bigint NOT NULL COMMENT '编号',
`order_id` bigint NULL COMMENT '订单编号',
`sku_id` bigint NULL COMMENT 'sku_id',
`sku_name` string NULL COMMENT 'sku名称(冗余)',
`img_url` string NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` string NULL COMMENT '购买个数',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'order_detail'
);
CREATE TABLE `order_info` (
`id` bigint NOT NULL COMMENT '编号',
`consignee` string NULL COMMENT '收货人',
`consignee_tel` string NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) NULL COMMENT '总金额',
`order_status` string NULL COMMENT '订单状态',
`user_id` bigint NULL COMMENT '用户id',
`payment_way` string NULL COMMENT '付款方式',
`delivery_address` string NULL COMMENT '送货地址',
`order_comment` string NULL COMMENT '订单备注',
`out_trade_no` string NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` string NULL COMMENT '订单描述(第三方支付用)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
`operate_time` timestamp(3) NULL COMMENT '操作时间',
`expire_time` timestamp(3) NULL COMMENT '失效时间',
`tracking_no` string NULL COMMENT '物流单编号',
`parent_order_id` bigint NULL COMMENT '父订单编号',
`img_url` string NULL COMMENT '图片路径',
`province_id` int NULL COMMENT '地区',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'order_info'
);
CREATE TABLE `order_status_log` (
`id` int NOT NULL,
`order_id` int NULL,
`order_status` int NULL,
`operate_time` timestamp(3) NULL,
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'order_status_log'
);
CREATE TABLE `payment_info` (
`id` bigint NOT NULL COMMENT '编号',
`out_trade_no` string NULL COMMENT '对外业务编号',
`order_id` string NULL COMMENT '订单编号',
`user_id` string NULL COMMENT '用户编号',
`alipay_trade_no` string NULL COMMENT '支付宝交易流水编号',
`total_amount` decimal(16,2) NULL COMMENT '支付金额',
`subject` string NULL COMMENT '交易内容',
`payment_type` string NULL COMMENT '支付方式',
`payment_time` string NULL COMMENT '支付时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'payment_info'
);
CREATE TABLE `sku_info` (
`id` bigint NOT NULL COMMENT 'skuid(itemID)',
`spu_id` bigint NULL COMMENT 'spuid',
`price` decimal(10,0) NULL COMMENT '价格',
`sku_name` string NULL COMMENT 'sku名称',
`sku_desc` string NULL COMMENT '商品规格描述',
`weight` decimal(10,2) NULL COMMENT '重量',
`tm_id` bigint NULL COMMENT '品牌(冗余)',
`category3_id` bigint NULL COMMENT '三级分类id(冗余)',
`sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'sku_info'
);
CREATE TABLE `user_info` (
`id` bigint NOT NULL COMMENT '编号',
`login_name` string NULL COMMENT '用户名称',
`nick_name` string NULL COMMENT '用户昵称',
`passwd` string NULL COMMENT '用户密码',
`name` string NULL COMMENT '用户姓名',
`phone_num` string NULL COMMENT '手机号',
`email` string NULL COMMENT '邮箱',
`head_img` string NULL COMMENT '头像',
`user_level` string NULL COMMENT '用户级别',
`birthday` date NULL COMMENT '用户生日',
`gender` string NULL COMMENT '性别 M男,F女',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'mysql-cdc',
'hostname' = 'vhost-118-23',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'server-time-zone'= 'Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name' = 'realtime_dw_demo_1',
'table-name' = 'user_info'
);
kafka sink表建表语句
%flink.ssql
drop table if exists base_category1_topic;
drop table if exists base_category2_topic;
drop table if exists base_category3_topic;
drop table if exists base_province_topic;
drop table if exists base_region_topic;
drop table if exists base_trademark_topic;
drop table if exists date_info_topic;
drop table if exists holiday_info_topic;
drop table if exists holiday_year_topic;
drop table if exists order_detail_topic;
drop table if exists order_info_topic;
drop table if exists order_status_log_topic;
drop table if exists payment_info_topic;
drop table if exists sku_info_topic;
drop table if exists user_info_topic;
CREATE TABLE `base_category1_topic` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_category1'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `base_category2_topic` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '二级分类名称',
`category1_id` bigint NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_category2'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `base_category3_topic` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '三级分类名称',
`category2_id` bigint NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_category3'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `base_province_topic` (
`id` int NULL COMMENT 'id',
`name` string NULL COMMENT '省名称',
`region_id` int NULL COMMENT '大区id',
`area_code` string NULL COMMENT '行政区位码'
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_province'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `base_region_topic` (
`id` int NOT NULL COMMENT '大区id',
`region_name` string NULL COMMENT '大区名称',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_region'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `base_trademark_topic` (
`tm_id` string NULL COMMENT '品牌id',
`tm_name` string NULL COMMENT '品牌名称',
PRIMARY KEY (`tm_id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_trademark'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `date_info_topic` (
`date_id` int NOT NULL,
`week_id` int NULL,
`week_day` int NULL,
`day` int NULL,
`month` int NULL,
`quarter` int NULL,
`year` int NULL,
`is_workday` int NULL,
`holiday_id` int NULL,
PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.date_info'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `holiday_info_topic` (
`holiday_id` int NOT NULL,
`holiday_name` string NULL,
PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.holiday_info'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `holiday_year_topic` (
`year_id` int NULL,
`holiday_id` int NULL,
`start_date_id` int NULL,
`end_date_id` int NULL
)
with(
'connector' = 'kafka'
,'topic' = 'my5.holiday_year'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `order_detail_topic` (
`id` bigint NOT NULL COMMENT '编号',
`order_id` bigint NULL COMMENT '订单编号',
`sku_id` bigint NULL COMMENT 'sku_id',
`sku_name` string NULL COMMENT 'sku名称(冗余)',
`img_url` string NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` string NULL COMMENT '购买个数',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.order_detail'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `order_info_topic` (
`id` bigint NOT NULL COMMENT '编号',
`consignee` string NULL COMMENT '收货人',
`consignee_tel` string NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) NULL COMMENT '总金额',
`order_status` string NULL COMMENT '订单状态',
`user_id` bigint NULL COMMENT '用户id',
`payment_way` string NULL COMMENT '付款方式',
`delivery_address` string NULL COMMENT '送货地址',
`order_comment` string NULL COMMENT '订单备注',
`out_trade_no` string NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` string NULL COMMENT '订单描述(第三方支付用)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
`operate_time` timestamp(3) NULL COMMENT '操作时间',
`expire_time` timestamp(3) NULL COMMENT '失效时间',
`tracking_no` string NULL COMMENT '物流单编号',
`parent_order_id` bigint NULL COMMENT '父订单编号',
`img_url` string NULL COMMENT '图片路径',
`province_id` int NULL COMMENT '地区',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.order_info'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `order_status_log_topic` (
`id` int NOT NULL ,
`order_id` int NULL,
`order_status` int NULL,
`operate_time` timestamp(3) NULL,
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.order_status_log'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `payment_info_topic` (
`id` bigint NOT NULL COMMENT '编号',
`out_trade_no` string NULL COMMENT '对外业务编号',
`order_id` string NULL COMMENT '订单编号',
`user_id` string NULL COMMENT '用户编号',
`alipay_trade_no` string NULL COMMENT '支付宝交易流水编号',
`total_amount` decimal(16,2) NULL COMMENT '支付金额',
`subject` string NULL COMMENT '交易内容',
`payment_type` string NULL COMMENT '支付方式',
`payment_time` string NULL COMMENT '支付时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.payment_info'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `sku_info_topic` (
`id` bigint NOT NULL COMMENT 'skuid(itemID)',
`spu_id` bigint NULL COMMENT 'spuid',
`price` decimal(10,0) NULL COMMENT '价格',
`sku_name` string NULL COMMENT 'sku名称',
`sku_desc` string NULL COMMENT '商品规格描述',
`weight` decimal(10,2) NULL COMMENT '重量',
`tm_id` bigint NULL COMMENT '品牌(冗余)',
`category3_id` bigint NULL COMMENT '三级分类id(冗余)',
`sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.sku_info'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
CREATE TABLE `user_info_topic` (
`id` bigint NOT NULL COMMENT '编号',
`login_name` string NULL COMMENT '用户名称',
`nick_name` string NULL COMMENT '用户昵称',
`passwd` string NULL COMMENT '用户密码',
`name` string NULL COMMENT '用户姓名',
`phone_num` string NULL COMMENT '手机号',
`email` string NULL COMMENT '邮箱',
`head_img` string NULL COMMENT '头像',
`user_level` string NULL COMMENT '用户级别',
`birthday` date NULL COMMENT '用户生日',
`gender` varchar(1) NULL COMMENT '性别 M男,F女',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.user_info'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
);
insert语句,将mysql binlog数据导入kafka对应的topic
%flink.ssql(runAsOne=true)
insert into base_category1_topic select * from base_category1;
insert into base_category2_topic select * from base_category2;
insert into base_category3_topic select * from base_category3;
insert into base_province_topic select * from base_province;
insert into base_region_topic select * from base_region;
insert into base_trademark_topic select * from base_trademark;
insert into date_info_topic select * from date_info;
insert into holiday_info_topic select * from holiday_info;
insert into holiday_year_topic select * from holiday_year;
insert into order_detail_topic select * from order_detail;
insert into order_info_topic select * from order_info;
insert into order_status_log_topic select * from order_status_log;
insert into payment_info_topic select * from payment_info;
insert into sku_info_topic select * from sku_info;
insert into user_info_topic select * from user_info;
将维表数据导入hudi
将my5.base_province和my1.base_region两张区域维表信息写入hudi COW表中
%flink.ssql
drop table if exists base_province_topic_source;
drop table if exists base_province_hudi;
CREATE TABLE `base_province_topic_source` (
`id` int NULL COMMENT 'id',
`name` string NULL COMMENT '省名称',
`region_id` int NULL COMMENT '大区id',
`area_code` string NULL COMMENT '行政区位码'
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_province'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_province_hudi` (
`id` int NULL COMMENT 'id',
`name` string NULL COMMENT '省名称',
`region_id` int NULL COMMENT '大区id',
`area_code` string NULL COMMENT '行政区位码',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/base_province_hudi',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_province_hudi select * from base_province_topic_source;
%flink.ssql
drop table if exists base_region_topic_source;
drop table if exists base_region_hudi;
CREATE TABLE `base_region_topic_source` (
`id` int NOT NULL COMMENT '大区id',
`region_name` string NULL COMMENT '大区名称',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_region'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_region_hudi` (
`id` int NOT NULL COMMENT '大区id',
`region_name` string NULL COMMENT '大区名称',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/base_region_hudi',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_region_hudi select * from base_region_topic_source;
使用上述两张维表创建dim_province表
%flink.ssql
DROP TABLE IF EXISTS dim_province_hudi;
create table dim_province_hudi (
province_id INT,
province_name STRING,
area_code STRING,
region_id INT,
region_name STRING ,
PRIMARY KEY (province_id) NOT ENFORCED
) with (
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/dim_province_hudi',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'province_id',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into dim_province_hudi
SELECT
bp.id AS province_id,
bp.name AS province_name,
bp.area_code AS area_code,
br.id AS region_id,
br.region_name AS region_name
FROM base_region_hudi br
JOIN base_province_hudi bp ON br.id= bp.region_id
;
将商品维表my5.base_category1和my5.base_category2两张商品维表信息写入hudi COW表
%flink.ssql
drop table if exists base_category1_topic_source;
drop table if exists base_category1_hudi;
CREATE TABLE `base_category1_topic_source` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_category1'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_category1_hudi` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/base_category1_hudi',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category1_hudi select * from base_category1_topic_source;
%flink.ssql
drop table if exists base_category2_topic_source;
drop table if exists base_category2_hudi;
CREATE TABLE `base_category2_topic_source` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
`category1_id` bigint NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_category2'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_category2_hudi` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
`category1_id` bigint NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/base_category2_hudi',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category2_hudi select * from base_category2_topic_source;
%flink.ssql
drop table if exists base_category3_topic_source;
drop table if exists base_category3_hudi;
CREATE TABLE `base_category3_topic_source` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
`category2_id` bigint NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.base_category3'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `base_category3_hudi` (
`id` bigint NOT NULL COMMENT '编号',
`name` string NOT NULL COMMENT '分类名称',
`category2_id` bigint NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/base_category3_hudi',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category3_hudi select * from base_category3_topic_source;
将商品表导入hudi
%flink.ssql
drop table if exists sku_info_topic_source;
drop table if exists sku_info_topic_hudi;
CREATE TABLE `sku_info_topic_source` (
`id` bigint NOT NULL COMMENT 'skuid(itemID)',
`spu_id` bigint NULL COMMENT 'spuid',
`price` decimal(10,0) NULL COMMENT '价格',
`sku_name` string NULL COMMENT 'sku名称',
`sku_desc` string NULL COMMENT '商品规格描述',
`weight` decimal(10,2) NULL COMMENT '重量',
`tm_id` bigint NULL COMMENT '品牌(冗余)',
`category3_id` bigint NULL COMMENT '三级分类id(冗余)',
`sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.sku_info'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `sku_info_topic_hudi` (
`id` bigint NOT NULL COMMENT 'skuid(itemID)',
`spu_id` bigint NULL COMMENT 'spuid',
`price` decimal(10,0) NULL COMMENT '价格',
`sku_name` string NULL COMMENT 'sku名称',
`sku_desc` string NULL COMMENT '商品规格描述',
`weight` decimal(10,2) NULL COMMENT '重量',
`tm_id` bigint NULL COMMENT '品牌(冗余)',
`category3_id` bigint NULL COMMENT '三级分类id(冗余)',
`sku_default_img` string NULL COMMENT '默认显示图片(冗余)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/sku_info_topic_hudi',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into sku_info_topic_hudi select * from sku_info_topic_source;
基于上述步骤,我们把商品维表的基础数据同步到hudi中,同样我们使用商品维表创建dim_sku_info视图
%flink.ssql
drop view if exists dim_sku_info;
CREATE VIEW dim_sku_info AS
SELECT
si.id AS id,
si.sku_name AS sku_name,
si.category3_id AS c3_id,
si.weight AS weight,
si.tm_id AS tm_id,
si.price AS price,
si.spu_id AS spu_id,
c3.name AS c3_name,
c2.id AS c2_id,
c2.name AS c2_name,
c3.id AS c1_id,
c3.name AS c1_name
FROM
sku_info_topic_hudi si
JOIN base_category3_hudi c3 ON si.category3_id = c3.id
JOIN base_category2_hudi c2 ON c3.category2_id =c2.id
JOIN base_category1_hudi c1 ON c2.category1_id = c1.id
;
DWD层数据处理
经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。
%flink.ssql
drop table if exists ods_order_detail_topic;
drop table if exists ods_order_info_topic;
drop table if exists dwd_paid_order_detail_hudi;
CREATE TABLE `ods_order_detail_topic` (
`id` bigint NOT NULL COMMENT '编号',
`order_id` bigint NULL COMMENT '订单编号',
`sku_id` bigint NULL COMMENT 'sku_id',
`sku_name` string NULL COMMENT 'sku名称(冗余)',
`img_url` string NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` int NULL COMMENT '购买个数',
`create_time` timestamp(3) NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.order_detail'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE `ods_order_info_topic` (
`id` bigint NOT NULL COMMENT '编号',
`consignee` string NULL COMMENT '收货人',
`consignee_tel` string NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) NULL COMMENT '总金额',
`order_status` string NULL COMMENT '订单状态',
`user_id` bigint NULL COMMENT '用户id',
`payment_way` string NULL COMMENT '付款方式',
`delivery_address` string NULL COMMENT '送货地址',
`order_comment` string NULL COMMENT '订单备注',
`out_trade_no` string NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` string NULL COMMENT '订单描述(第三方支付用)',
`create_time` timestamp(3) NULL COMMENT '创建时间',
`operate_time` timestamp(3) NULL COMMENT '操作时间',
`expire_time` timestamp(3) NULL COMMENT '失效时间',
`tracking_no` string NULL COMMENT '物流单编号',
`parent_order_id` bigint NULL COMMENT '父订单编号',
`img_url` string NULL COMMENT '图片路径',
`province_id` int NULL COMMENT '地区',
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector' = 'kafka'
,'topic' = 'my5.order_info'
,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
,'format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'properties.group.id' = 'hudiGroup'
);
CREATE TABLE dwd_paid_order_detail_hudi
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,0),
create_time TIMESTAMP(3),
pay_time TIMESTAMP(3),
primary key (detail_id) not enforced
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/dwd_paid_order_detail_hudi',
'scan.startup.mode' = 'earliest-offset',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into dwd_paid_order_detail_hudi
SELECT
od.id,
oi.id order_id,
oi.user_id,
oi.province_id,
od.sku_id,
od.sku_name,
od.sku_num,
od.order_price,
oi.create_time,
oi.operate_time
FROM
(
SELECT *
FROM ods_order_info_topic
WHERE order_status = '2' -- 已支付
) oi JOIN
(
SELECT *
FROM ods_order_detail_topic
) od
ON oi.id = od.order_id;
ADS层数据
经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了hudi中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。
ads_province_index_hudi
%flink.ssql
drop table if exists ads_province_index_hudi;
drop table if exists tmp_province_index_hudi;
-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
-- 2.每天每个省份的订单金额
-- ---------------------------------
CREATE TABLE ads_province_index_hudi(
province_id INT,
area_code STRING,
province_name STRING,
region_id INT,
region_name STRING,
order_amount DECIMAL(10,2),
order_count BIGINT,
dt STRING,
PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/ads_province_index_hudi',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表
-- ---------------------------------
CREATE TABLE tmp_province_index_hudi(
province_id INT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
pay_date DATE,
primary key(province_id) not enforced
)WITH (
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_province_index_hudi',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
%flink.ssql
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index_hudi
SELECT
province_id,
count(distinct order_id) order_count,-- 订单数
sum(order_price * sku_num) order_amount, -- 订单金额
TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY province_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'))
%flink.ssql
INSERT INTO ads_province_index_hudi
SELECT
pc.province_id,
dp.area_code,
dp.province_name,
dp.region_id,
dp.region_name,
pc.order_amount,
pc.order_count,
cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_hudi pc
JOIN dim_province_hudi as dp
ON dp.province_id = pc.province_id;
查看ADS层的ads_province_index_hudi表数据:
ads_sku_index_hudi
%flink.ssql
-- ---------------------------------
-- 使用 DDL创建hudi中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
-- 2.每天每个商品对应的订单金额
-- 3.每天每个商品对应的数量
-- ---------------------------------
drop table if exists ads_sku_index_hudi;
CREATE TABLE ads_sku_index_hudi
(
sku_id BIGINT,
sku_name VARCHAR,
weight DOUBLE,
tm_id BIGINT,
price DOUBLE,
spu_id BIGINT,
c3_id BIGINT,
c3_name VARCHAR ,
c2_id BIGINT,
c2_name VARCHAR,
c1_id BIGINT,
c1_name VARCHAR,
order_amount DOUBLE,
order_count BIGINT,
sku_count BIGINT,
dt varchar,
PRIMARY KEY (sku_id,dt) NOT ENFORCED
) with (
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/ads_sku_index_hudi',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
drop table if exists tmp_sku_index_hudi;
CREATE TABLE tmp_sku_index_hudi(
sku_id BIGINT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
order_sku_num BIGINT,
pay_date DATE,
PRIMARY KEY (sku_id) NOT ENFORCED
)WITH (
'connector' = 'hudi',
'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_sku_index_hudi',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'false',
'read.streaming.enabled' = 'true'
);
%flink.ssql
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index_hudi
SELECT
sku_id,
count(distinct order_id) order_count, -- 订单数
sum(order_price * sku_num) order_amount, -- 订单金额
sum(sku_num) order_sku_num,
TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY sku_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'))
%flink.ssql
INSERT INTO ads_sku_index_hudi
SELECT
sku_id ,
sku_name ,
weight ,
tm_id ,
price ,
spu_id ,
c3_id ,
c3_name,
c2_id ,
c2_name ,
c1_id ,
c1_name ,
sc.order_amount,
sc.order_count ,
sc.order_sku_num ,
cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_hudi sc
JOIN dim_sku_info ds
ON ds.id = sc.sku_id
%flink.ssql
select * from ads_sku_index_hudi;