基于Docker结合Canal实现MySQL实时增量数据传输

 蒋大帅 分布式实验室

基于Docker结合Canal实现MySQL实时增量数据传输


Canal的介绍

基于Docker结合Canal实现MySQL实时增量数据传输


Canal的历史由来
在早期的时候,阿里巴巴公司因为杭州和美国两个地方的机房都部署了数据库实例,但因为跨机房同步数据的业务需求 ,便孕育而生出了Canal,主要是基于trigger(触发器)的方式获取增量变更。从2010年开始,阿里巴巴公司开始逐步尝试数据库日志解析,获取增量变更的数据进行同步,由此衍生出了增量订阅和消费业务。
当前的Canal支持的数据源端MySQL版本包括:5.1.x 、5.5.x 、5.6.x、5.7.x、8.0.x。
Canal的应用场景
目前普遍基于日志增量订阅和消费的业务,主要包括:
  • 基于数据库增量日志解析,提供增量数据订阅和消费

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务Cache刷新

  • 带业务逻辑的增量数据处理


Canal的工作原理

基于Docker结合Canal实现MySQL实时增量数据传输


在介绍Canal的原理之前,我们先来了解下MySQL主从复制的原理。
MySQL主从复制原理
基于Docker结合Canal实现MySQL实时增量数据传输
  • MySQL Master将数据变更的操作写入二进制日志binary log中, 其中记录的内容叫做二进制日志事件binary log events,可以通过show binlog events命令进行查看

  • MySQL Slave会将Master的binary log中的binary log events拷贝到它的中继日志relay log

  • MySQL Slave重读并执行relay log中的事件,将数据变更映射到它自己的数据库表中


了解了MySQL的工作原理,我们可以大致猜想到Canal应该也是采用类似的逻辑去实现增量数据订阅的功能,那么接下来我们看看实际上Canal的工作原理是怎样的?
Canal工作原理
基于Docker结合Canal实现MySQL实时增量数据传输
  • Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议

  • MySQL Master收到dump请求,开始推送binary log给Slave(也就是Canal)

  • Canal解析binary log对象(数据为byte流)


基于这样的原理与方式,便可以完成数据库增量日志的获取解析,提供增量数据订阅和消费,实现MySQL实时增量数据传输的功能。
既然Canal是这样的一个框架,又是纯Java语言编写而成,那么我们接下来就开始学习怎么使用它并把它用到我们的实际工作中。
基于Docker结合Canal实现MySQL实时增量数据传输



因为目前容器化技术的火热,本文通过使用Docker来快速搭建开发环境,而传统方式的环境搭建,在我们学会了Docker容器环境搭建后,也能自行依葫芦画瓢搭建成功。由于本篇主要讲解Canal,所以关于Docker的内容不会涉及太多,主要会介绍Docker的基本概念和命令使用。
什么是Docker
相信绝大多数人都使用过虚拟机VMware,在使用VMware进行环境搭建的时候,只需提供了一个普通的系统镜像并成功安装,剩下的软件环境与应用配置还是如我们在本机操作一样在虚拟机里也操作一遍,而且VMware占用宿主机的资源较多,容易造成宿主机卡顿,而且系统镜像本身也占用过多空间。
为了便于大家快速理解Docker,便与VMware做对比来做介绍,Docker提供了一个开始,打包,运行APP的平台,把APP(应用)和底层infrastructure(基础设施)隔离开来。Docker中最主要的两个概念就是镜像(类似VMware的系统镜像)与容器(类似VMware里安装的系统)。
什么是Image(镜像)
  • 文件和meta data的集合(root filesystem)

  • 分层的,并且每一层都可以添加改变删除文件,成为一个新的image

  • 不同的image可以共享相同的layer

  • Image本身是read-only的


基于Docker结合Canal实现MySQL实时增量数据传输
什么是Container(容器)
  • 通过Image创建(copy)

  • 在Image layer之上建立一个container layer(可读写)

  • 类比面向对象:类和实例

  • Image负责APP的存储和分发,Container负责运行APP


基于Docker结合Canal实现MySQL实时增量数据传输
Docker的网络介绍
Docker的网络类型有三种:
  • Bridge:桥接网络。默认情况下启动的Docker容器,都是使用Bridge,Docker安装时创建的桥接网络,每次Docker容器重启时,会按照顺序获取对应的IP地址,这个就导致重启下,Docker的IP地址就变了。

  • None:无指定网络。使用 --network=none,Docker容器就不会分配局域网的IP。

  • Host:主机网络。使用--network=host,此时,Docker容器的网络会附属在主机上,两者是互通的。例如,在容器中运行一个Web服务,监听8080端口,则主机的8080端口就会自动映射到容器中。


创建自定义网络:(设置固定IP)

docker network create --subnet=172.18.0.0/16 mynetwork


