前言
在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 DTS,它成本更低,因此它更适合小规模的数据同步。
Canal 简介
Canal 是阿里开源 CDC 工具,他可以获取 MySQL binlog 解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库同步工作。其架构如图:
Deployer 负责拉取 Binlog,解析数据,分发,记录位点。而 Client-adapter 负责接收上游数据,通过 Adapter适配器,将数据持久化到目标库。Deployer 和 Client-Adapter 作为 Canal 中的两个模块,分别独立部署。Tablestore 团队已经在 Adapter 中增加了 TablestoreAdapter,可以支持向 Tablestore 中写入数据。
下面,我们将部署 Canal,并将处于 Rds 中的 MySQL 订单数据同步进入 Tablestore,实现数据全量、增量的同步。
Canal 部署
环境准备
准备部署 canal 程序的机器。本文中在阿里云官网申请了一台 8 vCPU,16 GiB内存的 Linux 机器作为部署机器。如果读者同样需要申请 ECS,请参考:ECS入门概述。
源表准备
使用下面 SQL 在 MySQL 中新建测试表 order_contract_canal,其表结构与 order_contract 相同。
CREATE TABLE `order_contract_canal` (
`oId` varchar(50) NOT NULL,
`create_time` datetime NOT NULL COMMENT '下单时间',
`pay_time` datetime DEFAULT NULL COMMENT '支付时间',
`has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
`c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
`c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
`p_brand` tinytext COMMENT '产品品牌',
`p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
`p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
`p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
`p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
`s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
`s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
`total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
PRIMARY KEY (`oId`),
KEY `idx_sid` (`s_id`),
KEY `idx_paytime_sid` (`pay_time`,`s_id`) USING BTREE,
KEY `idx_cid` (`c_id`),
KEY `idx_paytime_cid_totalprice` (`pay_time`,`c_id`,`total_price`) USING BTREE,
KEY `idx_sid_paytime` (`s_id`,`pay_time`),
KEY `idx_sid_paytime_totalprice` (`s_id`,`pay_time`,`total_price`),
KEY `idx_paytime_totalprice_pbrand` (`p_price`,`total_price`,`pay_time`) USING BTREE,
KEY `idx_paytime` (`pay_time`),
KEY `idx_pbrand_paytime` (`p_brand`(10),`pay_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
目标表准
在 Tablestore 中创建表 canal_target_order作为测试表,使其表结构与订单表 order_contract 相同。其表结构如图:
Deployer部署
在 Canal 官方 release页,下载 canal.deployer 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。配置 MySQL Binlog 模式以及部署 Canal Deployer 的具体步骤可以参考 Canal 官方文档的 QuickStart。
本文创建新的实例,在 Deployer 的 conf 目录下创建文件夹 test_ots。将 conf/example/instance.properties 复制到 test_ots 路径下。然后修改 test_ots 路径下的 instance.properties 配置文件。需要关注如下配置项:
参数 |
值 |
说明 |
canal.instance.master.address |
rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306 |
数据库域名端口 |
canal.instance.rds.accesskey |
*** |
本文 MySQL 为阿里云产品 RDS,需填入对应accessKey。若非 RDS 库,此项不用填写。 |
canal.instance.rds.secretkey |
*** |
本文 MySQL 为阿里云产品 RDS,需填入对应secretkey。若非 RDS 库,此项不用填写。 |
canal.instance.rds.instanceId |
rm-bp15p07134rkvf7z6 |
本文 MySQL 为阿里云产品 RDS,需填入对应示例 id。若非 RDS 库,此项不用填写。 |
canal.instance.dbUsername |
*** |
数据库账号用户名 |
canal.instance.dbPassword |
*** |
数据库账号密码 |
canal.instance.filter.regex |
test_ots\\.[test|order_contract_canal].* |
Canal 实例关注的表。通过正则表达式匹配。 这里匹配 test_ots 库下表名以 test 开头或者以 order_contract_canal 开头的表 |
canal.destinations |
test_ots |
canal 的实例名称,需要配置文件所在上层路径相同,本例路径为 conf/test_ots /instance.properties,那么实例名为test_ots |
ClientAdapter部署
在 Canal 官方 release页,下载 canal.adapter 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。包解压后,若 plugin 路径下不存在以 client-adapter.tablestore 开头的 jar 包,说明此安装包不包含 Tablestore 对接部分代码,需要使用 canal-adapter下的 zip 包作为安装包,该安装包与官网安装包部署路径相同,配置、启动方式相同。详细的部署步骤见 ClientAdapter。
conf 路径下 application.yml中需要额外关注配置见表。
参数 |
值 |
说明 |
是否必填 |
canal.conf: canalAdapters:instance |
test_ots |
与depolyer中的destinations保持一致 |
是 |
canal.conf: canalAdapters:outerAdapters: -name: |
tablestore |
定义适配器类型,填入 tablestore 说明此适配器下游写入 Tablestore |
是 |
canal.conf: canalAdapters:outerAdapters: properties:tablestore.endpoint |
tablestore endpoint |
是 |
|
canal.conf: canalAdapters:outerAdapters: properties:tablestore.accessSecretId |
**** |
AccessSecretId |
是 |
canal.conf: canalAdapters:outerAdapters: properties:tablestore.accessSecretKey |
**** |
AccessSecretKey |
是 |
canal.conf: canalAdapters:outerAdapters: properties:tablestore.instanceName |
test-20210609 |
tablestore 中的 InstanceName |
是 |
canal.conf: terminateOnException |
true |
默认为false。若配置为true,则若数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理 |
否 |
完整配置 application.yml 配置如下
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 3
timeout:
accessKey:
secretKey:
terminateOnException: true
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources:
defaultDS:
url: jdbc:mysql://rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306/test_ots?useUnicode=true
username: ****
password: ****
canalAdapters:
- instance: test_ots # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: tablestore
key: ts
properties:
tablestore.endpoint: https://test-20210609.cn-hangzhou.ots.aliyuncs.com
tablestore.accessSecretId: ****
tablestore.accessSecretKey: ****
tablestore.instanceName: test-20210609
在 conf/tablestore 路径下,创建 order.yml 文件,填入以下内容。配置表示从源库表 test_ots. order_contract_canal 向目标表 canal_target_order 同步数据。
dataSourceKey: defaultDS
destination: test_ots
groupId: g1
outerAdapterKey: ts
threads: 8
updateChangeColumns: false
dbMapping:
database: test_ots
table: order_contract_canal
targetTable: canal_target_order
targetPk:
oId: oId
targetColumns:
oId:
create_time:
pay_time:
has_paid:
c_id:
c_name:
p_brand:
p_count:
p_id:
p_name:
p_price:
s_id:
s_name:
total_price:
etlCondition:
commitBatch: 200 # 批量提交的大小
其中各参数含义见表。
参数 |
说明 |
是否必填 |
dataSourceKey |
该任务的源数据库标识,在application.yml中可以找到该标识对应的数据库 |
是 |
destination |
canal实例名,与application.yml下的instance相同 |
是 |
groupId |
分组id,MQ模式下使用,这里不关心,配置成application.yml中canalAdapters中相同即可 |
是 |
outerAdapterKey |
使用的Adapter标识,应与application.yml中outerAdapters下的key值相同 |
是 |
threads |
筒数量,默认为1,对应tablestorewriter中的bucket数量 |
是 |
dbMapping.database |
源库名 |
是 |
dbMapping.table |
源表名 |
是 |
dbMapping.targetTable |
目标表 |
是 |
dbMapping.targetPk |
主键配置 id: target_id 源表主键:目标表主键。多主键可以配置多个 |
是 |
dbMapping.targetColumns |
配置需要同步的列名,以及列映射,可以配置类型转换。 id: target_id$string,表示id字段同步后为target_id字段,且类型映射为string。 id: target_id,表示id字段同步后为target_id字段 id: ,表示id字段同步前后字段名不变,字段类型采用默认映射。 id: $string 功能等同于id: id$string |
是 |
dbMapping.etlCondition |
全量抽取数据时的过滤条件 |
否 |
dbMapping.commitBatch |
一次批量RPC请求导入的行数,对应tablestorewriter中的maxBatchRowsCount,默认取writerConfig中的默认值200 |
否 |
updateChangeColumns |
行覆盖或行更新。 默认为false,为行覆盖,即记录更新时,使用该记录最新整行值覆盖 Tablestore 中的老记录。若为true,为行更新,即记录更新时,只对变化的字段进行操作。 |
否 |
全量同步数据
使用程序向原始表 order_contract_canal 中插入 1 千万行记录。
同步数据
调用 client-adapter 服务的方法触发同步任务。指令格式为
curl "localhost:8081/etl/{type}/{key}/{task}" -X POST
type 为下游数据库类型;key 是 adapter key;task 为任务配置文件的名称。在本文中,指令为:
curl "localhost:8081/etl/tablestore/ts/order.yml" -X POST
程序会首先中止增量数据传输,然后同步全量历史数据。同步开始后,可以在日志中看到 Adapter 中 TablestoreWriter 的传输日志变化。
性能测试
在 Tablestore 监控页面查看数据写入速率,首先进入 Tablestore控制台。点击对应实例进入实例详情页。
点击表 canal_target_order。
点击监控指标进入监控页面。
同步任务开始后,在监控页面可以看到数据如图。此时 Canal 所在机器配置为 8 核 16G,order.yml 中 threads 配置为 8。源库记录数在 1千万,每行数据大小约 0.5KB。可以看到在任务开始的时候,并发写入 Tablestore 速率很高,在 2w行/s 左右。而随着任务的进行,写入速率开始下降,这是由于全量导入数据时从源库获取数据时使用 limit offset 导致的,受限于上游数据的获取。
1千万数据完成写入共耗时 28 分钟。耗时统计见下表。可以看到,在数据导入初期,导入速率相对较快,而数据导入后期,导入效率明显降低。时间统计和控制台中监控数据吻合。
从程序开始到完成导入 |
使用时间 |
完成 300w 行导入 |
3m |
完成 400w 行导入 |
5m |
完成 500w 行导入 |
8m |
完成 600w 行导入 |
11m |
完成 700w 行导入 |
14m |
完成 800w 行导入 |
18m |
完成 900w 行导入 |
22.5m |
完成 1000w 行导入 |
28m |
增量同步数据
使用附录中的程序中的接口("/canal/press"),向原始表持续写入数据。
性能测试
输入如下指令调用接口,使用 3 线程写入数据,每个线程每秒写入 4000 行记录。
curl "localhost:8082/canal/press?rps=4000&threads=3" -X POST
在控制台可以看到数据持续写入,速率约在 1.2w 行/s。
在当前 8 核 16G 的机器配置下,继续增加并发写入量,写入 1.6w 到 2w 行每秒,测试出的增量同步上限约在 1.5w 行/s,每行记录约 0.5KB。
异常处理
ClientAdapter 配置文件application.yml 中 terminateOnException 若不配置或配置为 false,同步程序同步后仍报错,则程序会记录日志,跳过报错数据,继续同步任务。而若 terminateOnException 配置为 true,则同步报错后,程序会中止增量数据同步任务,等待用户介入处理报错。此时,用户可以通过下面接口查看任务的开启、中断状态。命令格式如下:
curl "localhost:8081/syncSwitch/{destination}"
在本文中命令为
curl "localhost:8081/syncSwitch/test_ots"
处理异常后,可以调用如下接口重新启动增量同步任务。
curl "localhost:8081/syncSwitch/test_ots/on" -X PUT
总结
本文简要介绍了 Canal,并且详细的展示了如何使用 Canal 从 MySQL 库向 Tablestore 中同步全量、增量数据。
附录
Canal 测试程序git地址: