Canal原理及其使用

1 什么是canal

  canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)

2 canal使用场景

  (1)阿里otter(阿里用于进行异地数据库之间的同步框架)中间件的一部分,这是原始场景

Canal原理及其使用

  (2)更新缓存:如果有大量的请求发送到mysql的话,mysql查询速度慢,QPS上不去,光查mysql可能会瘫痪,那就可以在前面加个缓存,这个缓存有2个主要的问题。一是缓存没有怎么办,二是数据不一致怎么办。对于第一个问题查缓存没有就差mysql,mysql再往缓存中写一份。对于第二个问题,如果数据库修改了,那就采用异步的方式进行修改,启动一个canal服务,监控mysql,只要一有变化就同步缓存,这样mysql和缓存就能达到最终的一致性。

Canal原理及其使用

  (3)抓取业务数据新增变化表,用于制作拉链表:做拉链表是需要有增加时间和修改时间的,需要数据今天新增和变化的数据,如果时间不全就没办法知道哪些是修改的。可以通过canal把变化的抽到自己的表里,以后数据就从这个表出。

  (4)取业务表的新增变化数据,用于制作实时统计

3 canal工作原理

Canal原理及其使用

  首先了解一下mysql主备复制原理:

  (1)master主库将改变记录,发送到二进制文件(binary log)中

  (2)slave从库向mysql Master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)

  (3)slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库

  canal的工作原理:把自己伪装成slave,从master复制数据。读取binlog是需要master授权的,因为binlog是加密的,授权分用户名密码才能读。master授权后不知道读他的binlog的是从机还是canal,他的所有传输协议都符合从机的标准,所以master一直以为是从机读的。

Canal原理及其使用

4 mysql的binlog

4.1 二进制日志

  mysql的二进制日志记录了所有的DDL和DML(除了数据查询语句),以事件的形式进行记录,包含语句执行消耗的时间,mysql的二进制日志是事务安全型的。

  开启二进制日志大概会有1%的性能损坏。二进制日志有2个主要的使用场景:①mysql的主备复制②数据恢复,通过使用mysqlbinlog工具来恢复数据(用这个做恢复是备选方案,主方案还是定期快照,定期执行脚本导数据,其实就是把当前所有数据导成insert,这个量少)

  二进制日志包括2类文件:①二进制日志索引文件(后缀为.index)用于记录所有的二进制文件②二进制日志文件(后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)

4.2 开启binlog

  修改mysql的配置文件my.cnf。

# vim /etc/my.cnfgG

  在[mysqld] 区块 添加

log-bin=mysql-bin

  mysql-bin表示binlog日志的前缀,以后生成的的日志文件就是 mysql-bin.000001 的文件后面的数字按顺序生成。 当mysql重启或到达单个文件大小的阈值时,新生一个文件,按顺序编号。

4.3 binlog分类

  binlog的格式有三种:STATEMENT,MIXED,ROW对比如下

格式 描述 优点
STATEMENT 语句级别,记录每一次执行写操作的语句,相对于ROW模式节省了空间,但是可能产生数据不一致如update tt set create_date=now(),由于执行时间不同产生饿得数据就不同 节省空间 可能造成数据不一致
ROW 行级,记录每次操作后每行记录的变化。假如一个update的sql执行结果是1万行statement只存一条,如果是row的话会把这个1000行的结果存这。 持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果 占用较大空间
MIXED 是对statement的升级,如当函数中包含 UUID() 时,包含 AUTO_INCREMENT 字段的表被更新时,执行 INSERT DELAYED 语句时,用 UDF 时,会按照 ROW的方式进行处理 节省空间,同时兼顾了一定的一致性 还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便

4.4 binlog格式选择

  如果只考虑主从复制的话可以用mixed,一般情况下使用statement,遇到几种特殊情况使用row,同步的话有SQL就行,因为手里有数据,前提是有数据才能执行这个SQL。在大数据场景下我们抽取数据是用于统计分析,分析的数据,如果用statement抽了SQL手里也没数据,不知道执行修改哪些,因为没有数据,所以没办法分析,所以适合用row,清清楚楚的表明了每一行是什么样。