查看存在的网络类型docker network ls:
基于Docker结合Canal实现MySQL实时增量数据传输
搭建Canal环境
附上Docker的下载安装地址:https://www.docker.com/products/docker-desktop。
下载Canal镜像docker pull canal/canal-server:
基于Docker结合Canal实现MySQL实时增量数据传输
下载MySQL镜像docker pull mysql,下载过的则如下图:
基于Docker结合Canal实现MySQL实时增量数据传输
查看已经下载好的镜像docker images:
基于Docker结合Canal实现MySQL实时增量数据传输
接下来通过镜像生成MySQL容器与canal-server容器:基于Docker结合Canal实现MySQL实时增量数据传输


查看Docker中运行的容器docker ps:
基于Docker结合Canal实现MySQL实时增量数据传输
MySQL的配置修改
以上只是初步准备好了基础的环境,但是怎么让Canal伪装成Salve并正确获取MySQL中的binary log呢?
对于自建MySQL,需要先开启Binlog写入功能,配置binlog-format为ROW模式,通过修改MySQL配置文件来开启bin_log,使用find / -name my.cnf查找my.cnf,修改文件内容如下:

[mysqld]log-bin=mysql-bin # 开启binlogbinlog-format=ROW # 选择ROW模式server_id=1 # 配置MySQL replaction需要定义,不要和Canal的slaveId重复


进入MySQL容器docker exec -it mysql bash。
创建链接MySQL的账号Canal并授予作为MySQL slave的权限,如果已有账户可直接GRANT:

mysql -uroot -proot# 创建账号CREATE USER canal IDENTIFIED BY 'canal'; # 授予权限GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;# 刷新并应用FLUSH PRIVILEGES;


数据库重启后,简单测试 my.cnf 配置是否生效:基于Docker结合Canal实现MySQL实时增量数据传输

show variables like 'log_bin';show variables like 'log_bin';show master status;


canal-server的配置修改
进入canal-server容器docker exec -it canal-server bash。
编辑canal-server的配置vi canal-server/conf/example/instance.properties:基于Docker结合Canal实现MySQL实时增量数据传输

更多配置请参考:https://github.com/alibaba/canal/wiki/AdminGuide。
重启canal-server容器docker restart canal-server 进入容器查看启动日志:

docker exec -it canal-server bashtail -100f canal-server/logs/example/example.log


基于Docker结合Canal实现MySQL实时增量数据传输
至此,我们的环境工作准备完成!


拉取数据并同步保存到ElasticSearch

基于Docker结合Canal实现MySQL实时增量数据传输


本文的ElasticSearch也是基于Docker环境搭建,所以读者可执行如下命令:基于Docker结合Canal实现MySQL实时增量数据传输


环境已经准备好了,现在就要开始我们的编码实战部分了,怎么通过应用程序去获取Canal解析后的binlog数据。首先我们基于Spring Boot搭建一个canal demo应用。结构如下图所示:
基于Docker结合Canal实现MySQL实时增量数据传输
Student.java基于Docker结合Canal实现MySQL实时增量数据传输


CanalConfig.java基于Docker结合Canal实现MySQL实时增量数据传输

