Canal in practice


Understanding how canal works

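canal is a middleware written in Java that parses a database's incremental logs and lets you subscribe to and consume the incremental data. At present canal mainly supports parsing MySQL binlogs; once parsed, the data is handed to a canal client for processing. (Full database synchronization is done by Alibaba's otter middleware, which is built on canal.)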

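How canal works: canal disguises itself as a MySQL slave and replicates data from the master. It speaks the MySQL master/slave replication protocol, sends a dump request to the master, and the master then pushes its binary log to canal, which parses it. Reading the binlog this way requires an account (username and password) that the master has granted replication privileges. Because everything canal sends conforms to the slave protocol, the master cannot tell canal apart from a real replica and simply treats it as one.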


1. MySQL configuration

1.1 Enable the MySQL binlog

Find MySQL's configuration file, my.cnf (Linux) or my.ini (Windows), under your MySQL installation and add the following:

[mysqld]
log-bin=mysql-bin # enables the binlog; this line alone is enough
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId

Restart MySQL after changing the configuration.
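To confirm the change took effect after the restart, you can query the server variables. Below is a minimal Java sketch that is not part of canal: the class `BinlogCheck`, its methods and the choice of checks are my own illustration. `readVariables` assumes a MySQL JDBC driver is on the classpath; `problems` is pure logic that checks log_bin=ON, binlog_format=ROW, and a non-zero server_id that differs from canal's slaveId (2 in the instance.properties used later in this post).

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: checks that MySQL is ready for canal after the my.cnf change and restart.
class BinlogCheck {

    // Reads log_bin, binlog_format and server_id (a MySQL JDBC driver must be on the classpath).
    static Map<String, String> readVariables(String url, String user, String password) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')")) {
            while (rs.next()) {
                vars.put(rs.getString(1), rs.getString(2)); // column 1 = name, column 2 = value
            }
        }
        return vars;
    }

    // Pure check on the variable map; an empty result means the settings look right.
    static List<String> problems(Map<String, String> vars, long canalSlaveId) {
        List<String> out = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            out.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            out.add("binlog_format is not ROW");
        }
        String sid = vars.get("server_id");
        long serverId = sid == null ? 0 : Long.parseLong(sid.trim());
        if (serverId == 0) {
            out.add("server_id is not set");
        } else if (serverId == canalSlaveId) {
            out.add("server_id equals canal slaveId " + canalSlaveId);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "ROW");
        vars.put("server_id", "1");
        System.out.println(problems(vars, 2)); // prints []
    }
}
```

Against a live server you would call something like `problems(readVariables(url, user, password), 2)`, with your own JDBC URL and an account allowed to run SHOW VARIABLES; an empty list means the binlog settings match what canal needs.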

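1.2 Create a canal user to manage canal's access. By controlling this user's privileges we control what canal can access. Note, however, that REPLICATION SLAVE and REPLICATION CLIENT are global privileges: they can only be granted ON *.* and give access to the whole binlog, so limiting canal to certain tables is done with canal's filter settings rather than with grants.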

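CREATE USER canal IDENTIFIED BY 'canal';
-- SELECT can be limited to one database/table; replace db_name.table_name with your own
GRANT SELECT ON db_name.table_name TO 'canal'@'%';
-- REPLICATION SLAVE and REPLICATION CLIENT are global privileges and must be granted ON *.*
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON db_name.table_name TO 'canal'@'%';
FLUSH PRIVILEGES;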

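For example, to grant read, replication slave and replication client privileges on all databases and tables in one statement (MySQL 5.7 syntax; MySQL 8.0 no longer accepts IDENTIFIED BY inside GRANT, so there you create the user first with CREATE USER):

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';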
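Because REPLICATION SLAVE and REPLICATION CLIENT are global privileges that MySQL only accepts ON *.*, a version-independent setup separates the account creation, the SELECT grant and the replication grant. The sketch below just generates those statements as strings so the split is explicit. `CanalGrants` is a hypothetical helper, the schema name `pifi` is borrowed from the instance.properties later in this post, and the SQL is built by string concatenation, so only feed it trusted values. It needs Java 9+ for `List.of`.

```java
import java.util.List;

// Hypothetical helper: SQL for a canal account that works on both MySQL 5.7 and 8.0.
class CanalGrants {

    // SELECT can be narrowed to one schema; the REPLICATION privileges are global (ON *.*).
    // Values are concatenated into SQL, so they must come from trusted configuration.
    static List<String> statements(String user, String password, String schema) {
        String account = "'" + user + "'@'%'";
        return List.of(
                "CREATE USER " + account + " IDENTIFIED BY '" + password + "'",
                "GRANT SELECT ON " + schema + ".* TO " + account,
                "GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + account);
    }

    public static void main(String[] args) {
        // Print the statements so they can be pasted into the mysql client as an admin user.
        statements("canal", "canal", "pifi").forEach(sql -> System.out.println(sql + ";"));
    }
}
```

Running `main` prints the three statements, each terminated with `;`, ready to paste into the mysql client.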

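A possible problem

Error: ERROR 1819 (HY000): Your password does not satisfy the current policy requirements

This happens because the password is too simple for MySQL's password policy, which is controlled by validate_password_policy.

To see MySQL's full password-validation settings, log in and run:

mysql> SHOW VARIABLES LIKE 'validate_password%';

The minimum password length is set by validate_password_length and can be changed with:

set global validate_password_length=4;

validate_password_policy sets the validation policy. The default is MEDIUM; change it to LOW (0) with:

set global validate_password_policy=0;

These are the MySQL 5.7 variable names; with the MySQL 8.0 validate_password component they are validate_password.length and validate_password.policy.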
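Why 'canal' is rejected becomes clearer with a small model of the two policy levels. This is a simplified sketch, not MySQL's implementation: it assumes the documented defaults of at least one lowercase letter, one uppercase letter, one digit and one special character for MEDIUM, while LOW only checks the length. The class `PasswordPolicy` is hypothetical.

```java
// Simplified model (not MySQL's code) of validate_password's LOW and MEDIUM checks,
// assuming the default counts: mixed case 1, number 1, special character 1.
class PasswordPolicy {

    enum Level { LOW, MEDIUM }

    static boolean accepts(String pw, Level level, int minLength) {
        if (pw.length() < minLength) {
            return false; // both levels enforce the minimum length
        }
        if (level == Level.LOW) {
            return true; // LOW checks nothing else
        }
        int lower = 0, upper = 0, digits = 0, special = 0;
        for (char c : pw.toCharArray()) {
            if (Character.isLowerCase(c)) lower++;
            else if (Character.isUpperCase(c)) upper++;
            else if (Character.isDigit(c)) digits++;
            else special++; // anything that is not a letter or digit
        }
        // MEDIUM additionally requires at least one character of each class
        return lower >= 1 && upper >= 1 && digits >= 1 && special >= 1;
    }

    public static void main(String[] args) {
        System.out.println(accepts("canal", Level.MEDIUM, 8)); // false: this is ERROR 1819
        System.out.println(accepts("canal", Level.LOW, 4));    // true: after the two SET GLOBAL commands
    }
}
```

Lowering the policy is the quick fix used above; the alternative is simply choosing a password such as one mixing cases, digits and a symbol, which passes MEDIUM unchanged.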

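2. canal configuration

Download: https://github.com/alibaba/canal/releases/

Downloading from GitHub can be very slow or fail entirely, so here is my Baidu Netdisk link; contact me if it has expired.

Link: https://pan.baidu.com/s/1zL_Mx_EqskjHALPvvq_GeQ
Extraction code: wu6s

2.1 Extract: tar zxvf canal.deployer-1.0.0.tar.gz

2.2 Directory layout: the canal directory usually contains these four folders:

bin    startup and shutdown scripts
conf   configuration
lib    jar files
logs   logs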

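2.3 Configuration

The two main configuration files are canal.properties and instance.properties; the latter is in the example folder under conf. Normally canal.properties does not need to be changed (more on this later). Here is my instance.properties: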

## mysql serverId
# must not be the same as the server_id set in the MySQL configuration
canal.instance.mysql.slaveId = 2

# position info
# your MySQL address
canal.instance.master.address = localhost:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# database name
canal.instance.defaultDatabaseName = pifi
# connection charset
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =
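One pitfall with these files: they are Java .properties files, and java.util.Properties only treats # as a comment at the start of a line. A trailing "# note" after a value becomes part of the value, so a line like `canal.instance.mysql.slaveId = 2 #note` would no longer hold a plain number. The small demo below (class name `InlineCommentDemo` is my own) shows this with the JDK's Properties class; it says nothing about canal internals beyond the file format.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Demonstrates how java.util.Properties handles a trailing "# ..." on a value line.
class InlineCommentDemo {

    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot really happen with a StringReader
        }
        return p;
    }

    public static void main(String[] args) {
        Properties p = load("canal.instance.mysql.slaveId = 2 #must differ from server_id\n"
                + "# a comment on its own line is ignored\n"
                + "canal.port = 11111\n");
        // The trailing comment is kept as part of the value
        System.out.println("[" + p.getProperty("canal.instance.mysql.slaveId") + "]");
        // prints [2 #must differ from server_id]
        System.out.println(p.getProperty("canal.port")); // prints 11111
    }
}
```

So keep every comment on its own line, as in the listing above.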

2.4 Start

On Linux:
./bin/startup.sh

On Windows:
bin\startup.bat

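2.5 Check the startup status

Use logs/canal/canal.log and logs/example/example.log to check whether canal started successfully: canal.log should contain "the canal server is running now", and example.log "start successful".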

logs/canal/canal.log

2021-04-08 09:43:55.450 [main] INFO  c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2021-04-08 09:43:55.455 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2021-04-08 09:43:55.668 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.0.75.1:11111]
2021-04-08 09:43:56.600 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
2021-04-08 10:36:26.771 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2021-04-08 10:36:26.799 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations

logs/example/example.log

2021-04-08 09:38:49.733 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2021-04-08 09:38:49.885 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2021-04-08 09:38:50.086 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2021-04-08 09:38:50.090 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2021-04-08 09:38:50.091 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : 
2021-04-08 09:38:50.096 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
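The "init table filter : ^.*\..*$" line above shows how the filter was interpreted. In instance.properties `\\` is an escaped backslash, so `.*\\..*` becomes the regex `.*\..*`: any schema, a literal dot, any table. Below is a simplified Java model of that matching, assuming (as the canal documentation describes) that the filter is a comma-separated list of regexes matched against `schema.table`. `TableFilter` is my own class, not canal's filter implementation, and how canal treats an empty filter is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Simplified model (not canal's own class) of canal.instance.filter.regex:
// comma-separated regexes, each anchored with ^...$ and matched against "schema.table".
class TableFilter {

    private final List<Pattern> patterns = new ArrayList<>();

    TableFilter(String filterRegex) {
        for (String part : filterRegex.split(",")) {
            String p = part.trim();
            if (!p.isEmpty()) {
                patterns.add(Pattern.compile("^" + p + "$"));
            }
        }
    }

    // A table passes if any of the patterns matches its full "schema.table" name.
    boolean accepts(String schema, String table) {
        String name = schema + "." + table;
        for (Pattern p : patterns) {
            if (p.matcher(name).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In Java source, as in the .properties file, "\\." denotes the regex \. (a literal dot)
        TableFilter all = new TableFilter(".*\\..*");
        TableFilter pifiOnly = new TableFilter("pifi\\..*");
        System.out.println(all.accepts("pifi", "device"));       // true
        System.out.println(pifiOnly.accepts("other", "device")); // false
    }
}
```

So `pifi\\..*` in instance.properties would restrict canal to the pifi database.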

At this point you can write client code to test it, or just watch the log files above.

package com.example.demo.canal;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSONObject; // used by jsonUpdate later in this post
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

/**
 * @author zhang ho jian
 * @date 2021/4/8
 * @time 10:16
 * @Description : canal client test
 */
public class CanalTest {

    public static void main(String[] args) {
        // Replace the IP and port with your canal server's; the default port is 11111
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            // subscribe to every schema.table (this overrides canal.instance.filter.regex)
            connector.subscribe(".*\\..*");
            // discard unacknowledged batches so fetching restarts from the last ack
            connector.rollback();
            while (true) {
                // fetch up to batchSize entries without acknowledging them yet
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return; // stop; the finally block still disconnects
                    }
                } else {
                    try {
                        printEntry(message.getEntries());
                    } catch (RuntimeException e) {
                        e.printStackTrace();
                        // processing failed: roll back so this batch is delivered again
                        connector.rollback(batchId);
                        continue;
                    }
                }
                connector.ack(batchId); // confirm the batch
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // skip transaction begin/end markers; only row data matters here
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of event has an error, data:" + entry, e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                // Business-specific processing goes here. I merge data from several databases into one,
                // which causes primary-key conflicts, so I handle the primary keys here.
                // Plain master/slave replication does not need this.
                if (eventType == EventType.DELETE) {
                    System.out.println("delete: " + rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    System.out.println("insert: " + rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("update" + rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

Now I update a row in the database.

The console prints:

-------> before
preset :     update=false
besponsible_person :     update=false
breaking_current :     update=false
breaking_capacity :     update=false
switch_property : 进线开关    update=false
switch_state :     update=false
operating_mechanism :     update=false
coal_mine_name : 智能电网仿真实验室    update=false
equipment_manufacturer :     update=false
device_name : 开关19    update=false
device_model :     update=false
correspondence_address : 测试3    update=false
equipment_number : 19    update=false
preset_status :     update=false
rated_capacity :     update=false
rated_voltage :     update=false
rated_current :     update=false
protector_category : ZBT-11C(DSP)保护器    update=false
protector_number : 19    update=false
Substation_no : TEST-BDS-001    update=false
short_value : 2.000000000    update=false
overcurrent_value : 1.000000000    update=false
overload_setting_value : 3.000000000    update=false
leakage_current1 : 2.000000000    update=false
leakage_current2 : 44.000000000    update=false
negative_sequence1 : 24.000000000    update=false
negative_sequence2 : 3.000000000    update=false
load_power : 2.000000000    update=false
three_phase : 12.000000000    update=false
two_phase : 12.000000000    update=false
time_setting : 321.000000000    update=false
negative_sequence_thime : 1.200000000    update=false
protector_manufacturer : 2    update=false
update[index: 0
sqlType: 12
name: "preset"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(32)"
, index: 1
sqlType: 12
name: "besponsible_person"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(100)"
, index: 2
sqlType: 12
name: "breaking_current"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(100)"
, index: 3
sqlType: 12
name: "breaking_capacity"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(100)"
, index: 4
sqlType: 12
name: "switch_property"
isKey: false
updated: false
isNull: false
value: "\350\277\233\347\272\277\345\274\200\345\205\263"
mysqlType: "varchar(100)"
, index: 5
sqlType: 12
name: "switch_state"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(100)"
, index: 6
sqlType: 12
name: "operating_mechanism"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(100)"
, index: 7
sqlType: 12
name: "coal_mine_name"
isKey: false
updated: false
isNull: false
value: "\346\231\272\350\203\275\347\224\265\347\275\221\344\273\277\347\234\237\345\256\236\351\252\214\345\256\244"
mysqlType: "varchar(100)"
, index: 8
sqlType: 12
name: "equipment_manufacturer"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(100)"
, index: 9
sqlType: 12
name: "device_name"
isKey: true
updated: false
isNull: false
value: "\345\274\200\345\205\26319"
mysqlType: "varchar(100)"
, index: 10
sqlType: 12
name: "device_model"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(100)"
, index: 11
sqlType: 12
name: "correspondence_address"
isKey: false
updated: true
isNull: false
value: "\346\265\213\350\257\225"
mysqlType: "varchar(100)"
, index: 12
sqlType: 12
name: "equipment_number"
isKey: true
updated: false
isNull: false
value: "19"
mysqlType: "varchar(100)"
, index: 13
sqlType: 12
name: "preset_status"
isKey: false
updated: false
isNull: true
mysqlType: "varchar(100)"
, index: 14
sqlType: 3
name: "rated_capacity"
isKey: false
updated: false
isNull: true
mysqlType: "decimal(20,10)"
, index: 15
sqlType: 3
name: "rated_voltage"
isKey: false
updated: false
isNull: true
mysqlType: "decimal(20,10)"
, index: 16
sqlType: 3
name: "rated_current"
isKey: false
updated: false
isNull: true
mysqlType: "decimal(20,10)"
, index: 17
sqlType: 12
name: "protector_category"
isKey: false
updated: false
isNull: false
value: "ZBT-11C(DSP)\344\277\235\346\212\244\345\231\250"
mysqlType: "varchar(100)"
, index: 18
sqlType: 12
name: "protector_number"
isKey: false
updated: false
isNull: false
value: "19"
mysqlType: "varchar(100)"
, index: 19
sqlType: 12
name: "Substation_no"
isKey: false
updated: false
isNull: false
value: "TEST-BDS-001"
mysqlType: "varchar(100)"
, index: 20
sqlType: 3
name: "short_value"
isKey: false
updated: false
isNull: false
value: "2.000000000"
mysqlType: "decimal(20,10)"
, index: 21
sqlType: 3
name: "overcurrent_value"
isKey: false
updated: false
isNull: false
value: "1.000000000"
mysqlType: "decimal(20,10)"
, index: 22
sqlType: 3
name: "overload_setting_value"
isKey: false
updated: false
isNull: false
value: "3.000000000"
mysqlType: "decimal(20,10)"
, index: 23
sqlType: 3
name: "leakage_current1"
isKey: false
updated: false
isNull: false
value: "2.000000000"
mysqlType: "decimal(20,10)"
, index: 24
sqlType: 3
name: "leakage_current2"
isKey: false
updated: false
isNull: false
value: "44.000000000"
mysqlType: "decimal(20,10)"
, index: 25
sqlType: 3
name: "negative_sequence1"
isKey: false
updated: false
isNull: false
value: "24.000000000"
mysqlType: "decimal(20,10)"
, index: 26
sqlType: 3
name: "negative_sequence2"
isKey: false
updated: false
isNull: false
value: "3.000000000"
mysqlType: "decimal(20,10)"
, index: 27
sqlType: 3
name: "load_power"
isKey: false
updated: false
isNull: false
value: "2.000000000"
mysqlType: "decimal(20,10)"
, index: 28
sqlType: 3
name: "three_phase"
isKey: false
updated: false
isNull: false
value: "12.000000000"
mysqlType: "decimal(20,10)"
, index: 29
sqlType: 3
name: "two_phase"
isKey: false
updated: false
isNull: false
value: "12.000000000"
mysqlType: "decimal(20,10)"
, index: 30
sqlType: 3
name: "time_setting"
isKey: false
updated: false
isNull: false
value: "321.000000000"
mysqlType: "decimal(20,10)"
, index: 31
sqlType: 3
name: "negative_sequence_thime"
isKey: false
updated: false
isNull: false
value: "1.200000000"
mysqlType: "decimal(20,10)"
, index: 32
sqlType: 12
name: "protector_manufacturer"
isKey: false
updated: false
isNull: false
value: "2"
mysqlType: "varchar(255)"
]

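Raw output like this is clearly hard to work with. A more convenient approach is to convert the row's after-image into JSON: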

            // In printEntry, replace the row loop with this version:
            // the UPDATE branch now prints the after-image of the row as JSON
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    System.out.println("delete: " + rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    System.out.println("insert: " + rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> after");
                    jsonUpdate(rowData.getAfterColumnsList());
                    /* printColumn(rowData.getBeforeColumnsList()); */
                }
            }

    // Add to CanalTest (needs import com.alibaba.fastjson.JSONObject):
    // collects column name -> value pairs into a JSON object and prints it
    private static void jsonUpdate(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (!columns.isEmpty()) {
            // value of the row's first column (here: preset)
            System.out.println("user:" + columns.get(0).getValue());
            System.out.println("+++++++++++++++++++++++++++");
            System.out.println(json);
        }
    }
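One thing the JSON above hides: SQL NULL columns (isNull: true in the raw output) arrive with an empty value string, so "preset":"" looks the same as a genuinely empty string. The sketch below keeps column order, maps NULL to null, and can keep only changed columns (updated: true). `Col` is a hypothetical stand-in for canal's CanalEntry.Column, whose getName(), getValue(), getIsNull() and getUpdated() carry the same information; `RowMapper` is my own name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: convert a row's columns to an ordered map, keeping SQL NULL distinct from "".
class RowMapper {

    // Hypothetical stand-in for CanalEntry.Column; use the real type inside the canal client.
    static final class Col {
        final String name;
        final String value;
        final boolean isNull;
        final boolean updated;

        Col(String name, String value, boolean isNull, boolean updated) {
            this.name = name;
            this.value = value;
            this.isNull = isNull;
            this.updated = updated;
        }
    }

    static Map<String, String> toMap(List<Col> columns, boolean onlyUpdated) {
        Map<String, String> row = new LinkedHashMap<>(); // preserves the table's column order
        for (Col c : columns) {
            if (onlyUpdated && !c.updated) {
                continue; // skip columns the UPDATE did not change
            }
            row.put(c.name, c.isNull ? null : c.value);
        }
        return row;
    }

    public static void main(String[] args) {
        List<Col> after = List.of(
                new Col("preset", "", true, false),
                new Col("correspondence_address", "测试", false, true),
                new Col("equipment_number", "19", false, false));
        System.out.println(toMap(after, false)); // {preset=null, correspondence_address=测试, equipment_number=19}
        System.out.println(toMap(after, true));  // {correspondence_address=测试}
    }
}
```

If you then serialize the map with fastjson, keep in mind that null values are dropped from the JSON by default unless SerializerFeature.WriteMapNullValue is enabled.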

Console output:

user:
+++++++++++++++++++++++++++
{"Substation_no":"TEST-BDS-001","besponsible_person":"","breaking_capacity":"","breaking_current":"","coal_mine_name":"智能电网仿真实验室","correspondence_address":"测试3","device_model":"","device_name":"开关19","equipment_manufacturer":"","equipment_number":"19","leakage_current1":"2.000000000","leakage_current2":"44.000000000","load_power":"2.000000000","negative_sequence1":"24.000000000","negative_sequence2":"3.000000000","negative_sequence_thime":"1.200000000","operating_mechanism":"","overcurrent_value":"1.000000000","overload_setting_value":"3.000000000","preset":"","preset_status":"","protector_category":"ZBT-11C(DSP)保护器","protector_manufacturer":"2","protector_number":"19","rated_capacity":"","rated_current":"","rated_voltage":"","short_value":"2.000000000","switch_property":"进线开关","switch_state":"","three_phase":"12.000000000","time_setting":"321.000000000","two_phase":"12.000000000"}

Extension

To monitor several databases at the same time, you need to configure canal.properties:

#################################################
######### 		common argument		############# 
#################################################
#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.id = 1
canal.ip =
# port, default 11111
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### 		destinations		############# 
#################################################
canal.destinations = example,example2 
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml


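canal.destinations is where multiple instances (data sources) are configured. The names are up to you; each name corresponds to a folder of the same name under conf, and you need to copy the earlier instance.properties into it and adjust it for that database.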
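A quick way to check that every destination has its configuration in place is to derive the expected file paths from canal.destinations. This is a hypothetical helper, not part of canal; it assumes the layout described above, one folder per destination under canal.conf.dir (../conf here) containing instance.properties.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the instance.properties files implied by canal.destinations,
// assuming one folder per destination under canal.conf.dir.
class Destinations {

    static List<Path> instanceFiles(String confDir, String destinations) {
        List<Path> files = new ArrayList<>();
        for (String d : destinations.split(",")) {
            String name = d.trim(); // tolerate spaces around the commas
            if (!name.isEmpty()) {
                files.add(Paths.get(confDir, name, "instance.properties"));
            }
        }
        return files;
    }

    public static void main(String[] args) {
        for (Path p : instanceFiles("../conf", "example,example2")) {
            String status = p.toFile().exists() ? "found" : "missing: copy example/instance.properties here";
            System.out.println(p + "  (" + status + ")");
        }
    }
}
```

On the client side each destination is subscribed separately: the destination name is the second argument of `CanalConnectors.newSingleConnector`, so monitoring example2 means creating another connector with "example2" in place of "example".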