4.5 修改配置文件

  修改my.cnf文件,在[mysqld]模块下添加如下内容

server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=bigdata

  binlog-do-db用于指定库,缩小监控的范围,server-id不能和mysql集群的其他节点重复

4.6 重启mysql

# service mysqld restart
Redirecting to /bin/systemctl restart mysqld.service

  到数据目录下查询是否生成binlog文件,这里我把数据目录自定义为了/data/mysql/

# cd /data/mysql/
# ll
total 188500
-rw-r----- 1 mysql mysql       56 Jul  1  2020 auto.cnf
-rw------- 1 mysql mysql     1676 Jul  1  2020 ca-key.pem
-rw-r--r-- 1 mysql mysql     1112 Jul  1  2020 ca.pem
-rw-r--r-- 1 mysql mysql     1112 Jul  1  2020 client-cert.pem
-rw------- 1 mysql mysql     1676 Jul  1  2020 client-key.pem
drwxr-x--- 2 mysql mysql     4096 Jul  1  2020 dataxweb
-rw-r----- 1 mysql mysql      526 Jan 14 11:03 ib_buffer_pool
-rw-r----- 1 mysql mysql 79691776 Jan 14 11:04 ibdata1
-rw-r----- 1 mysql mysql 50331648 Jan 14 11:04 ib_logfile0
-rw-r----- 1 mysql mysql 50331648 Aug  5 06:20 ib_logfile1
-rw-r----- 1 mysql mysql 12582912 Jan 14 11:04 ibtmp1
drwxr-x--- 2 mysql mysql      116 Jul  1  2020 iot
drwxr-x--- 2 mysql mysql     4096 Jul  1  2020 mysql
-rw-r----- 1 mysql mysql      154 Jan 14 11:03 mysql-bin.000001
-rw-r----- 1 mysql mysql       19 Jan 14 11:03 mysql-bin.index
srwxrwxrwx 1 mysql mysql        0 Jan 14 11:03 mysql.sock
-rw------- 1 mysql mysql        6 Jan 14 11:03 mysql.sock.lock
drwxr-x--- 2 mysql mysql     8192 Jul  1  2020 performance_schema
-rw------- 1 mysql mysql     1680 Jul  1  2020 private_key.pem
-rw-r--r-- 1 mysql mysql      452 Jul  1  2020 public_key.pem
-rw-r--r-- 1 mysql mysql     1112 Jul  1  2020 server-cert.pem
-rw------- 1 mysql mysql     1676 Jul  1  2020 server-key.pem
drwxr-x--- 2 mysql mysql     8192 Jul  1  2020 sys

  可以发现,这二进制日志索引文件和日志文件生成了。只要重启mysql,mysql-bin后面的序号就会往上涨,他的切分规则就是重启或者到一个大小的阈值,就会切一个

mysql-bin.000001
mysql-bin.index

5 安装canal

5.1 下载地址

https://github.com/alibaba/canal/releases

5.2 mysql为canal配置权限

  在mysql中给canal单独建一个用户,给全库全表的读,拷贝,复制的权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

  报错:ERROR 1819 (HY000): Your password does not satisfy the current policy requirements

原因是因为密码设置的过于简单会报错,MySQL有密码设置的规范,具体是与validate_password_policy的值有关,下图表明该值规则

Canal原理及其使用

  查看MySQL完整的初始密码规则,登陆后执行以下命令

mysql> SHOW VARIABLES LIKE 'validate_password%';
+--------------------------------------+--------+
| Variable_name                        | Value  |
+--------------------------------------+--------+
| validate_password_check_user_name    | OFF    |
| validate_password_dictionary_file    |        |
| validate_password_length             | 8      |
| validate_password_mixed_case_count   | 1      |
| validate_password_number_count       | 1      |
| validate_password_policy             | MEDIUM |
| validate_password_special_char_count | 1      |
+--------------------------------------+--------+

 密码的长度是由validate_password_length决定的,但是可以通过以下命令修改

set global validate_password_length=4;

  validate_password_policy决定密码的验证策略,默认等级为MEDIUM(中等),可通过以下命令修改为LOW(低)

