#### 什么是CDC
CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
#### CDC的种类
||基于查询的CDC |基于Binlog的CDC|
|开源产品| Sqoop、Kafka JDBC Source|Canal、Maxwell、Debezium|
|执行模式| Batch| Streaming|
|是否可以捕获所有数据变化| 否 |是|
|延迟性| 高延迟| 低延迟|
|是否增加数据库压力 |是 |否|
#### Flink-CDC
Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取**全量数据**和**增量变更数据**的 source 组件。目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors
#### 主要代码
//2.1 开启Checkpoint,每隔5秒钟做一次CK
//2.2 指定CK的一致性语义
//2.3 设置任务关闭的时候保留最后一次CK数据
//2.4 指定从CK自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//2.5 设置状态后端
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
//2.6 设置访问HDFS的用户名
System.setProperty("HADOOP_USER_NAME", "bigdata");
Properties properties = new Properties();
//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
//latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
//timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
properties.setProperty("scan.startup.mode", "initial");
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.tableList("test.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据
.deserializer(new StringDebeziumDeserializationSchema())
[bigdata@hadoop102 flink-standalone]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save
#### FlinkSQL方式的应用
tableEnv.executeSql("CREATE TABLE user_info (" +
" id INT," +
" name STRING," +
" phone_num STRING" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = 'hadoop102'," +
" 'port' = '3306'," +
" 'username' = 'root'," +
" 'password' = '000000'," +
" 'database-name' = 'test'," +
" 'table-name' = 'z_user_info'" +
tableEnv.executeSql("select * from user_info").print();
#### 自定义反序列化器
public class Flink_CDCWithCustomerSchema {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
//latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
//timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
properties.setProperty("debezium.snapshot.mode", "initial");
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.tableList("test.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
.deserializer(new DebeziumDeserializationSchema<String>() { //自定义数据解析器
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//获取主题信息,包含着数据库和表名 mysql_binlog_source.test.z_user_info
String topic = sourceRecord.topic();
String[] arr = topic.split("\\.");
String db = arr[1];
String tableName = arr[2];
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
Struct value = (Struct) sourceRecord.value();
Struct after = value.getStruct("after");
JSONObject data = new JSONObject();
for (Field field : after.schema().fields()) {
Object o = after.get(field);
data.put(field.name(), o);
JSONObject result = new JSONObject();
result.put("operation", operation.toString().toLowerCase());
result.put("data", data);
result.put("database", db);
result.put("table", tableName);
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
//3.使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);