FlinkCDC
因为一直没有一个详尽的解读我就自己做一篇
·https://github.com/apache/flink·
官网
·https://github.com/ververica/flink-cdc-connectors·
官方论坛
·Flink SQL CDC 实践以及一致性分析·2021-03-10 ·https://mp.weixin.qq.com/s/tE70jJO6pZTe6oB0fKcZkQ·
·Flink 如何实时分析 Iceberg 数据湖的 CDC 数据·2021-02-23 ·https://mp.weixin.qq.com/s/18ZA_DuAyvafl3k9lhJnVA·
·基于 Flink SQL CDC 的实时数据同步方案·2020-11-02 ·https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q·
·Flink SQL CDC 上线!我们总结了 13 条生产实践经验·2020-09-04 ·https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ·
·Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据·2020-08-28 ·https://mp.weixin.qq.com/s/HaSi4E1Ez4jV06RWAQ2wAQ·
WIKI
·https://github.com/ververica/flink-cdc-connectors/wiki/Downloads·
·https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B·
简介
Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC))从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。
特点
1 支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义
2 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。
3 对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。
MySQL CDC
Postgres CDC
Database | Version |
---|---|
MySQL | Database: 5.7, 8.0.x JDBC Driver: 8.0.16 |
PostgreSQL | Database: 9.6, 10, 11, 12 JDBC Driver: 42.2.12 |
Kafka的Connector
canal-json
debezium-json
changelog-json(这个本身带墓碑)
的format......
MySQL CDC
·https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector·
表/ SQL API的用法
-- 创建一个mysql cdc表源
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'inventory',
'table-name' = 'products'
);
-- 从mysql读取快照和binlog数据,并进行一些转换,并在客户端上显示
SELECT id, UPPER(name), description, weight FROM mysql_binlog;
DataStream API的用法
<dependency>
<groupId>com.alibaba.ververica</groupId>
<!-- add the dependency matching your database -->
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.1.0</version>
</dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("inventory") // 监视下库存数据库中的所有表
.username("flinkuser")
.password("flinkpw")
.deserializer(new StringDebeziumDeserializationSchema()) // 将SourceRecord转换为String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.addSource(sourceFunction)
.print().setParallelism(1); //对接收器使用并行度1以保持消息顺序
env.execute();
}
}
具体配置见·https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#startup-reading-position·
Postgres CDC
具体配置见·https://github.com/ververica/flink-cdc-connectors/wiki/Postgres-CDC-Connector·
KAFKA Connectors
SQL解析使用Changelog JSON格式
·https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/json/·
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.13.0</version>
</dependency>
下载flink-format-changelog-json-1.1.3.jar并将其放在下<FLINK_HOME>/lib/
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',-- 使用kafka连接器
'topic' = 'user_behavior',-- kafka topic
'properties.bootstrap.servers' = 'localhost:9092',-- kafka代理地址
'properties.group.id' = 'testGroup',-- 消费者足
'format' = 'json',-- 数据格式为json
'json.fail-on-missing-field' = 'false',-- 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)
'json.ignore-parse-errors' = 'true'-- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
......
)
Format 参数
参数 | 是否必须 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format |
必选 | (none) | String | 声明使用的格式,这里应为'json' 。 |
json.fail-on-missing-field |
可选 | false | Boolean | 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。 |
json.ignore-parse-errors |
可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null 。 |
json.timestamp-format.standard |
可选 | 'SQL' |
String | 声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为'SQL' 以及 'ISO-8601' :
|
json.map-null-key.mode |
选填 | 'FAIL' |
String | 指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL' , 'DROP' 和 'LITERAL' :
|
json.map-null-key.literal |
选填 | 'null' | String | 当 'json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 |
json.encode.decimal-as-plain-number |
选填 | false | Boolean | 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8 。当此选项设为 true 时,则会表示为 0.000000027 。 |
数据类型映射关系
当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。
在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
Flink SQL 类型 | JSON 类型 |
---|---|
CHAR / VARCHAR / STRING |
string |
BOOLEAN |
boolean |
BINARY / VARBINARY |
string with encoding: base64 |
DECIMAL |
number |
TINYINT |
number |
SMALLINT |
number |
INT |
number |
BIGINT |
number |
FLOAT |
number |
DOUBLE |
number |
DATE |
string with format: date |
TIME |
string with format: time |
TIMESTAMP |
string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE |
string with format: date-time (with UTC time zone) |
INTERVAL |
number |
ARRAY |
array |
MAP / MULTISET |
object |
ROW |
object |
DEMO
-- 假设我们有一个user_behavior日志
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka', -- 使用kafka连接器
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'earliest-offset', -- 从开头
'properties.bootstrap.servers' = 'localhost:9092', -- kafka代理地址
'format' = 'json' -- 数据格式为json
);
-- 我们要使用changelog-json格式将UV聚合结果存储在kafka中
create table day_uv (
day_str STRING,
uv BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'day_uv',
'scan.startup.mode' = 'earliest-offset', -- 从开头读取
'properties.bootstrap.servers' = 'localhost:9092', -- kafka代理地址
'format' = 'changelog-json' -- 数据格式为json
);
-- 使用更改日志-JSON格式写的UV结果转换成kafka
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, count(distinct user_id) as uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');
-- 读回changelog
SELECT * FROM day_uv;
接下来博主会结合自己的使用经验分析SQLCDC和DataStreamAPICDC的使用场景 比较耗费精力 因为最近要上FLINK1.13和普罗米修斯及ZEPPLIN 博主也会将这三篇文章搞定
通常来讲博主自身的生产经验
通常 如果单表binlog那么就会使用FlinkSQL 方便
如果 多表(因为SQL字段已经固定但是多表间字段可能不同)甚至整个库及复杂binlog就会使用DataStreamAPI 需要自己json格式解析取需要的字段
CDC DEMO
博主要统计商品仓储数量 仓储字段有1(入仓)2(出仓)两种mark。
增删都好说。但是涉及到update,当且仅当非仓储字段update也会产生binlog 。
这时就要判断是否是仓储字段update,最好的办法就是从binlog before after判断是否是仓储字段update,如果是进行仓储数量计算,不是就不计算。
整个流程是 :启动流之前查询DB表统计仓储数量到state=>利用上面的DEMO对state进行实时+-。