set global validate_password_policy=0;

  重新执行

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

5.3 解压及配置

$ tar -zxvf canal.deployer-1.1.4.tar.gz

  配置说明:canal server的conf下有几个配置文件

conf/
├── canal_local.properties
├── canal.properties
├── example
│   ├── h2.mv.db
│   ├── instance.properties
│   └── meta.dat
├── logback.xml
├── metrics
│   └── Canal_instances_tmpl.json
└── spring
    ├── base-instance.xml
    ├── default-instance.xml
    ├── file-instance.xml
    ├── group-instance.xml
    ├── memory-instance.xml
    └── tsdb
        ├── h2-tsdb.xml
        ├── mysql-tsdb.xml
        ├── sql
        │   └── create_table.sql
        └── sql-map
            ├── sqlmap-config.xml
            ├── sqlmap_history.xml
            └── sqlmap_snapshot.xml

canal.properties的common属性前四个配置项:

canal.id= 1             #canal的编号,在集群环境下,不同canal的id不同,注意它和mysql的server_id不同。
canal.ip=               # ip这里不指定,默认为本机
canal.port= 11111       # 端口号,是给tcp模式(netty)时候用的,如果用了kafka或者rocketmq,就不会去起这个端口了
canal.zkServers=         # zk用于canal cluster
canal.serverMode = tcp   # 用于指定什么模式拉取数据

destinations相关的配置:

#################################################
#########       destinations        ############# 
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.global.mode = spring 
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

canal.destinations = example可以设置多个,比如example1,example2,则需要创建对应的两个文件夹,并且每个文件夹下都有一个instance.properties文件。

全局的canal实例管理用spring,这里的file-instance.xml最终会实例化所有的destinations instances:

        <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
                <property name="destination" value="${canal.instance.destination}" />
                <property name="eventParser">
                        <ref local="eventParser" />
                </property>
                <property name="eventSink">
                        <ref local="eventSink" />
                </property>
                <property name="eventStore">
                        <ref local="eventStore" />
                </property>
                <property name="metaManager">
                        <ref local="metaManager" />
                </property>
                <property name="alarmHandler">
                        <ref local="alarmHandler" />
                </property>
        <property name="mqConfig">
            <ref local="mqConfig" />
        </property>
        </bean>

如canal.instance.destination等于example,就会加载example/instance.properties配置文件

修改instance 配置文件

vi conf/example/instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=10.0.165.1:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#################################################

6 canal的instance与消费方式

Canal原理及其使用

  canal.properties这个配置文件负责的是canal服务的基础配置,每个canal可以起n多个实例instance,一个instance代表一个线程,每个instance都有一个独立的配置文件instance.properties,不同的instance可以采集不同的mysql数据库,也就是一个canal可以对应多个mysql数据库。

  在instance里面有一个小队列,可以理解为是jvm级的队列,instance抓取来的数据先放入到队列中,队列可以有很多出口:①一个是canal server自己主动把数据推送到kafka,这个比较简单,一行代码不用写,只需要配个kafka的地址,每个instance对应kafka的一个topic,数据是json串。这种方式虽然简单,但是他的缺点主要体现在2个方面,一个instance对应一个topic,所有表都在这一个topic,所以实时的时候要进行分流。另一方面,因为数据是json,并且携带了很多冗余信息,但是数据量大的时候传输效率比较低。②第二种方式是启动canal客户端主动去拉取数据,可以定义多长周期消费多少数据。他的缺点在于抓取出来的是序列化压缩的数据,所以需要反序列化,解压,比较麻烦。他的优点在于我们可以进行压缩,过滤掉没用的冗余信息,只保留我们需要的信息,提交传输效率。

$ ll
total 16
-rwxrwxr-x 1 canal canal  291 Sep  2  2019 canal_local.properties
-rwxrwxr-x 1 canal canal 5202 Jan 14 12:10 canal.properties
drwxrwxr-x 2 canal canal   33 Jan 14 12:15 example
-rwxrwxr-x 1 canal canal 3119 Sep  2  2019 logback.xml
drwxrwxr-x 2 canal canal   39 Jan 14 12:00 metrics
drwxrwxr-x 3 canal canal  149 Jan 14 12:00 spring

  一个example的目录就是一个instance,canal要配置多个实例采集多个数据源mysql的话如下配置,然后把conf目录下example复制多份,分别重命名。如下