基于Docker结合Canal实现MySQL实时增量数据传输
CanalDataParser.java
由于这个类的代码较多,文中则摘出其中比较重要的部分,其它部分代码可从GitHub上获取:


  1. public static class TwoTuple<A, B> {


ElasticUtils.java基于Docker结合Canal实现MySQL实时增量数据传输基于Docker结合Canal实现MySQL实时增量数据传输

基于Docker结合Canal实现MySQL实时增量数据传输
BinLogElasticSearch.java
基于Docker结合Canal实现MySQL实时增量数据传输基于Docker结合Canal实现MySQL实时增量数据传输
基于Docker结合Canal实现MySQL实时增量数据传输
基于Docker结合Canal实现MySQL实时增量数据传输
CanalDemoApplication.java(Spring Boot启动类)基于Docker结合Canal实现MySQL实时增量数据传输

application.properties基于Docker结合Canal实现MySQL实时增量数据传输



Canal集群高可用的搭建

基于Docker结合Canal实现MySQL实时增量数据传输


通过上面的学习,我们知道了单机直连方式的Canala应用。在当今互联网时代,单实例模式逐渐被集群高可用模式取代,那么Canala的多实例集群方式如何搭建呢!
基于ZooKeeper获取Canal实例
准备ZooKeeper的Docker镜像与容器:基于Docker结合Canal实现MySQL实时增量数据传输


1、机器准备:

  • 运行Canal的容器IP:172.18.0.4 , 172.18.0.8

  • ZooKeeper容器IP:172.18.0.3:2181

  • MySQL容器IP:172.18.0.6:3306


2、按照部署和配置,在单台机器上各自完成配置,演示时instance name为example。
3、修改canal.properties,加上ZooKeeper配置并修改Canal端口:

canal.port=11113canal.zkServers=172.18.0.3:2181canal.instance.global.spring.xml = classpath:spring/default-instance.xml


4、创建example目录,并修改instance.properties:

canal.instance.mysql.slaveId = 1235 #之前的canal slaveId是1234,保证slaveId不重复即可canal.instance.master.address = 172.18.0.6:3306


注意:两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置。
启动两个不同容器的Canal,启动后,可以通过tail -100f logs/example/example.log查看启动日志,只会看到一台机器上出现了启动成功的日志。
比如我这里启动成功的是 172.18.0.4:基于Docker结合Canal实现MySQL实时增量数据传输

查看一下ZooKeeper中的节点信息,也可以知道当前工作的节点为172.18.0.4:11111:

[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running{"active":true,"address":"172.18.0.4:11111","cid":1}


客户端链接, 消费数据
可以通过指定ZooKeeper地址和Canal的instance name,canal client会自动从ZooKeeper中的running节点获取当前服务的工作节点,然后与其建立链接:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running{"active":true,"address":"172.18.0.4:11111","cid":1}


对应的客户端编码可以使用如下形式,上文中的CanalConfig.java中的canalHaConnector就是一个HA连接:

CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");


链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端IP,链接的端口信息等(聪明的你,应该也可以发现,canal client也可以支持HA功能):

[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running{"active":true,"address":"192.168.124.5:59887","clientId":1001}


数据消费成功后,canal server会在ZooKeeper中记录下当前最后一次消费成功的binlog位点(下次你重启client时,会从这最后一个位点继续进行消费):

[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}


停止正在工作的172.18.0.4的canal server:

docker exec -it canal-server bashcd canal-server/binsh stop.sh


这时172.18.0.8会立马启动example instance,提供新的数据服务:

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running{"active":true,"address":"172.18.0.8:11111","cid":1}


与此同时,客户端也会随着canal server的切换,通过获取ZooKeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成。
异常与总结

基于Docker结合Canal实现MySQL实时增量数据传输


elasticsearch-head无法访问Elasticsearch
es与es-head是两个独立的进程,当es-head访问es服务时,会存在一个跨域问题。所以我们需要修改es的配置文件,增加一些配置项来解决这个问题,如下:基于Docker结合Canal实现MySQL实时增量数据传输



修改完配置文件后需重启es服务。
elasticsearch-head查询报406 Not Acceptable
基于Docker结合Canal实现MySQL实时增量数据传输
解决方法:
1、进入head安装目录;
2、cd _site/
3、编辑vendor.js 共有两处

基于Docker结合Canal实现MySQL实时增量数据传输


使用elasticsearch-rest-high-level-client报org.elasticsearch.action.index.IndexRequest.ifSeqNo

基于Docker结合Canal实现MySQL实时增量数据传输


相关参考:https://github.com/elastic/elasticsearch/issues/43023。
为什么ElasticSearch要在7.X版本不能使用type?
参考:https://www.waitig.com/为什么elasticsearch要在7-x版本去掉type.html
使用spring-data-elasticsearch.jar报org.elasticsearch.client.transport.NoNodeAvailableException由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底层采用es官方TransportClient,而es官方计划放弃TransportClient,工具以es官方推荐的RestHighLevelClient进行调用请求。
可参考:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-supported-apis.html。
设置Docker容器开启启动

如果创建时未指定 --restart=always ,可通过update 命令docker update --restart=always [containerID]


Docker for Mac network host模式不生效
Host模式是为了性能,但是这却对Docker的隔离性造成了破坏,导致安全性降低。在性能场景下,可以用--netwokr host开启Host模式,但需要注意的是,如果你用Windows或Mac本地启动容器的话,会遇到Host模式失效的问题。原因是Host模式只支持Linux宿主机。
参见官方文档:https://docs.docker.com/network/host/。
客户端连接ZooKeeper报authenticate using SASL(unknow error)
基于Docker结合Canal实现MySQL实时增量数据传输

  • zookeeper.jar与Dokcer中的ZooKeeper版本不一致

  • zookeeper.jar使用了3.4.6之前的版本


出现这个错的意思是ZooKeeper作为外部应用需要向系统申请资源,申请资源的时候需要通过认证,而sasl是一种认证方式,我们想办法来绕过sasl认证。避免等待,来提高效率。
在项目代码中加入System.setProperty("zookeeper.sasl.client", "false");,如果是Spring Boot项目可以在application.properties中加入zookeeper.sasl.client=false。
参考:https://issues.apache.org/jira/browse/ZOOKEEPER-1657。
如果更换canal.client.jar中依赖的zookeeper.jar的版本
把Canal的官方源码下载到本机git clone https://github.com/alibaba/canal.git,然后修改client模块下pom.xml文件中关于ZooKeeper的内容,然后重新mvn install:
基于Docker结合Canal实现MySQL实时增量数据传输
把自己项目依赖的包替换为刚刚mvn install生产的包:
基于Docker结合Canal实现MySQL实时增量数据传输
关于选型的取舍
基于Docker结合Canal实现MySQL实时增量数据传输
原文链接:https://juejin.im/post/5ae82040f265da0ba46993df


上一篇:基于canal实现es写入方案:实现搜索服务与链路服务解耦


下一篇:数据的异构实战(二)手写迷你版同步工程