Flink实例(117):flink-sql使用(二十三)以upsert的方式读写Kafka数据——以Flink1.12为例(二)

来源:https://mp.weixin.qq.com/s/Tb8GtabOVBvx88de0C4ncw

使用案例

本文以实时地统计网页PV和UV的总量为例,介绍upsert-kafka基本使用方式:

  • Kafka 数据源

用户的ippv信息,一个用户在一天内可以有很多次pv

CREATE TABLE source_ods_fact_user_ippv (
    user_id      STRING,       -- 用户ID
    client_ip    STRING,       -- 客户端IP
    client_info  STRING,       -- 设备机型信息
    pagecode     STRING,       -- 页面代码
    access_time  TIMESTAMP,    -- 请求时间
    dt           STRING,       -- 时间分区天
    WATERMARK FOR access_time AS access_time - INTERVAL 5 SECOND  -- 定义watermark
) WITH (
   connector = kafka, -- 使用 kafka connector
    topic = user_ippv, -- kafka主题
    scan.startup.mode = earliest-offset, -- 偏移量
    properties.group.id = group1, -- 消费者组
    properties.bootstrap.servers = kms-2:9092,kms-3:9092,kms-4:9092, 
    format = json, -- 数据源格式为json
    json.fail-on-missing-field = false,
    json.ignore-parse-errors = true
);
  • Kafka Sink表

统计每分钟的PV、UV,并将结果存储在Kafka中

CREATE TABLE result_total_pvuv_min (
    do_date     STRING,     -- 统计日期
    do_min      STRING,      -- 统计分钟
    pv          BIGINT,     -- 点击量
    uv          BIGINT,     -- 一天内同个访客多次访问仅计算一个UV
    currenttime TIMESTAMP,  -- 当前时间
    PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
  connector = upsert-kafka,
  topic = result_total_pvuv_min,
  properties.bootstrap.servers = kms-2:9092,kms-3:9092,kms-4:9092,
  key.json.ignore-parse-errors = true,
  value.json.fail-on-missing-field = false,
  key.format = json,
  value.format = json,
  value.fields-include = EXCEPT_KEY -- key不出现kafka消息的value中
);

计算逻辑

-- 创建视图
CREATE VIEW view_total_pvuv_min AS
SELECT
     dt AS do_date,                    -- 时间分区
     count (client_ip) AS pv,          -- 客户端的IP
     count (DISTINCT client_ip) AS uv, -- 客户端去重
     max(access_time) AS access_time   -- 请求的时间
FROM
    source_ods_fact_user_ippv
GROUP BY dt;

-- 写入数据
INSERT INTO result_total_pvuv_min
SELECT
  do_date,    --  时间分区
  cast(DATE_FORMAT (access_time,HH:mm) AS STRING) AS do_min,-- 分钟级别的时间
  pv,
  uv,
  CURRENT_TIMESTAMP AS currenttime -- 当前时间
from
  view_total_pvuv_min;

生产用户访问数据到kafka,向kafka中的user_ippv插入数据

{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:32:24","dt":"2021-01-08"}

{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-08 11:32:55","dt":"2021-01-08"}

{"user_id":"2","client_ip":"192.165.12.1","client_info":"pc","pagecode":"1031","access_time":"2021-01-08 11:32:59","dt":"2021-01-08"}

{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-08 11:33:24","dt":"2021-01-08"}

{"user_id":"3","client_ip":"192.168.10.3","client_info":"pc","pagecode":"1001","access_time":"2021-01-08 11:33:30","dt":"2021-01-08"}

{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:34:24","dt":"2021-01-08"}

查询结果表:

select * from result_total_pvuv_min;

Flink实例(117):flink-sql使用(二十三)以upsert的方式读写Kafka数据——以Flink1.12为例(二)

 

 

可以看出:每分钟的pv、uv只显示一条数据,即代表着截止到当前时间点的pv和uv

查看Kafka中result_total_pvuv_min主题的数据,如下:

Flink实例(117):flink-sql使用(二十三)以upsert的方式读写Kafka数据——以Flink1.12为例(二)

 

 

可以看出:针对每一条访问数据,触发计算了一次PV、UV,每一条数据都是截止到当前时间的累计PV和UV。

尖叫提示:

默认情况下,如果在启用了检查点的情况下执行查询,Upsert Kafka接收器会将具有至少一次保证的数据提取到Kafka主题中。

这意味着,Flink可能会将具有相同键的重复记录写入Kafka主题。但是,由于连接器在upsert模式下工作,因此作为源读回时,同一键上的最后一条记录将生效。因此,upsert-kafka连接器就像HBase接收器一样实现幂等写入。

总结

本文以Flink1.12为例,介绍了upsert-kafka的基本使用,该方式允许用户以upsert 的方式读写Kafka中的表,使用起来非常方便。另外本文也给出了一个具体的使用案例,可以进一步加深对该功能的使用

 

Flink实例(117):flink-sql使用(二十三)以upsert的方式读写Kafka数据——以Flink1.12为例(二)

上一篇:MySQL创建数据库并支持中文字符


下一篇:mysql 存储过程 日常实例记录