#################################################
#########               destinations            #############
#################################################
canal.destinations = example1,example2,example3

7 canal server主动推送数据

7.1 配置

  修改配置vim conf/canal.properties:这个是总配置,端口号,服务器参数,kafka地址,zookeeper地址(高可用)等

  修改如下内容,这个zookeeper是配置高可用的,配置采用kafka方式,kafka的地址

canal.zkServers = 10.0.165.4:2181,10.0.165.5:2181,10.0.165.6:2181
canal.serverMode = kafka
canal.mq.servers = 10.0.165.8:9092,10.0.165.9:9092

  修改配置vim conf/example/instance.properties针对要追踪的mysql的实例配置:一个instance实例对应一个数据库(这个是指数据库服务器)服务器的binlog。所以一个instance具体采集几个数据库是binlog定的和canal没关系,canal不管,canal就把binlog里面有什么就采集,不管是一个数据库还是多个,只要在一个binlog都采集

  修改如下内容,配置用户名,密码,地址。canal.mq.partitionsNum这个是发送到第几个分区

canal.instance.master.address=10.0.165.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=ods_bigdata_mysql

7.2 启动canal

./bin/startup.sh

7.3 测试

  启动canal后,在kafka创建topic

bin/kafka-topics.sh --create --zookeeper 10.0.165.4:2181 --replication-factor 2 --partitions 12 --topic ods_bigdata_mysql

  到kafka目录下开销消费端查询是否有数据

bin/kafka-console-consumer.sh --bootstrap-server  10.0.165.8:9092,10.0.165.9:9092 --topic  ods_bigdata_mysql

  (1)往需要采集的库中的user_info表插入一条数据数据

  执行sql

insert  into user_info values (10001,'test','test',NULL,'test','11111111111','111@gmail.com',NULL,'3','1999-09-09','F','2020-02-02 02:02:02',NULL)

  可以看到kafka消费出了如下一条数据

{"data":[{"id":"10001","login_name":"test","nick_name":"test","passwd":null,"name":"test","phone_num":"11111111111","email":"111@gmail.com","head_img":null,"user_level":"3","birthday":"1999-09-09","gender":"F","create_time":"2020-02-02 02:02:02","operate_time":null}],"database":"bigdata","es":1610676724000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint(20)","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93},"table":"user_info","ts":1610676724288,"type":"INSERT"}

  (2)更新前面插入的这条数据

  执行sql

UPDATE user_info SET name="update" WHERE id=10001

  kafka消费出的数据如下

{"data":[{"id":"10001","login_name":"test","nick_name":"test","passwd":null,"name":"update","phone_num":"11111111111","email":"111@gmail.com","head_img":null,"user_level":"3","birthday":"1999-09-09","gender":"F","create_time":"2020-02-02 02:02:02","operate_time":null}],"database":"bigdata","es":1610676928000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint(20)","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime"},"old":[{"name":"test"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93},"table":"user_info","ts":1610676928644,"type":"UPDATE"}

  (3)删除前面插入的这条数据

  执行sql

DELETE FROM user_info WHERE id=10001

  kafka消费出的数据如下

{"data":[{"id":"10001","login_name":"test","nick_name":"test","passwd":null,"name":"update","phone_num":"11111111111","email":"111@gmail.com","head_img":null,"user_level":"3","birthday":"1999-09-09","gender":"F","create_time":"2020-02-02 02:02:02","operate_time":null}],"database":"bigdata","es":1610677003000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint(20)","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93},"table":"user_info","ts":1610677003637,"type":"DELETE"}

8 canal主动拉取数据客户端

8.1 修改配置

  修改canal.properties,zookeeper配置高可用,配置采用tcp方式

