canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据的订阅和消费。
学习地址:https://github.com/alibaba/canal
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括:
-
数据库镜像
-
数据库实时备份
-
索引构建和实时维护(拆分异构索引、倒排索引等)
-
业务 cache 刷新
-
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
1. Canal原理讲解
我们首先了解一下MySQL主从复制的原理:
-
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
-
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
-
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
Canal 工作原理:
-
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump协议
-
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
-
canal 解析 binary log 对象(原始为 byte 流)
Canal获取增量数据后可以将数据推送给其他程序,比如:MQ中间件或者我们自定义的Java程序,从而进行一些我们特别的操作。
2. Canal安装
2.1 开启binlog
- 对于MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
**注意:**针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。
- 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
- 重启mysql
service mysql restart
- 查看是否开启binlog(进入命令行)
show variables like 'log_bin';
2.2 Canal安装
-
下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz
或者去github下载
https://github.com/alibaba/canal/releases -
解压
mkdir /opt/canal tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/canal
-
config配置
conf/canal.properties canal 的通用配置, 主要关注下canal.port, 默认是11111
-
instance配置
conf/example/instance.properties instance.properties是针对要追踪的 mysql 的实例配置
2.3 启动
bin/startup.sh
2.4 查看日志
3. 实战
-
创建工程
自行创建一个spring boot 即可。
-
添加依赖
<!--springboot-canal快速构建依赖包--> <dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency>
-
配置文件
server: port: 8083 spring: application: name: canal #Canal配置 canal: server: localhost:11111 destination: example #日志配置 logging: pattern: console: "%msg%n" level: root: error
-
创建监听器
CanalTable
注解表示监听具体的那张表实现EntryHandler接口,然后重现其中的三个方法,看名字应该就可以知道大概意思了。
@Component @CanalTable(value = "ad_items") public class AdItemsHandler implements EntryHandler<AdItems> { @Override public void insert(AdItems adItems) { System.out.println("inset:" + adItems); } @Override public void update(AdItems before, AdItems after) { System.out.println("before:" + before); System.out.println("after:" + after); } @Override public void delete(AdItems adItems) { System.out.println("delete;" + adItems); } }
-
启动项目
-
我们可以自行在监听的数据库中操作数据,然后至控制台查看效果。