canal redis mysql实现数据同步

原文链接:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide

canal 原理【官方文档

  • 简介
    canal redis mysql实现数据同步
    canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
    早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务

  • 基于日志增量订阅和消费的业务包括

    数据库镜像
    数据库实时备份
    索引构建和实时维护(拆分异构索引、倒排索引等)
    业务 cache 刷新
    带业务逻辑的增量数据处理
    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

  • MySQL主备复制原理
    canal redis mysql实现数据同步
    MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
    MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
    MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

  • canal 工作原理

    canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
    MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
    canal 解析 binary log 对象(原始为 byte 流)

Linux安装canal admin项目【基于webUI管理canal无需linux运营】

下载canal

mkdir /usr/cancal
cd /usr/cancal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

解压

mkdir canal-admin
tar zxvf canal.admin-1.1.4.tar.gz -C ./canal-admin/

解压后的内容

[root@whotw cancal]# cd canal-admin/
[root@whotw canal-admin]# ls
bin  conf  lib  logs
[root@whotw canal-admin]# cd conf
[root@whotw conf]# ls
application.yml  canal_manager.sql  canal-template.properties  instance-template.properties  logback.xml  public

修改yml信息【解压后的文件操作】

vi application.yml

server:
  port: 8089
	spring:
	  jackson:
	    date-format: yyyy-MM-dd HH:mm:ss
	    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306 
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

配置MySQL bin-log【linux:my.cnf | windows:my.ini】【MySQL配置操作】

[mysqld]    
log-bin=mysql-bin #添加这一行就ok    
binlog-format=ROW #选择row模式    
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复    

创建canal用户【MySQL DDL操作】

CREATE USER canal IDENTIFIED BY 'canal';      
GRANT SELECT,INSERT,UPDATE,DELETE,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';    
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;    
FLUSH PRIVILEGES;  

初始化数据库【MySQL导入SQL操作】

 conf/canal_manager.sql

启动canal

sh bin/startup.sh

查看 admin 日志

cat logs/admin.log
2019-08-31 15:43:38.162 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2019-08-31 15:43:38.180 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2019-08-31 15:43:38.191 [main] INFO  org.apache.catalina.core.StandardService - Starting service [Tomcat]
2019-08-31 15:43:38.194 [main] INFO  org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2019-08-31 15:43:39.789 [main] INFO  o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2019-08-31 15:43:39.825 [main] INFO  o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]
此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456 

关闭

sh bin/stop.sh

canal-server端配置

使用canal_local.properties的配置覆盖canal.properties

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
启动admin-server即可。

或在启动命令中使用参数:sh bin/startup.sh local 指定配置

如果是windows直接下载canal进行配置即可

java语言使用【已经启动canal】

public class CanalClient{  
   public static void main(String args[]) {  
       CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),  
               8089), "example", "canal", "canal");
       int batchSize = 1000;  
       try {  
           connector.connect();  
           connector.subscribe(".*\\..*");  
           connector.rollback();    
           while (true) {  
               Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据  
               long batchId = message.getId();  
               int size = message.getEntries().size();  
               if (batchId == -1 || size == 0) {  
                   try {  
                       Thread.sleep(1000);  
                   } catch (InterruptedException e) {  
                       e.printStackTrace();  
                   }  
               } else {  
                   printEntry(message.getEntries());  
               }  
               connector.ack(batchId); // 提交确认  
               // connector.rollback(batchId); // 处理失败, 回滚数据  
           }  
       } finally {  
           connector.disconnect();  
       }  
   }  

   private static void printEntry( List<Entry> entrys) {  
       for (Entry entry : entrys) {  
           if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
               continue;  
           }  
           RowChange rowChage = null;  
           try {  
               rowChage = RowChange.parseFrom(entry.getStoreValue());  
           } catch (Exception e) {  
               throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                       e);  
           }  
           EventType eventType = rowChage.getEventType();  
           System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                   eventType));  

       for (RowData rowData : rowChage.getRowDatasList()) {  
           if (eventType == EventType.DELETE) {
               //redisDelete(rowData.getBeforeColumnsList());
           } else if (eventType == EventType.INSERT) {  
               //redisInsert(rowData.getAfterColumnsList());
           } else {  
               System.out.println("-------> before");  
               printColumn(rowData.getBeforeColumnsList());  
               System.out.println("-------> after");  
               //redisUpdate(rowData.getAfterColumnsList());
           }  
       }  
   }  
   }  

   private static void printColumn( List<Column> columns) {  
       for (Column column : columns) {  
           System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
       }  
   }  

}  

cannal可配置zk,mq,redis等

上一篇:Canal概述及部署


下一篇:CanalSharp.AspNetCore v0.0.4-支持输出到MongoDB