1. 背景
在FlinkSQL关联时,必然会涉及到维表,维表又可能是不断变化的(aka 时态表 或 版本表)。
版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。
2. FlinkSQL中时态表类型
2.1 Debezium
2.2 a compacted Kafka topic
bin/kafka-topics.sh --alter --topic my_topic_name --zookeeper my_zookeeper:2181 --config cleanup.policy=compact
tableEnv.executeSql(
"CREATE TABLE dim_source (" +
" id STRING," +
" name STRING," +
" update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
" WATERMARK FOR update_time AS update_time, " +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'upsert-kafka'," +
" 'topic' = 'flinksqldim'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlDim'," +
" 'key.format' = 'json'," +
" 'value.format' = 'json')"
);
对应的kafka topic生产者代码
// 创建消息
// DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn");
for (int i = 1; i < 5; i++) {
JSONObject json1 = new JSONObject();
json1.put("key", i+"");
//json.put("update_time", dtf.format(LocalDateTime.now()));
JSONObject json = new JSONObject();
json.put("id", i+"");
json.put("name", "name222"+i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"flinksqldim",
json1.toJSONString(),
json.toJSONString()
);
// 发送消息
Future<RecordMetadata> future = producer.send(record);
}
3. 遇到的问题
3.1 kafka支持的METADATA
3.2 key.format
必须指定key.format
,可以不指定'scan.startup.mode' = 'earliest-offset'
4. 引用
Flink-1.12 - 之Flink SQL 与 kafka connector实践
Flink动态表和时态表总结
基于 FLINK SQL 的实时数据打宽的三种方式
将某一个主题设置为compacted topic
kafka支持的METADATA类型