一、FlinkX实时采集功能的基本介绍
首先为大家介绍下FlinkX实时模块的分类,如下图所示:
1、实时采集模块(CDC)
1)MySQL Binlog插件
利用阿里开源的Canal组件实时从MySQL中捕获变更数据。
2)PostgreSQL Wal插件
PostgreSQL 实时采集是基于 PostgreSQL的逻辑复制以及逻辑解码功能来完成的。逻辑复制同步数据的原理是,在Wal日志产生的数据库上,由逻辑解析模块对Wal日志进行初步的解析,它的解析结果为ReorderBufferChange(可以简单理解为HeapTupleData),再由Pgoutput Plugin对中间结果进行过滤和消息化拼接后,然后将其发送到订阅端,订阅端通过逻辑解码功能进行解析。
2、消息队列
1)Kafka:Kafka插件存在四个版本,根据Kafka版本的不同,插件名称也略有不同。具体对应关系如下表所示:
2)EMQX:EMQX 是一款完全开源,高度可伸缩,高可用的分布式MQTT消息服务器,适用于IoT、M2M 和移动应用程序,可处理千万级别的并发客户端。
3、间隔轮询
RDB类型插件的使用限制:
- 只有RDB类型的reader插件支持间隔轮询
- 轮询字段只能为数值类型或者时间类型
- 轮询字段只能为连续递增且不重复
4、其他
Hive插件: Hive插件只有写入插件,功能基于HDFS的写入插件实现,也就是说从实时采集插件读取,写入Hive也支持失败恢复的功能。
二、Binlog实时采集原理
1、什么是Binlog
MySQL 的二进制日志 Binlog 可以说是 MySQL 最重要的日志,它记录了所有的 DDL 和 DML 语句(除了数据查询语句Select、Show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的,Binlog 的主要目的是复制和恢复。
2、Binlog插件基本原理
实时采集插件的核心是如何实时捕获数据库数据的变更,对于MySQL数据库而言,阿里开源的Canal已经很好的帮我们实现了基于MySQL数据库增量日志解析,提供增量数据订阅和消费功能。因此这里我们直接用Canal捕获MySQL数据库数据的变更信息,基于FlinkX框架将任务简化成脚本的配置,基于Flink的Checkpoint机制提供了任务的故障恢复,提高了任务的容错性。
基本步骤如下:
- 任务启动时启动Canal线程
- Canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
- MySQL master收到dump请求,开始推送Binary Log给slave(即Canal)
- Canal解析Binary Log 对象(原始为Byte流)
- FlinkX获取Canal解析后的对象做二次解析,封装后发送至下游数据源
三、Binlog到Hive实战
1、环境准备:确认数据库开启了Binlog
show variables like '%log_bin%';
2、建表
CREATE TABLE `kudu` ( `id` bigint(11) NOT NULL AUTO_INCREMENT, `user_id` bigint(11) DEFAULT NULL, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4