Source: https://mp.weixin.qq.com/s/Tb8GtabOVBvx88de0C4ncw
Use case
This article uses real-time computation of total page PV and UV as an example to introduce the basic usage of upsert-kafka:
- Kafka source
The user's IP page-view (ippv) records. A single user can generate many PVs within one day.
CREATE TABLE source_ods_fact_user_ippv (
    user_id     STRING,       -- user ID
    client_ip   STRING,       -- client IP
    client_info STRING,       -- device model information
    pagecode    STRING,       -- page code
    access_time TIMESTAMP(3), -- request time (an event-time column must be TIMESTAMP(3) in Flink 1.12)
    dt          STRING,       -- date partition (day)
    WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND -- define the watermark
) WITH (
    'connector' = 'kafka',                     -- use the kafka connector
    'topic' = 'user_ippv',                     -- Kafka topic
    'scan.startup.mode' = 'earliest-offset',   -- start reading from the earliest offset
    'properties.group.id' = 'group1',          -- consumer group
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
    'format' = 'json',                         -- source data format is JSON
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
- Kafka sink table
Compute the per-minute PV and UV and store the results in Kafka.
CREATE TABLE result_total_pvuv_min (
    do_date     STRING,    -- statistics date
    do_min      STRING,    -- statistics minute
    pv          BIGINT,    -- page views
    uv          BIGINT,    -- multiple visits by the same visitor within one day count as a single UV
    currenttime TIMESTAMP, -- current time
    PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'result_total_pvuv_min',
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
    'key.json.ignore-parse-errors' = 'true',
    'value.json.fail-on-missing-field' = 'false',
    'key.format' = 'json',
    'value.format' = 'json',
    'value.fields-include' = 'EXCEPT_KEY' -- key columns do not appear in the value of the Kafka message
);
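To make the effect of 'value.fields-include' = 'EXCEPT_KEY' concrete, the following is a minimal Python sketch of how one result row would be split into a Kafka message key and value. It is only a model of the idea, not Flink's serializer: the helper name to_kafka_message, the sample row values, and the timestamp string are assumptions made for illustration, and Flink's actual JSON output (field order, timestamp formatting) may differ.

```python
import json

# Mirrors PRIMARY KEY (do_date, do_min) in the sink table definition.
KEY_FIELDS = ("do_date", "do_min")


def to_kafka_message(row, key_fields=KEY_FIELDS):
    """Split a row into (key_json, value_json), leaving key columns out of the value.

    Hypothetical helper: it models EXCEPT_KEY, it is not part of Flink.
    """
    key = {name: row[name] for name in key_fields}
    value = {name: v for name, v in row.items() if name not in key_fields}
    return json.dumps(key), json.dumps(value)


# Illustrative row; the currenttime value is made up.
row = {
    "do_date": "2021-01-08",
    "do_min": "11:32",
    "pv": 3,
    "uv": 2,
    "currenttime": "2021-01-08 11:35:00",
}

key_json, value_json = to_kafka_message(row)
print("key  :", key_json)
print("value:", value_json)
```

With the alternative setting 'ALL', the key columns would be repeated in the value as well; EXCEPT_KEY keeps them only in the message key.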
Computation logic
-- Create the view
CREATE VIEW view_total_pvuv_min AS
SELECT
    dt AS do_date,                    -- date partition
    count(client_ip) AS pv,           -- count of client IPs (one per access)
    count(DISTINCT client_ip) AS uv,  -- distinct client IPs
    max(access_time) AS access_time   -- latest request time
FROM source_ods_fact_user_ippv
GROUP BY dt;

-- Write the data
INSERT INTO result_total_pvuv_min
SELECT
    do_date,                                                      -- date partition
    cast(DATE_FORMAT(access_time, 'HH:mm') AS STRING) AS do_min,  -- time at minute granularity
    pv,
    uv,
    CURRENT_TIMESTAMP AS currenttime                              -- current time
FROM view_total_pvuv_min;
Produce user access data to Kafka by inserting the following records into the user_ippv topic:
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:32:24","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-08 11:32:55","dt":"2021-01-08"}
{"user_id":"2","client_ip":"192.165.12.1","client_info":"pc","pagecode":"1031","access_time":"2021-01-08 11:32:59","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-08 11:33:24","dt":"2021-01-08"}
{"user_id":"3","client_ip":"192.168.10.3","client_info":"pc","pagecode":"1001","access_time":"2021-01-08 11:33:30","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:34:24","dt":"2021-01-08"}
Query the result table:
select * from result_total_pvuv_min;
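As can be seen, the result table shows only one row of PV and UV per minute, and that row holds the PV and UV accumulated up to that point in time.
Inspecting the data in the Kafka topic result_total_pvuv_min shows the following:
As can be seen, every access record triggers one PV/UV computation, and each message carries the cumulative PV and UV up to that moment.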
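The two observations above (one row per minute in the result table, but one message per input record in the topic) can be reproduced with a small Python sketch that replays the six sample records. It is a simplified model under stated assumptions: it mimics GROUP BY dt with count, count(DISTINCT), and max, keys each output by (do_date, do_min), and ignores Flink's retraction messages and the currenttime column. The function name replay is hypothetical.

```python
from datetime import datetime

# (client_ip, access_time, dt) taken from the sample records above.
SAMPLE = [
    ("192.168.12.1", "2021-01-08 11:32:24", "2021-01-08"),
    ("192.168.12.1", "2021-01-08 11:32:55", "2021-01-08"),
    ("192.165.12.1", "2021-01-08 11:32:59", "2021-01-08"),
    ("192.168.12.1", "2021-01-08 11:33:24", "2021-01-08"),
    ("192.168.10.3", "2021-01-08 11:33:30", "2021-01-08"),
    ("192.168.12.1", "2021-01-08 11:34:24", "2021-01-08"),
]


def replay(records):
    """Return (changelog, table).

    changelog: one (key, value) upsert per input record, like the Kafka topic.
    table:     last value per key, like the result table read in upsert mode.
    """
    pv, ips, latest = {}, {}, {}
    changelog, table = [], {}
    for ip, ts, dt in records:
        t = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
        pv[dt] = pv.get(dt, 0) + 1                 # count(client_ip)
        ips.setdefault(dt, set()).add(ip)          # count(DISTINCT client_ip)
        latest[dt] = max(latest.get(dt, t), t)     # max(access_time)
        key = (dt, latest[dt].strftime("%H:%M"))   # (do_date, do_min)
        value = (pv[dt], len(ips[dt]))             # (pv, uv)
        changelog.append((key, value))
        table[key] = value                         # upsert: last write wins
    return changelog, table


changelog, table = replay(SAMPLE)
for key, value in changelog:
    print("topic message:", key, value)
for key, value in sorted(table.items()):
    print("table row    :", key, value)
```

In this model the topic receives six messages while the table keeps three rows, one per (do_date, do_min) key. Because the view groups by dt only, each row holds the running daily total at the last update within that minute rather than a count for that minute alone.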
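Important note:
By default, when a query is executed with checkpointing enabled, the Upsert Kafka sink writes data to the Kafka topic with at-least-once guarantees.
This means Flink may write duplicate records with the same key to the Kafka topic. However, because the connector works in upsert mode, the last record for a given key takes effect when the topic is read back as a source. The upsert-kafka connector therefore achieves idempotent writes, just like the HBase sink.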
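The idempotence argument can be illustrated with a short Python sketch. It assumes a reader that keeps the last value per key and treats a null value as a delete (tombstone), and it models a failure by replaying the messages written after a checkpoint in their original order. The function name materialize and the sample messages are hypothetical, and the model assumes the replayed records are identical to the originals.

```python
def materialize(messages):
    """Apply upsert semantics: the last value seen for each key wins.

    A value of None models a Kafka tombstone and deletes the key.
    """
    state = {}
    for key, value in messages:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


# Messages as they would be written without any failure.
original = [
    ("11:32", (1, 1)),
    ("11:32", (2, 1)),
    ("11:33", (4, 2)),
    ("11:32", (3, 2)),
]

# At-least-once: a failure after the third write restores the job to a
# checkpoint taken after the first write, so writes 2 and 3 are sent again
# before the job continues with write 4.
with_replay = original[:3] + original[1:3] + original[3:]

print(materialize(original))
print(materialize(with_replay))
```

Both sequences materialize to the same state, which is why duplicates under at-least-once delivery do not change what a reader in upsert mode ends up with.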
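Summary
Using Flink 1.12 as the example, this article introduced the basic usage of upsert-kafka, which lets users read and write tables in Kafka in upsert mode and is very convenient to use. It also walked through a concrete use case to give a better understanding of the feature.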
Flink Examples (117): Using flink-sql (23): Reading and Writing Kafka Data in Upsert Mode, with Flink 1.12 as the Example (Part 2)