1. What Is Canal
Have you ever run into business scenarios like these in a project?
1) After product information is modified, the changes need to be synchronized to Elasticsearch so the index can be rebuilt;
2) Real-time database backup;
3) Collecting and analyzing log records from a database log table.
Faced with these requirements, the first solution that comes to mind is probably a scheduled job: periodically synchronize MySQL with Elasticsearch, back up the MySQL database, or poll the log table at intervals and push the records to middleware such as Kafka by hand. Scheduled jobs certainly work, but Alibaba has open-sourced Canal, a framework that makes it easy to capture a database's incremental changes; with that change stream in hand, all of the requirements above can be met. This article summarizes the basic usage of Alibaba's Canal middleware.
Canal, developed and open-sourced by Alibaba and written in Java, is mainly used to subscribe to, consume, and parse incremental MySQL binlog data.
The historical background: Alibaba ran dual data centers in Hangzhou and the United States and needed cross-datacenter data synchronization, originally implemented with business-level triggers that captured incremental changes. Starting in 2010, Alibaba gradually moved to parsing database logs to obtain incremental changes, and the canal project grew out of that work.
Canal's GitHub repository: http://github.com/alibaba/canal/
2. How Canal Works
How traditional MySQL master-slave replication works:
MySQL replication goes through the following steps:
- 1. When data changes on the master, the change is written to the binary log (binlog);
- 2. The slave polls the master's binary log at intervals to detect changes; when it detects that the master's binary log has changed, it starts an I/O thread that requests the master's binary log;
- 3. For each I/O thread, the master starts a dump thread, which pushes binary log events to it;
- 4. The slave saves the received binary log events into its local relay log;
- 5. The slave's SQL thread reads events from the relay log and replays them locally, keeping its data consistent with the master's;
- 6. Finally, the I/O thread and SQL thread go to sleep and wait to be woken up for the next round.
How Canal works:
- 1. Canal emulates the MySQL slave interaction protocol: it masquerades as a MySQL slave and sends a dump request to the MySQL master;
- 2. The master receives the dump request and starts pushing the binary log to the slave (i.e., Canal);
- 3. Canal parses the binary log objects (the raw data is a byte stream).
The overall flow is shown in the diagram below:
3. Canal Application Scenarios
Canal is a component for incremental subscription to and consumption of MySQL change logs. It currently supports MySQL and can be used in scenarios such as:
- Real-time database backup;
- Cache refresh/invalidation;
- Search index building, e.g., rebuilding an Elasticsearch index;
- Important business events such as price changes, e.g., sending a notification when we detect that a product's price has changed;
- Cross-database backup (heterogeneous data synchronization), e.g., MySQL => Oracle, MySQL => Redis, MySQL => Elasticsearch;
- Incremental data processing with business logic.
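The "incremental data processing with business logic" scenario above usually boils down to routing each change event to the right handler. The class below is a minimal, self-contained sketch of that idea (the class and method names are hypothetical, not part of Canal's API); a real consumer would register one handler per table and call dispatch for every entry it pulls from Canal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Minimal sketch: route a change event to the handler registered for its table.
class ChangeRouter {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    // Register a handler for one table (e.g. refresh a cache, rebuild an index).
    void register(String table, Consumer<String> handler) {
        handlers.put(table, handler);
    }

    // Dispatch one event; returns false when no handler cares about the table.
    boolean dispatch(String table, String event) {
        Consumer<String> handler = handlers.get(table);
        if (handler == null) {
            return false;
        }
        handler.accept(event);
        return true;
    }
}
```

For example, registering a handler only for the product table lets you push a "price changed" notification when that table changes, while events from all other tables are simply ignored.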
4. Setting Up the Canal Environment
The simple walkthrough below shows how to set up Canal to listen to MySQL's binlog; with Canal's parsed output in hand, you can then apply it to the scenarios described above.
【a】Prepare a MySQL environment
For convenience, Docker is used here to start a MySQL 5.6 instance; a self-installed MySQL environment works just as well.
sudo docker pull mysql:5.6
# --name container name, -v volume mount, -p port mapping, -e MySQL env vars, -d run in background
docker run -p 3306:3306 --name mysql5.6 -v /mysql2/conf:/etc/mysql -e MYSQL_ROOT_PASSWORD=root -d mysql:5.6
【b】Enable MySQL binlog writing and set binlog-format to ROW
Because the configuration directory is volume-mounted, we can edit it on the host without entering the container:
vim /mysql2/conf/my.cnf
Add the following to my.cnf:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not clash with canal's slaveId
As shown below:
After adding the configuration, restart the MySQL service (note that the container was named mysql5.6 above):
docker restart mysql5.6
Note that the binlog format must be ROW.
MySQL binlog has three basic formats:
- ROW
Records how every column of each affected row changes, so the full change history of each row is preserved; it takes more disk space and must be inspected with the mysqlbinlog tool;
- STATEMENT
Records only the SQL statements, without their execution context, which can lead to data loss when the log is replayed for recovery;
- MIXED
A flexible hybrid of the two: for example, table structure changes are logged in statement mode, while data updates and deletes are logged in row mode.
【c】Grant the Canal account the privileges of a MySQL slave; if the account already exists, just run the GRANT statements:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
【d】Verify the MySQL server-side setup
Check whether binlog is enabled with:
# is binlog enabled?
show variables like 'log_bin';
You should see that the binlog feature is enabled. Next, list the binlog files:
# list the binlog files
show binary logs;
Then check which binlog file is currently being written to:
# show the binlog file currently being written to
show master status\G
If all three commands return results, the MySQL server side is configured correctly. Note that you can also inspect MySQL's startup log to check for permission errors and the like.
【e】Download the Canal deployer package
Download https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz in a browser, then upload the package to the server and extract it:
[root@wsh /]# mkdir -p /usr/local/canal.deployer-1.1.4
[root@wsh /]# tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal.deployer-1.1.4
Here it is extracted into /usr/local/canal.deployer-1.1.4; note that the target directory must be created beforehand, otherwise tar reports an error. After extraction it looks like this:
【f】Modify the configuration
Mainly update the settings related to your own database in the instance configuration file:
vim conf/example/instance.properties
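The keys that usually need changing look like the following (the values are illustrative; point the address and credentials at your own MySQL instance):

```properties
# address of the MySQL instance whose binlog is to be read
canal.instance.master.address=192.168.121.128:3306
# the account created for canal in step【c】
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# table filter regex: subscribe to all schemas and tables
canal.instance.filter.regex=.*\\..*
```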
【g】Start Canal
[root@wsh canal.deployer-1.1.4]# cd bin/
[root@wsh bin]# ll
total 16
-rwxr-xr-x. 1 root root 58 Sep 2 2019 restart.sh
-rwxr-xr-x. 1 root root 1181 Sep 2 2019 startup.bat
-rwxr-xr-x. 1 root root 3167 Sep 2 2019 startup.sh
-rwxr-xr-x. 1 root root 1356 Sep 2 2019 stop.sh
[root@wsh bin]# ./startup.sh
cd to /usr/local/canal.deployer-1.1.4/bin for workaround relative path
LOG CONFIGURATION : /usr/local/canal.deployer-1.1.4/bin/../conf/logback.xml
canal conf : /usr/local/canal.deployer-1.1.4/bin/../conf/canal.properties
CLASSPATH :/usr/local/canal.deployer-1.1.4/bin/../conf:/usr/local/canal.deployer-1.1.4/bin/../lib/zookeeper-3.4.5.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/zkclient-0.10.jar:... (the long list of bundled jars is omitted here for brevity)
cd to /usr/local/canal.deployer-1.1.4/bin for continue
【h】Check the Canal server log
[root@wsh logs]# cat canal/canal.log
2021-05-01 16:09:06.812 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2021-05-01 16:09:07.123 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2021-05-01 16:09:07.297 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2021-05-01 16:09:07.712 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
2021-05-01 16:09:17.579 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
【i】Check the example instance's log
cat example.log
If the log output matches the figure above, Canal is ready to dump the database's binlog.
Using the canal account, check the binlog file currently being written to:
As the figure above shows, Canal is now successfully acting as a slave and listening to the binlog.
5. A Canal Application Example
【a】Add the Canal Maven dependency
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
【b】Create the Canal client; the full code is as follows:
package com.wsh.mybatis.mybatisdemo.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

public class CanalDemo {

    private final static int BATCH_SIZE = 1000;
    private final static String SERVER_HOST = "192.168.121.128";
    private final static String DESTINATION = "example";
    private final static String USERNAME = "";
    private final static String PASSWORD = "";

    public static void main(String[] args) {
        // Create the connector
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(SERVER_HOST, 11111), DESTINATION, USERNAME, PASSWORD);
        try {
            // Open the connection
            connector.connect();
            // Subscribe to all schemas and tables
            connector.subscribe(".*\\..*");
            // Roll back to the last un-acked position, so the next fetch resumes there
            connector.rollback();
            while (true) {
                // Fetch up to BATCH_SIZE entries without acknowledging them
                Message message = connector.getWithoutAck(BATCH_SIZE);
                // Batch id of this fetch
                long batchId = message.getId();
                // Number of entries fetched
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // No data yet: sleep for 2 seconds before polling again
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    // There is data: process each entry
                    for (Entry entry : message.getEntries()) {
                        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                                || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                            // Skip transaction begin/end markers
                            continue;
                        }
                        // RowChange describes one change event, e.g. isDdl (whether it is a
                        // DDL change), the DDL SQL, and beforeColumns/afterColumns values
                        RowChange rowChange;
                        try {
                            rowChange = RowChange.parseFrom(entry.getStoreValue());
                        } catch (Exception e) {
                            throw new RuntimeException(
                                    "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                        }
                        // Event type: INSERT / UPDATE / DELETE / ...
                        EventType eventType = rowChange.getEventType();
                        // Print every changed row carried by this RowChange
                        for (RowData rowData : rowChange.getRowDatasList()) {
                            if (eventType == EventType.DELETE) {
                                System.out.println("/************* DELETE ***************/");
                                for (Column column : rowData.getBeforeColumnsList()) {
                                    System.out.println(column.getName() + " : " + column.getValue()
                                            + " update=" + column.getUpdated());
                                }
                            } else if (eventType == EventType.INSERT) {
                                System.out.println("/************* INSERT ***************/");
                                for (Column column : rowData.getAfterColumnsList()) {
                                    System.out.println(column.getName() + " : " + column.getValue()
                                            + " update=" + column.getUpdated());
                                }
                            } else if (eventType == EventType.UPDATE) {
                                System.out.println("/************* UPDATE ***************/");
                                System.out.println("/************* before ***************/");
                                for (Column column : rowData.getBeforeColumnsList()) {
                                    System.out.println(column.getName() + " : " + column.getValue()
                                            + " update=" + column.getUpdated());
                                }
                                System.out.println("/************* after ***************/");
                                for (Column column : rowData.getAfterColumnsList()) {
                                    System.out.println(column.getName() + " : " + column.getValue()
                                            + " update=" + column.getUpdated());
                                }
                            }
                        }
                    }
                }
                // Acknowledge the batch: every message with id <= batchId is confirmed
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}
【c】Test
Insert a row:
INSERT INTO `user`(`name`) VALUES ('name');
Update a row:
UPDATE `canal_demo`.`user`
SET `name` = 'zhangsan'
WHERE `id` = '2';
Delete a row:
DELETE
FROM `canal_demo`.`user`
WHERE `id` = '2';
As shown above, Canal successfully picks up the database changes. With this information in hand, we can wrap it into a DTO and send it to middleware such as Redis or Elasticsearch for further business processing.
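To decouple downstream systems from Canal's protobuf types, the parsed columns can be copied into a plain DTO before being published. The class below is a rough sketch of such a DTO (the name and the crude string rendering are illustrative; a real project would use a serializer such as Jackson or Fastjson):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative DTO carrying one row change, independent of Canal's protobuf classes.
class RowChangeDTO {
    final String table;
    final String eventType; // INSERT / UPDATE / DELETE
    final Map<String, String> before = new LinkedHashMap<>(); // column -> old value
    final Map<String, String> after = new LinkedHashMap<>();  // column -> new value

    RowChangeDTO(String table, String eventType) {
        this.table = table;
        this.eventType = eventType;
    }

    // Crude rendering for demonstration purposes only.
    String describe() {
        return "{table=" + table + ", eventType=" + eventType
                + ", before=" + before + ", after=" + after + "}";
    }
}
```

In the client above, the loops over getBeforeColumnsList()/getAfterColumnsList() would fill `before` and `after` instead of printing, and the finished DTO would then be handed off to Redis, Kafka, or Elasticsearch.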
6. Summary
Because it synchronizes data by listening to the binlog, Canal is non-intrusive to business code and achieves near-real-time latency, which is why it is a fairly common data synchronization solution in industry. This article covered how to set up a basic Canal environment (note: production deployments are usually clustered) and how to consume, in code, the incremental change data that Canal parses for us; with that data, middleware such as Redis or RabbitMQ can be used to implement the actual business logic.