canal.zkServers = 10.0.165.4:2181,10.0.165.5:2181,10.0.165.6:2181
canal.serverMode = tcp

  注意:需要修改canal.proerties的canal.serverMode为tcp否则不会启动11111端口

  修改instance.properties,配置用户名,密码,地址。

canal.instance.master.address=10.0.165.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

  重新启动后查看11111端口是否被占用

[canal@fbi-local-02 bin]$ lsof -i:11111
COMMAND   PID  USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    38516 canal  108u  IPv4 4281763      0t0  TCP fbi-local-02:vce (LISTEN)

8.2 将binlog转换为ProtoBuf消息

  (1)编写proto描述文件CanalBinLog.proto

syntax = "proto3";
option java_package = "com.quinto.canal";
option java_outer_classname = "CanalBinLog";

/* 行数据 */
message RowData {
    uint64 executeTime = 1;
    string schemaName = 2;
    string tableName = 3;
    string eventType = 4;
    /* 列数据 */
    map<string, string> columns = 5;
    uint64 logfileoffset = 14;
    string logfilename = 15;
}

  (2)canal客户端代码编写

  导入依赖

    <properties>
        <protobuf.version>3.5.0</protobuf.version>
        <kafka.client.version>1.0.0</kafka.client.version>
        <kafka.version>0.11.0.2</kafka.version>
        <canal.version>1.1.4</canal.version>
    </properties>
    <dependencies>
    <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>${canal.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.client.version}</version>
        </dependency>
    </dependency>

  ①工具类

  读取配置文件工具类

package com.quinto.utils;

import java.io.IOException;
import java.util.Properties;

/**
 * 读取config.properties配置文件的工具类
 */
public class ConfigUtil {
    // 定义一个properties对象
    public static Properties properties;
    // 定义一个静态代码块,只执行一次
    static {
        try {
            properties = new Properties();
            properties.load(ConfigUtil.class.getClassLoader().getResourceAsStream("config.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static String canalServerIp() {
        return properties.getProperty("canal.server.ip");
    }

    public static int canalServerPort() {
        return Integer.parseInt(properties.getProperty("canal.server.port"));
    }

    public static String canalServerDestination() {
        return properties.getProperty("canal.server.destination");
    }

    public static String canalServerUsername() {
        return properties.getProperty("canal.server.username");
    }

    public static String canalServerPassword() {
        return properties.getProperty("canal.server.password");
    }

    public static String canalSubscribeFilter() {
        return properties.getProperty("canal.subscribe.filter");
    }

    public static String zookeeperServerIp() {
        return properties.getProperty("zookeeper.server.ip");
    }

    public static String kafkaBootstrap_servers_config() {
        return properties.getProperty("kafka.bootstrap_servers_config");
    }

    public static int kafkaBatch_size() {
        return Integer.parseInt(properties.getProperty("kafka.batch_size"));
    }

    public static String kafkaAcks() {
        return properties.getProperty("kafka.acks");
    }

    public static String kafkaRetries() {
        return properties.getProperty("kafka.retries");
    }

    public static String kafkaBatch() {
        return properties.getProperty("kafka.batch");
    }

    public static String kafkaClient_id_config() {
        return properties.getProperty("kafka.client_id_config");
    }

    public static String kafkaKey_serializer_class_config() {
        return properties.getProperty("kafka.key_serializer_class_config");
    }

    public static String kafkaValue_serializer_class_config() {
        return properties.getProperty("kafka.value_serializer_class_config");
    }

    public static String kafkaTopic() {
        return properties.getProperty("kafka.topic");
    }

    public static void main(String[] args) {
        System.out.println(kafkaTopic());
    }
}

  配置文件

# canal配置
canal.server.ip=10.0.165.2
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal
canal.subscribe.filter=bigdata.*

# zookeeper配置
zookeeper.server.ip=10.0.165.4:2181,10.0.165.5:2181,10.0.165.6:2181

# kafka配置
# kafka集群地址
kafka.bootstrap_servers_config=10.0.165.8:9092,10.0.165.9:9092
# 配置批次发送数据的大小,满足批次大小才会发送数据
kafka.batch_size= 10240
# ack
kafka.acks=all
# 重试次数
kafka.retries=2
kafka.client_id_config=quinto_canal
# kafka的key序列化
kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
# kafka的value序列化,自定义开发
kafka.value_serializer_class_config=com.quinto.protobuf.ProtoBufSerializer
# 数据写入到kafka的哪个topic中
kafka.topic=ods_canal_mysql

  kafka工具类

package com.quinto.utils;

import com.quinto.bean.CanalRowData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * kafka工具类
 */
public class KafkaUtil {


    public static KafkaProducer getKafkaProducer(){
        // 定义一个properties对象接收参数
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigUtil.kafkaBootstrap_servers_config());
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, ConfigUtil.kafkaBatch_size());
        properties.put(ProducerConfig.ACKS_CONFIG, ConfigUtil.kafkaAcks());
        properties.put(ProducerConfig.RETRIES_CONFIG, ConfigUtil.kafkaRetries());
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, ConfigUtil.kafkaClient_id_config());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ConfigUtil.kafkaKey_serializer_class_config());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ConfigUtil.kafkaValue_serializer_class_config());
        //实例化生产者对象并返回,key使用默认的String序列化范式,value采用自定义的序列化方式,这个序列化需要传递一个Protobufable的子类
        return new KafkaProducer<String, CanalRowData>(properties);
    }

    /**
     * 传递参数,将数据写入到kafka集群
     * @param rowData
     */
    public static void send(KafkaProducer kafkaProducer,CanalRowData rowData){
        kafkaProducer.send(new ProducerRecord<>(ConfigUtil.kafkaTopic(), rowData));
    }
}

  ②自定义kafka序列化类

