Using Canal to Monitor MySQL Data Changes

### What is canal

Canal is Alibaba's open-source database synchronization framework. It works non-intrusively: it parses MySQL's `binary log` and ships the changes to a downstream target, which can be `mq`, `hbase`, `mysql`, `es`, and so on.

### Steps in this article

1. Enable MySQL's bin-log
2. Create a MySQL user that can read the bin-log
3. Have canal collect the bin-log
4. Use a canal-client to receive the MySQL change events

#### Enable the bin-log

Only two lines need to be added to `mysqld.cnf` (under the `[mysqld]` section):

```
server-id=1
log-bin=mysql-bin
```

#### Create the MySQL user

```sql
CREATE USER canal@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```
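Before moving on to canal itself, it is worth a quick sanity check that the bin-log is actually on and that the new account has the expected grants. The statements below are standard MySQL commands run as root, and assume the server was restarted after editing `mysqld.cnf`:

```sql
-- should report log_bin = ON after the restart
SHOW VARIABLES LIKE 'log_bin';
-- current binlog file and offset that canal will start reading from
SHOW MASTER STATUS;
-- replication-related privileges of the canal account
SHOW GRANTS FOR 'canal'@'%';
```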
#### Configure canal

```bash
# File 1: canal-server/conf/canal.properties
# listening port
canal.port = 11111

# File 2: canal-server/conf/example/instance.properties
# connection info of the source database
canal.instance.master.address=192.168.41.128:3307
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# tables to watch (regular expression)
canal.instance.filter.regex=.*\\..*
# topic
canal.mq.topic=example
```

#### Start MySQL / canal

```bash
# for local testing, both are started with docker
docker run -d --name mysql -p 3307:3306 -e MYSQL_ROOT_PASSWORD=123456 -v $PWD/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf hub.c.163.com/library/mysql:5.7
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
```

#### Write the canal-client

The client only needs the canal client library (`com.alibaba.otter:canal.client`) on the classpath. It polls the canal-server, skips transaction begin/end entries, and prints the changed columns of every row event:

```java
package com.deri.stream.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @ClassName: Main
 * @Description: pull change events from canal-server and print them
 * @Author: wuzhiyong
 * @Time: 2021/6/11 9:41
 * @Version: v1.0
 **/
public class Main {
    public static void main(String[] args) throws InterruptedException {
        // connect to the canal-server instance named "example"
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.41.128", 11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without acking
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // nothing changed, wait one second before polling again
                    Thread.sleep(1000);
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId);          // acknowledge the batch
                // connector.rollback(batchId); // on failure, roll the batch back instead
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parse of event has an error, data:" + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
```
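With the server and client running, any DML against the watched database should show up on the client console. A minimal way to exercise it, using a hypothetical test table (the table name and columns below are only for illustration):

```sql
CREATE TABLE demo_user (id INT PRIMARY KEY, name VARCHAR(32));
INSERT INTO demo_user VALUES (1, 'tom');          -- printed as an INSERT event with the "after" columns
UPDATE demo_user SET name = 'jerry' WHERE id = 1; -- UPDATE event prints both "before" and "after" columns
DELETE FROM demo_user WHERE id = 1;               -- DELETE event prints the "before" columns
```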