Syncing MySQL Data to the Data Warehouse in Real Time with Canal

Use Canal for incremental subscription and consumption based on the MySQL binlog, and store the changes in MongoDB via an ETL step.

1: Business databases and data warehouses, in brief:

  • The data structures in a business database are designed to complete transactions, not for the convenience of querying and analysis.
  • Business databases are mostly optimized for both reads and writes: they must serve reads (viewing product details) as well as writes (creating orders, completing payments). As a result, they offer poor support for reads over large volumes of data (metric queries, which are typically complex and read-only).
  • This becomes acute once analysis evolves into very fine-grained cohort analysis of specific users in specific scenarios, for example "the relationship between the purchasing behavior of overseas versus domestic users buying from apparel suppliers in the first quarter of each of the past five years and the company's promotional campaigns"; a transactional schema cannot answer such queries efficiently.

2: The role of a data warehouse:

  • Data structures organized for the convenience of analysis and queries;
  • A read-optimized database: its write speed hardly matters, as long as complex queries over large amounts of data are fast enough.

So the former kind of database (optimized for both reads and writes) is the transactional, business-facing database, and the latter is the analytical database, i.e., the data warehouse.

This post records how to use Canal to listen for MySQL binlog events, implement incremental subscription and consumption, and forward the changes into a MongoDB data warehouse (MongoDB is chosen as the warehouse here).

Extracting data from the business database, processing it, and importing it into the analytical database is the traditional ETL job.

  • ETL stands for Extract-Transform-Load: the process of taking data from a source through extraction (extract), transformation (transform), and loading (load) into a destination.
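The three stages can be sketched end-to-end in a few lines of Java (a toy illustration only: the record fields and the in-memory "warehouse" stand-in are invented for the example, not part of the actual Canal pipeline):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EtlSketch {

    // Extract: pull raw rows from the source (a hard-coded stand-in
    // for a result set from the business database).
    public static List<String[]> extract() {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[]{"1001", "overseas", "2020-03-15"});
        rows.add(new String[]{"1002", "domestic", "2020-03-16"});
        return rows;
    }

    // Transform: reshape one flat row into a document keyed for
    // analytical queries (typed id, named fields).
    public static Map<String, Object> transform(String[] row) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("orderId", Long.parseLong(row[0]));
        doc.put("region", row[1]);
        doc.put("orderDate", row[2]);
        return doc;
    }

    // Load: write the transformed documents into the target store
    // (here just an in-memory list standing in for MongoDB).
    public static List<Map<String, Object>> load(List<String[]> rows) {
        List<Map<String, Object>> warehouse = new ArrayList<>();
        for (String[] row : rows) {
            warehouse.add(transform(row));
        }
        return warehouse;
    }
}
```

In the setup below, Canal supplies the "extract" part (binlog events), the client code maps rows to documents ("transform"), and MongoDB writes are the "load".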

3: Technologies involved: Docker, Canal, MySQL, MongoDB

A brief look at how Canal works

Canal masquerades as a MySQL replica: it sends a binlog dump request to the master and parses the event stream it receives, turning row changes into structured messages for downstream consumers. In ROW-level mode, the binlog does not need to record the context of the executed SQL statement; it only records which rows were modified, so a row-level log captures the details of every row change very clearly. This also avoids the cases where, under certain conditions, stored procedures, functions, or trigger invocations cannot be replicated correctly.

4: Check MySQL's binlog mode and set it to ROW

mysql> SHOW VARIABLES LIKE 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (1.58 sec)
  • After confirming the binlog mode is ROW, create a dedicated MySQL account for Canal:
CREATE USER canaletl IDENTIFIED BY 'canal';
-- On MySQL 8, switch to the mysql_native_password plugin so Canal's driver
-- can authenticate, and set the final password:
ALTER USER 'canaletl'@'%' IDENTIFIED WITH mysql_native_password BY 'canalpass';
-- GRANT ALL works, but Canal only needs SELECT, REPLICATION SLAVE, REPLICATION CLIENT:
GRANT ALL PRIVILEGES ON *.* TO 'canaletl'@'%';
FLUSH PRIVILEGES;

5: Use Docker to create and start a recent, stable Canal release: v1.1.4

docker run -d --name=canal --network=cluster-net -p 11111:11111 \
  -e canal.destinations=qcdbv2 \
  -e canal.instance.master.address=mysql8:3306 \
  -e canal.instance.dbUsername=canaletl \
  -e canal.instance.dbPassword=canal1qaz2wsx \
  -e canal.instance.connectionCharset=UTF-8 \
  -e canal.user=canal \
  -e canal.passwd=canal \
  canal/canal-server:v1.1.4

Note that canal.instance.dbUsername and canal.instance.dbPassword must match the MySQL account created in the previous step.

6: After starting, check the running Docker containers:

root@server:~# docker ps
CONTAINER ID        IMAGE                       COMMAND                  CREATED             STATUS              PORTS                                                      NAMES
cdd169d7b076        canal/canal-server:v1.1.4   "/alidata/bin/main.s…"   1 hours ago        Up 1 hours         9100/tcp, 11110/tcp, 11112/tcp, 0.0.0.0:11111->11111/tcp   canal

7: Tail the Canal logs to confirm it is successfully listening to MySQL

[root@62a5f11f7e15 admin]# tail -f -n100 canal-server/logs/test/test.log
2020-04-22 15:16:03.997 [Thread-6] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - stop CannalInstance for null-mysqldb
2020-04-22 15:16:04.000 [Thread-6] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - stop successful....
2020-04-22 15:41:47.637 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-04-22 15:41:47.640 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [mysqldb/instance.properties]
2020-04-22 15:41:47.757 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-04-22 15:41:47.784 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-04-22 15:41:47.785 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [mysqldb/instance.properties]
2020-04-22 15:41:48.083 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-mysqldb
2020-04-22 15:41:48.092 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-04-22 15:41:48.093 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-04-22 15:41:48.183 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-04-22 15:41:48.185 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to mysqldb\..*
2020-04-22 15:41:48.185 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^mysqldb\..*$
2020-04-22 15:41:48.185 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-04-22 15:41:48.204 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"mysql8","port":3306}},"postion":{"gtid":"","included":false,"journalName":"binlog.000006","position":30019,"serverId":1,"timestamp":1587534667000}}
2020-04-22 15:41:48.593 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000006,position=30019,serverId=1,gtid=,timestamp=1587534667000] cost : 401ms , the next step is binlog dump

8: Write the ETL code. Its main job is to receive Canal's binlog subscription events as a byte stream, convert them into collections MongoDB can store, and connect to MongoDB to perform inserts, deletes, updates, and queries

  • For example, the connector class CanalClientStarter:
/**
 * Filter rule syntax (applies to instance.properties and the consumer-side subscribe() method):
 *   1) all tables:                                 .*   or  .*\\..*
 *   2) all tables under the canal schema:          canal\\..*
 *   3) tables under canal whose names start with "canal": canal\\.canal.*
 *   4) a single table under the canal schema:      canal.test1
 *   5) several rules combined:                     canal\\..*,mysql.test1,mysql.test2 (comma-separated)
 */
	private String heartBeatCollection4Mongo;

	public void init() {
		doHeartBeat();
		Thread thread = new Thread(new Runnable() {
			@Override
			public void run() {
				CanalConnector connector = getCanalConnector();
				int batchSize = 100;
				int retryCnt = 0;
				while (running) {
					try {
						connector.connect();
						// e.g. all tables under the canal schema: canal\\..*
						connector.subscribe(canalFilterReg);
						connector.rollback();

						while (running) {
							// fetch up to batchSize entries without auto-acknowledging them
							Message message = connector.getWithoutAck(batchSize);
							long batchId = message.getId();
							int size = message.getEntries().size();
							logger.debug("canal subscribed batchId:" + batchId);

							if (batchId == -1 || size == 0) {
								// nothing to consume; back off before polling again
								try {
									Thread.sleep(10000);
								} catch (InterruptedException e) {
									logger.error(e.getMessage(), e);
								}
							} else {
								boolean res = consumeCanalMessage(message.getEntries());
								if (res) {
									connector.ack(batchId); // batch processed: acknowledge it
								} else {
									connector.rollback(batchId); // failed: redeliver this batch
								}
							}
							doHeartBeat();
						}
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
						try {
							Thread.sleep(10000);
						} catch (InterruptedException e1) {
						}
					} finally {
						retryCnt++;
						logger.debug("retry " + retryCnt);
						if (retryCnt > 20) {
							// too many failed rounds: rebuild the connection
							connector.disconnect();
							connector = getCanalConnector();
							retryCnt = 0;
						}
					}
				}
				logger.info("ETL service exiting");
				mongoTemplate.save(new HeartBeatDoc(new Date(), destination, zkServers),
						heartBeatCollection4Mongo + "_kill");
			}

		});
		thread.start();
	}

	protected CanalConnector getCanalConnector() {
		CanalConnector connector = CanalConnectors.newSingleConnector(
				new InetSocketAddress(canalServerHost, canalServerPort), destination, canalUser, canalPwd);
		//CanalConnector connector = CanalConnectors.newClusterConnector(zkServers, destination, canalUser, canalPwd);
		return connector;
	}

	protected boolean consumeCanalMessage(List<Entry> entries) {
		for (Entry entry : entries) {
			if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
					|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
				continue;
			}

			RowChange rowChange = null;
			try {
				rowChange = RowChange.parseFrom(entry.getStoreValue());
			} catch (Exception e) {
				throw new RuntimeException("ERROR ## parsing of rowChange-event failed, data:" + entry.toString(),
						e);
			}

			EventType eventType = rowChange.getEventType();
			String tableName = entry.getHeader().getTableName();
			long logfileOffset = entry.getHeader().getLogfileOffset();
			logger.debug(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
					entry.getHeader().getLogfileName(), logfileOffset, entry.getHeader().getSchemaName(), tableName,
					eventType));

			if (!etlServiceScannerAndDispatcher.matchEtlService(tableName, eventType)) {
				logger.info("no etl service bean matching db operation:" + eventType + " on " + tableName);
				continue;
			}

			for (RowData rowData : rowChange.getRowDatasList()) {
				if (eventType == EventType.DELETE) {
					printColumn(rowData.getBeforeColumnsList());
				} else if (eventType == EventType.INSERT) {
					printColumn(rowData.getAfterColumnsList());
				} else {
					//logger.debug("-------> before");
					printColumn(rowData.getBeforeColumnsList());
					//logger.debug("-------> after");
					printColumn(rowData.getAfterColumnsList());
				}

				etlServiceScannerAndDispatcher.doEtl(tableName, eventType, rowData);
			}
		}
		return true;
	}

	......
}
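The filter rules documented in the comment at the top of CanalClientStarter are ordinary comma-separated Java regular expressions matched against `schema.table` names. A minimal self-contained check of that matching behavior (the class name and sample tables here are made up; Canal's real filter engine is more elaborate):

```java
import java.util.regex.Pattern;

public class CanalFilterDemo {

    // A Canal filter string is a comma-separated list of Java regexes,
    // each matched in full against a "schema.table" name; an entry
    // passes the filter if any rule matches it.
    public static boolean matches(String filter, String schemaDotTable) {
        for (String rule : filter.split(",")) {
            if (Pattern.matches(rule, schemaDotTable)) {
                return true;
            }
        }
        return false;
    }
}
```

This also explains the server log line `subscribe filter change to mysqldb\..*` seen earlier: a filter passed to `subscribe()` from the client replaces the filter the server instance was started with.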

9: Start the ETL project

Seeing heartbeats in the log means the client has successfully connected to Canal:

11:36:20DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:36:30DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:36:30DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:36:40DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:36:40DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:36:50DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:36:50DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:00DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:00DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:10DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:10DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:20DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:20DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:30DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:30DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:40DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:40DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:50DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:50DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
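The doHeartBeat() method itself is elided from the code above; judging from these logs and the mongoTemplate.save(...) call, it records a timestamped document so an external monitor can detect a stalled client. A minimal in-memory sketch of that idea (HeartBeatSketch and its fields are assumptions, not the original implementation):

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class HeartBeatSketch {

    // Stand-in for the MongoDB heartbeat collection the real client
    // writes to via mongoTemplate.save(new HeartBeatDoc(...), ...).
    private final List<Instant> beats = new ArrayList<>();

    // Called once per consume-loop iteration; a monitor that sees no
    // fresh heartbeat within some window (say 30s) can conclude the
    // ETL client has stalled and raise an alert.
    public void doHeartBeat() {
        beats.add(Instant.now());
    }

    // Latest recorded heartbeat, or null if none yet.
    public Instant lastBeat() {
        return beats.isEmpty() ? null : beats.get(beats.size() - 1);
    }
}
```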

  • Now perform table operations in MySQL. Once the shadow-table initialization run has finished, the binlog events the ETL listens for are generated: table initialization, table data updates, and so on.

10: Walking through the flow

Canal picks up the binlog events and performs the incremental subscription:

2020-04-22 15:41:48.204 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"mysql8","port":3306}},"postion":{"gtid":"","included":false,"journalName":"binlog.000006","position":30019,"serverId":1,"timestamp":1587534667000}}
2020-04-22 15:41:48.593 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000006,position=30019,serverId=1,gtid=,timestamp=1587534667000] cost : 401ms , the next step is binlog dump
2020-04-23 11:59:37.220 [New I/O server worker #1-3] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to mysqldb\..*
2020-04-23 11:59:37.220 [New I/O server worker #1-3] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^mysqldb\..*$

The ETL client receives the data Canal has subscribed to, converts it, stores it into MongoDB, and logs the updates:

14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:721728] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:729938] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.doHeartBeat:L177]Canal client HeartBeat...
14:01:44DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:49
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:737877] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:745600] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.doHeartBeat:L177]Canal client HeartBeat...
14:01:44DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:50
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:753293] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:761419] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.doHeartBeat:L177]Canal client HeartBeat...
14:01:44DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:51

Checking Mongo confirms that the newly added collections are indeed there.


Overall effect:

Mysql --> Canal --> ETL --> MongoDB