package com.quinto.protobuf;


import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * 实现kafka的value的自定义序列化对象
 * 要求传递的泛型必须是集成ProtoBufabl接口的实现列,才可以被序列化成功
 */
public class ProtoBufSerializer implements Serializer<ProtoBufable> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }
    @Override
    public byte[] serialize(String s, ProtoBufable data) {
        return data.toBytes();
    }
    @Override
    public void close() {

    }
}

  ③protobuf序列化接口,所有能够使用protobuf序列化的bean都需要集成这个接口

package com.quinto.protobuf;

/**
 * 定义protobuf序列化接口,返回的是byte[]二进制对象,所有能够使用protobuf序列化的bean都需要集成这个接口
 */
public interface ProtoBufable {
    /**
     * 将对象转换成二进制数组
     * @return
     */
    byte[] toBytes();
}

  ④canal数据的Protobuf的实现类

package com.quinto.bean;

import com.quinto.protobuf.CanalBinLog;
import com.quinto.protobuf.ProtoBufable;

import java.util.Map;

/**
 * canal数据的Protobuf的实现类,使用protobuf序列化成bean对象。
 * 用于将binlog解析后的map对象转换成protobuf序列化后的字节码数据,写入kafka集群
 */
public class CanalRowData implements ProtoBufable {
    private String logfileName;
    private Long logfileOffset;
    private Long executeTime;
    private String schemaName;
    private String tableName;
    private String eventType;
    private Map<String, String> columns;

    public String getLogfileName() {
        return logfileName;
    }

    public void setLogfileName(String logfileName) {
        this.logfileName = logfileName;
    }

    public Long getLogfileOffset() {
        return logfileOffset;
    }

    public void setLogfileOffset(Long logfileOffset) {
        this.logfileOffset = logfileOffset;
    }

    public Long getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(Long executeTime) {
        this.executeTime = executeTime;
    }

    public String getSchemaName() {
        return schemaName;
    }

