Canal Middleware Learning Notes

Table of Contents

1. What Is Canal

2. How Canal Works

3. Canal Use Cases

4. Setting Up a Canal Environment

5. A Hands-On Canal Example

6. Summary


1. What Is Canal

In real projects, you may have run into business scenarios like the following:

1) After product information is modified, the change has to be synchronized to Elasticsearch so the index can be rebuilt;

2) Real-time database backup;

3) Collecting and analyzing log data from a database log table;

Faced with requirements like these, the first idea that usually comes to mind is a scheduled job: periodically sync MySQL with Elasticsearch, back up the MySQL database, or poll the log table at fixed intervals and push the records to Kafka or another middleware. Scheduled jobs certainly work, but Alibaba has open-sourced Canal, a framework that makes it easy to capture a database's incremental changes; with that change stream in hand, we can meet all of the requirements above. This article summarizes the basic usage of Alibaba's Canal middleware.

Canal, developed and open-sourced by Alibaba and written in Java, is mainly used to subscribe to, consume, and parse the incremental binlog data of a MySQL database.

Some history: early on, Alibaba ran dual data centers in Hangzhou and the United States, which created a need for cross-datacenter data synchronization; the original implementation relied mostly on business-level triggers to capture incremental changes. Starting in 2010, Alibaba gradually moved to parsing database logs to obtain incremental changes for synchronization, and the Canal project grew out of that work.

Canal's GitHub repository: http://github.com/alibaba/canal/

2. How Canal Works

How traditional MySQL master-slave replication works:

(Figure: traditional MySQL master-slave replication)

MySQL master-slave replication proceeds through the following steps (the sketch after this list shows how to observe the threads involved):

  • 1. When data on the master changes, the change is written to the master's binary log;
  • 2. The slave polls the master's binary log at a configured interval; when it detects that the log has changed, it starts an I/O thread that requests the master's binary log;
  • 3. For each I/O thread, the master starts a dump thread that sends it binary log events;
  • 4. The slave writes the received binary log events to its local relay log;
  • 5. The slave's SQL thread reads events from the relay log and replays them locally, keeping its data consistent with the master;
  • 6. Finally, the I/O thread and the SQL thread go to sleep until they are woken up again.
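
On an existing master/slave pair these threads can be observed directly with standard MySQL commands; this is only a quick way to see the moving parts, not something Canal requires:

-- on the slave: Slave_IO_Running / Slave_SQL_Running are the two replication threads
SHOW SLAVE STATUS\G

-- on the master: each connected replica shows up as a "Binlog Dump" thread
SHOW PROCESSLIST;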

How Canal works:

  • 1. Canal emulates the MySQL slave protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master;
  • 2. The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e., Canal);
  • 3. Canal parses the binary log objects (the raw data is a byte stream).

The overall flow is shown below:

(Figure: overall Canal workflow)

3. Canal Use Cases

Canal is a component for incremental subscription to and consumption of MySQL change logs. At present it mainly supports MySQL, and it fits scenarios such as:

  • Real-time database backup;
  • Cache refresh;
  • Search index building, e.g. rebuilding an Elasticsearch index;
  • Notifications for important business changes, e.g. sending a message when a product price changes;
  • Cross-database (heterogeneous) data synchronization, e.g. MySQL => Oracle, MySQL => Redis, MySQL => Elasticsearch;
  • Incremental data processing with business logic.

4. Setting Up a Canal Environment

Below, a simple example shows how to set up Canal to listen to MySQL's binlog; with Canal's parsed output in hand, we can then apply it to any of the scenarios listed above.

[a] Prepare a MySQL environment

For convenience we run MySQL 5.6 with Docker here; a locally installed MySQL works just as well.

sudo docker pull mysql:5.6

# --name sets the container name, -v mounts a host directory, -p maps the port, -e sets MySQL environment variables, -d runs in the background
docker run -p 3306:3306 --name mysql5.6 -v /mysql2/conf:/etc/mysql -e MYSQL_ROOT_PASSWORD=root -d mysql:5.6
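
A quick sanity check once the container is up (container name and root password as set in the command above):

# confirm the container is running
docker ps --filter name=mysql5.6

# run a trivial query inside the container
docker exec -it mysql5.6 mysql -uroot -proot -e "SELECT VERSION();"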

[b] Enable binlog writing in MySQL and set binlog-format to ROW

Because the configuration directory is mounted from the host, we can edit the file directly without entering the container:

vim /mysql2/conf/my.cnf

Add the following to my.cnf:

[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW format
server_id=1         # required for MySQL replication; must not clash with Canal's slaveId

As shown below:

(Figure: my.cnf with the binlog settings added)

After adding these settings, restart the MySQL container:

docker restart mysql5.6

Note that Canal requires the binlog format to be ROW.

MySQL binlog has three basic formats (a quick way to confirm the active format follows this list):

  • ROW mode

Records every row change (the before and after values of each column), so the change history of each row can be reconstructed precisely. It uses noticeably more space than the other formats and has to be inspected with the mysqlbinlog tool;

  • STATEMENT mode

Records only the SQL statements, without the row-level context, so replaying the log for recovery may lose data or produce different results;

  • MIXED mode

A flexible combination of the two: for example, table structure changes are logged in statement mode, while data updates and deletes are logged in row mode;
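
After restarting MySQL you can confirm which format is actually in effect:

-- should report ROW after the my.cnf change above
SHOW VARIABLES LIKE 'binlog_format';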

[c] Grant the canal account the privileges it needs to act as a MySQL slave; if the account already exists, the GRANT statements alone are enough

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
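
You can double-check the account afterwards:

SHOW GRANTS FOR 'canal'@'%';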

(Figure: creating the canal account and granting privileges)

[d] Verify the MySQL server-side setup

Use the following command to check whether binary logging is enabled:

# is binary logging enabled?
show variables like 'log_bin';

(Figure: log_bin reported as ON)

As we can see, binary logging is enabled. Next, list the binlog files:

# list the binlog files
show binary logs;

(Figure: output of SHOW BINARY LOGS)

Then check which binlog file is currently being written to:

# show the binlog file currently being written to
show master status\G

(Figure: output of SHOW MASTER STATUS)

If all three commands return results, the MySQL server side is configured correctly. It is also worth scanning MySQL's startup log for errors such as permission problems.

[e] Download the Canal package

Download https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz in a browser, upload the package to the server, and extract it:

[root@wsh /]# mkdir -p  /usr/local/canal.deployer-1.1.4
[root@wsh /]# tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal.deployer-1.1.4

Here the package is extracted into /usr/local/canal.deployer-1.1.4; note that this directory must be created in advance (as in the mkdir above), otherwise tar will report an error. After extraction the layout looks like this:

(Figure: extracted Canal directory layout)

[f] Adjust the configuration

The main task is to update the settings in the instance configuration file that describe your own database; the relevant entries are sketched right after the command below:

vim conf/example/instance.properties
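
For reference, a sketch of the entries that usually need to change in conf/example/instance.properties. The address, credentials, and filter shown here are assumptions matching the environment built above (MySQL reachable on the Canal host at port 3306 and the canal/canal account from step [c]); your values will differ, and the file contains many more options than shown here.

# MySQL instance that Canal should follow
canal.instance.master.address=192.168.121.128:3306

# account created in step [c]
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8

# table filter (regex); .*\\..* subscribes to every schema and table
canal.instance.filter.regex=.*\\..*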

[g] Start Canal

[root@wsh canal.deployer-1.1.4]# cd bin/
[root@wsh bin]# ll
total 16
-rwxr-xr-x. 1 root root   58 Sep  2  2019 restart.sh
-rwxr-xr-x. 1 root root 1181 Sep  2  2019 startup.bat
-rwxr-xr-x. 1 root root 3167 Sep  2  2019 startup.sh
-rwxr-xr-x. 1 root root 1356 Sep  2  2019 stop.sh
[root@wsh bin]# ./startup.sh 
cd to /usr/local/canal.deployer-1.1.4/bin for workaround relative path
LOG CONFIGURATION : /usr/local/canal.deployer-1.1.4/bin/../conf/logback.xml
canal conf : /usr/local/canal.deployer-1.1.4/bin/../conf/canal.properties
CLASSPATH :/usr/local/canal.deployer-1.1.4/bin/../conf:/usr/local/canal.deployer-1.1.4/bin/../lib/zookeeper-3.4.5.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/zkclient-0.10.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/spring-core-3.2.18.RELEASE.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/spring-context-3.2.18.RELEASE.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/snappy-java-1.1.7.1.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/snakeyaml-1.19.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/slf4j-api-1.7.12.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/simpleclient_httpserver-0.4.0.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/simpleclient_hotspot-0.4.0.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/simpleclient_common-0.4.0.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/simpleclient-0.4.0.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/scala-reflect-2.11.12.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/scala-logging_2.11-3.8.0.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/scala-library-2.11.12.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/rocketmq-srvutil-4.5.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/rocketmq-remoting-4.5.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/rocketmq-logging-4.5.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/rocketmq-common-4.5.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/rocketmq-client-4.5.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/rocketmq-acl-4.5.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/protobuf-java-3.6.1.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/oro-2.0.8.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/netty-all-4.1.6.Final.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/netty-3.2.2.Final.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/mysql-connector-java-5.1.47.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/metrics-core-2.2.0.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/lz4-java-1.4.1.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/logback-core-1.1.3.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/logback-classic-1.1.3.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/kafka-clients-1.1.1.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/kafka_2.11-1.1.1.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/jsr305-3.0.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/jopt-simple-5.0.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/jctools-core-2.1.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/jcl-over-slf4j-1.7.12.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/javax.annotation-api-1.3.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/jackson-databind-2.9.6.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/jackson-core-2.9.6.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/jackson-annotations-2.9.0.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/httpcore-4.4.3.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/httpclient-4.5.1.jar:/usr/local/canal.deployer-1.1.4/
bin/../lib/h2-1.4.196.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/guava-18.0.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/fastsql-2.0.0_preview_973.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/fastjson-1.2.58.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/druid-1.1.9.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/disruptor-3.4.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/commons-logging-1.1.3.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/commons-lang3-3.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/commons-lang-2.6.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/commons-io-2.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/commons-compress-1.9.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/commons-codec-1.9.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/commons-cli-1.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/commons-beanutils-1.8.2.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.store-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.sink-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.server-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.protocol-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.prometheus-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.parse.driver-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.parse.dbsync-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.parse-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.meta-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.instance.spring-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.instance.manager-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.instance.core-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.filter-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.deployer-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/canal.common-1.1.4.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/aviator-2.2.1.jar:/usr/local/canal.deployer-1.1.4/bin/../lib/aopalliance-1.0.jar:
cd to /usr/local/canal.deployer-1.1.4/bin for continue
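
A quick way to check that the server actually came up is to confirm something is listening on Canal's default client port 11111 (the same port the server log below reports and the Java client connects to later); either command works, depending on which tool is installed:

ss -lntp | grep 11111
netstat -lntp | grep 11111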

[h] Check the Canal server log

[root@wsh logs]# cat canal/canal.log 
2021-05-01 16:09:06.812 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2021-05-01 16:09:07.123 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2021-05-01 16:09:07.297 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2021-05-01 16:09:07.712 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
2021-05-01 16:09:17.579 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

[i] Check the example instance's log

cat example.log

(Figure: example instance log output)

If the log output looks like the figure above, Canal is ready to start dumping the database's binlog.

Log in as the canal user and check which binlog file is currently being written to:

(Figure: Canal connected to MySQL as a binlog dump client)

As the figure shows, Canal is now acting as a slave and listening to the binlog.
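
The same check can be done with plain SQL on the MySQL server: once Canal has connected, its session shows up as a binlog dump thread.

-- run on the MySQL server; the canal account's connection should appear with a "Binlog Dump" command
SHOW PROCESSLIST;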

5. A Hands-On Canal Example

[a] Add the Canal Maven dependency

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

[b] Create a Canal client; the full code is as follows:

package com.wsh.mybatis.mybatisdemo.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

public class CanalDemo {
    private final static int BATCH_SIZE = 1000;
    private final static String SERVER_HOST = "192.168.121.128";
    private final static String DESTINATION = "example";
    private final static String USERNAME = "";
    private final static String PASSWORD = "";

    public static void main(String[] args) {
        // create the connector for the Canal server and the "example" destination
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(SERVER_HOST, 11111), DESTINATION, USERNAME, PASSWORD);
        try {
            // open the connection
            connector.connect();
            // subscribe to all schemas and tables
            connector.subscribe(".*\\..*");
            // roll back to the last un-acked position, so the next fetch starts from the last message that was not acknowledged
            connector.rollback();
            while (true) {
                // fetch up to BATCH_SIZE entries without acknowledging them
                Message message = connector.getWithoutAck(BATCH_SIZE);
                // batch id of this message
                long batchId = message.getId();
                // number of entries in this batch
                int size = message.getEntries().size();
                // no data in this batch: sleep for 2 seconds and poll again
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    // there is data: process every entry in the batch
                    for (Entry entry : message.getEntries()) {
                        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                            // skip transaction begin/end markers
                            continue;
                        }
                        // RowChange carries everything about one change event,
                        // e.g. isDdl (whether it is a DDL change), the DDL SQL, and the before/after column values
                        RowChange rowChange;
                        try {
                            rowChange = RowChange.parseFrom(entry.getStoreValue());
                        } catch (Exception e) {
                            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                        }
                        // type of the change (INSERT / UPDATE / DELETE / ...)
                        EventType eventType = rowChange.getEventType();
                        // print every row contained in this RowChange
                        for (RowData rowData : rowChange.getRowDatasList()) {
                            if (eventType == EventType.DELETE) {
                                System.out.println("/************* DELETE ***************/");
                                for (Column column : rowData.getBeforeColumnsList()) {
                                    System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                                }
                            } else if (eventType == EventType.INSERT) {
                                System.out.println("/************* INSERT ***************/");
                                for (Column column : rowData.getAfterColumnsList()) {
                                    System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                                }
                            } else if (eventType == EventType.UPDATE) {
                                System.out.println("/************* UPDATE ***************/");
                                System.out.println("/************* before ***************/");
                                for (Column column : rowData.getBeforeColumnsList()) {
                                    System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                                }
                                System.out.println("/************* after ***************/");
                                for (Column column : rowData.getAfterColumnsList()) {
                                    System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                                }
                            }
                        }
                    }

                }
                // acknowledge the batch; every Message with an id <= batchId is confirmed
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}

[c] Test
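
The statements below operate on a small user table in a canal_demo database. The table is not created anywhere above, so, as an assumption inferred from the column names used in the test statements, a minimal schema could look like this (the exact types are illustrative):

CREATE DATABASE IF NOT EXISTS canal_demo;

CREATE TABLE IF NOT EXISTS `canal_demo`.`user` (
    `id`   INT PRIMARY KEY AUTO_INCREMENT,
    `name` VARCHAR(50)
);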

Insert a row:

INSERT INTO `canal_demo`.`user`(`name`) VALUES ('name');

(Figure: client console output for the INSERT event)

Update a row:

UPDATE `canal_demo`.`user`
SET `name` = 'zhangsan'
WHERE `id` = '2';

(Figure: client console output for the UPDATE event)

Delete a row:

DELETE
FROM `canal_demo`.`user`
WHERE `id` = '2';

(Figure: client console output for the DELETE event)

As shown above, Canal successfully picks up the database changes. Once we have this information, we can wrap it in a DTO and send it to Redis, Elasticsearch, or another middleware for further business processing.
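
To illustrate that last point, here is a minimal sketch of packaging a RowData into a plain DTO that a downstream sender could serialize and publish. RowChangeDto and its toMap helper are made up for this example and are not part of the Canal API; the Column/RowData/EventType calls are the same ones used in the client above.

import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// hypothetical DTO: one change event, ready to be serialized and pushed downstream
public class RowChangeDto {
    public String table;
    public EventType eventType;
    public Map<String, String> before = new HashMap<>();
    public Map<String, String> after = new HashMap<>();

    public static RowChangeDto of(String table, EventType eventType, RowData rowData) {
        RowChangeDto dto = new RowChangeDto();
        dto.table = table;
        dto.eventType = eventType;
        dto.before = toMap(rowData.getBeforeColumnsList());
        dto.after = toMap(rowData.getAfterColumnsList());
        return dto;
    }

    // flatten a column list into name -> value pairs
    private static Map<String, String> toMap(List<Column> columns) {
        Map<String, String> map = new HashMap<>();
        for (Column column : columns) {
            map.put(column.getName(), column.getValue());
        }
        return map;
    }
}

Inside the client's processing loop, the table name is available from the entry header (entry.getHeader().getTableName()), and the resulting DTO can then be serialized to JSON and published to Redis, Kafka, or any other channel.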

6. Summary

Because it synchronizes data by listening to the binlog, Canal is non-intrusive to business code and achieves near-real-time latency, which is why it is a fairly common data synchronization approach in many companies. This article covered how to set up a basic Canal environment (note: production deployments usually run as a cluster) and how to consume the incremental change data Canal parses for us; with that data in hand, we can implement the business logic with the help of middleware such as Redis or RabbitMQ.