    public void setSchemaName(String schemaName) {
        this.schemaName = schemaName;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public Map<String, String> getColumns() {
        return columns;
    }

    public void setColumns(Map<String, String> columns) {
        this.columns = columns;
    }

    /**
     * 构造方法中解析map对象的binlog日志
     */
    public CanalRowData(Map map){
        //解析map对象中所有的参数
        if(map.size()>0){
            this.logfileName = map.get("logfileName").toString();
            this.logfileOffset = Long.parseLong(map.get("logfileOffset").toString());
            this.executeTime = Long.parseLong(map.get("executeTime").toString());
            this.schemaName = map.get("schemaName").toString();
            this.tableName = map.get("tableName").toString();
            this.eventType = map.get("eventType").toString();
            this.columns = (Map<String, String>)map.get("columns");
        }
    }
    /**
     * 将map对象解析出来的参数,赋值给protobuf对象,然后序列化后字节码返回
     * @return
     */
    @Override
    public byte[] toBytes() {
        CanalBinLog.RowData.Builder builder = CanalBinLog.RowData.newBuilder();
        builder.setLogfileName(this.getLogfileName());
        builder.setLogfileOffset(this.getLogfileOffset());
        builder.setExecuteTime(this.getExecuteTime());
        builder.setSchemaName(this.getSchemaName());
        builder.setTableName(this.getTableName());
        builder.setEventType(this.getEventType());
        for (String key : this.getColumns().keySet()) {
            builder.putColumns(key, this.getColumns().get(key));
        }

        //将传递的binlog数据解析后序列化成字节码数据返回
        return builder.build().toByteArray();
    }
}

  ⑤canal客户端类

package com.quinto.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.quinto.bean.CanalRowData;
import com.quinto.utils.ConfigUtil;
import com.quinto.utils.KafkaUtil;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class CanalClient {
    // Canal客户端连接器
    private CanalConnector canalConnector;
    // kafka生产者工具类
    private KafkaProducer kafkaProducer;
    public CanalClient(){
        // 在构造方法中初始化连接与kafka工具类
        kafkaProducer = KafkaUtil.getKafkaProducer();
    }

    public void statrt() {
        // 1 创建连接并建立连接,连接的是高可用集群
        System.out.println(ConfigUtil.zookeeperServerIp()+
            ConfigUtil.canalServerDestination()+
            ConfigUtil.canalServerUsername()+
            ConfigUtil.canalServerPassword());
        canalConnector = CanalConnectors.newClusterConnector("10.0.165.4:2181",
            "example", "canal", "canal");

        // 不停拉取的标识
        boolean isFetching = true;
        // 建立连接
        try {
            canalConnector.connect();
            // 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿
            canalConnector.rollback();
            // 2 订阅主题
            canalConnector.subscribe(ConfigUtil.canalSubscribeFilter());

            // 不停的拉取数据
            while (isFetching){
                // 3 获取数据,尝试拿batchSize条记录,有多少取多少,不会阻塞等待
                Message message = canalConnector.getWithoutAck(ConfigUtil.kafkaBatch_size());
                // 获取这个批次的id
                long batchId = message.getId();
                // 获取拉取到的日志数据总数
                int size = message.getEntries().size();
                // 判断是否又获取到数据
                if (batchId == -1 | size == 0){
                    System.out.println("没有抓取到数据");
                    Thread.sleep(1000);
                }else {
                    System.out.println("发送数据:"+ message);
                    // 将binlog日志解析成Map对象
                    Map map = binlogToMap(message);
                    // 将map对象序列化成protobuf格式写入到kafka中
                    CanalRowData canalRowData = new CanalRowData(map);
                    // 有数据将数据发送到kafka集群
                    if(map.size()>0){
                        KafkaUtil.send(kafkaProducer,canalRowData);
                    }
                }
                // 4 提交确认
                // 提交确认,进行batch id的确认,确认之后,小于等于此 batchId 的 Message 都会被确认。
                canalConnector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 5 关闭连接
            canalConnector.disconnect();
        }
    }

    private Map binlogToMap(Message message) throws InvalidProtocolBufferException {
        Map rowDataMap = new HashMap();
        // 构建CanalClient.RowData实体
//        CanalBinLog.RowData.Builder builder = CanalBinLog.RowData.newBuilder();
        // 遍历message中的所有binlog实体
        for (CanalEntry.Entry entry: message.getEntries()){
            // 只处理事务型的binlog
            if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
                continue;
            }
            // 获取文件名
            String logfileName = entry.getHeader().getLogfileName();
            // 获取logfile的偏移量
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // 获取sql语句的执行时间戳
            long executeTime = entry.getHeader().getExecuteTime();
            // 获取数据库名称
            String schemaName = entry.getHeader().getSchemaName();
            // 获取表名
            String tableName = entry.getHeader().getTableName();
            // 获取事件类型 insert/update/delete
            String eventType = entry.getEntryType().toString().toLowerCase();

            rowDataMap.put("logfileName", logfileName);
            rowDataMap.put("logfileOffset", logfileOffset);
            rowDataMap.put("executeTime", executeTime);
            rowDataMap.put("schemaName", schemaName);
            rowDataMap.put("tableName", tableName);
            rowDataMap.put("eventType", eventType);

            // 封装列数据
            HashMap<String, String> columnDataMap = new HashMap<>();
            // 获取所有行上的变更
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList){
                if(eventType.equals("insert") || eventType.equals("update")){
                    for (CanalEntry.Column column : rowData.getAfterColumnsList()){
                        columnDataMap.put(column.getName(), column.getValue());
                    }
                }else if(eventType.equals("delete")) {
                    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                        columnDataMap.put(column.getName(), column.getValue());
                    }
                }
            }
            rowDataMap.put("columns",columnDataMap);
        }
        return rowDataMap;
    }
}

  ⑥入口类

package com.quinto;

import com.quinto.canal.CanalClient;

public class App {
    public static void main(String[] args) {
        // 实例化canal客户端对象,调用start方法拉取canalserver的binlog日志发送到kafka集群
        CanalClient canalClient = new CanalClient();
        canalClient.statrt();
    }
}

8.3 测试

  (1)往user_info表插入一条数据

insert  into user_info values (10001,'test','test',NULL,'test','11111111111','111@gmail.com',NULL,'3','1999-09-09','F','2020-02-02 02:02:02',NULL)

  canal client从canal server拉取到的数据如下

发送数据:Message[id=8,entries=[header {
  version: 1
  logfileName: "mysql-bin.000002"
  logfileOffset: 4615211
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1610943288000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 75
}
entryType: TRANSACTIONBEGIN
storeValue: " `"
, header {
  version: 1
  logfileName: "mysql-bin.000002"
  logfileOffset: 4615374
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1610943288000
  sourceType: MYSQL
  schemaName: "bigdata"
  tableName: "user_info"
  eventLength: 105
  eventType: INSERT
  props {
    key: "rowsCount"
    value: "1"
  }
}
entryType: ROWDATA
storeValue: "\b\210\001\020\001P\000b\241\004\022*\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0010\000B\00510001R\nbigint(20)\022*\b\001\020\f\032\nlogin_name \000(\0010\000B\004testR\fvarchar(200)\022)\b\002\020\f\032\tnick_name \000(\0010\000B\004testR\fvarchar(200)\022 \b\003\020\f\032\006passwd \000(\0010\001R\fvarchar(200)\022$\b\004\020\f\032\004name \000(\0010\000B\004testR\fvarchar(200)\0220\b\005\020\f\032\tphone_num \000(\0010\000B\v11111111111R\fvarchar(200)\022.\b\006\020\f\032\005email \000(\0010\000B\r111@gmail.comR\fvarchar(200)\022\"\b\a\020\f\032\bhead_img \000(\0010\001R\fvarchar(200)\022\'\b\b\020\f\032\nuser_level \000(\0010\000B\0013R\fvarchar(200)\022&\b\t\020[\032\bbirthday \000(\0010\000B\n1999-09-09R\004date\022!\b\n\020\f\032\006gender \000(\0010\000B\001FR\nvarchar(1)\0226\b\v\020]\032\vcreate_time \000(\0010\000B\0232020-02-02 02:02:02R\bdatetime\022\"\b\f\020]\032\foperate_time \000(\0010\001R\bdatetime"
, header {
  version: 1
  logfileName: "mysql-bin.000002"
  logfileOffset: 4615479
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1610943288000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\00522379"
],raw=false,rawEntries=[]]

  查看kafka消费出来的数据

ϝ
上一篇:一文带你快速入门Canal,看这篇就够了!


下一篇:阿里canal