基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

前言 

在基于 MySQL + Tablestore 分层存储架构的大规模订单系统中,利用 CDC 技术将 MySQL 数据同步到 Tablestore 是不可缺少的一步。前文已经详细讲述了如何使用 DTS 向 Tablestore 同步数据。对于中小规模的数据库,或者个人开发者,还可以使用 Canal 从 MySQL 向 Tablestore 同步数据。Canal 部署简单,易于运维,且相比于 DTS,它成本更低,因此它更适合小规模的数据同步。

Canal 简介

Canal 是阿里开源 CDC 工具,他可以获取 MySQL binlog 解析,并将数据变动传输给下游。详情可参考Canal官网。基于 Canal,可以实现从 MySQL 到其他数据库的实时同步。Canal 部署简单、成本低,适合中小规模 Mysql 数据库同步工作。其架构如图:

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

Deployer 负责拉取 Binlog,解析数据,分发,记录位点。而 Client-adapter 负责接收上游数据,通过 Adapter适配器,将数据持久化到目标库。Deployer 和 Client-Adapter 作为 Canal 中的两个模块,分别独立部署。Tablestore 团队已经在 Adapter 中增加了 TablestoreAdapter,可以支持向 Tablestore 中写入数据。

下面,我们将部署 Canal,并将处于 Rds 中的 MySQL 订单数据同步进入 Tablestore,实现数据全量、增量的同步。

Canal 部署

环境准备

准备部署 canal 程序的机器。本文中在阿里云官网申请了一台 8 vCPU,16 GiB内存的 Linux 机器作为部署机器。如果读者同样需要申请 ECS,请参考:ECS入门概述

源表准备

使用下面 SQL 在 MySQL 中新建测试表 order_contract_canal,其表结构与 order_contract 相同。

CREATE TABLE `order_contract_canal` (
  `oId` varchar(50) NOT NULL,
  `create_time` datetime NOT NULL COMMENT '下单时间',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `has_paid` tinyint(4) DEFAULT NULL COMMENT '是否已经支付',
  `c_id` varchar(20) DEFAULT NULL COMMENT '消费者id',
  `c_name` varchar(20) DEFAULT NULL COMMENT '消费者姓名',
  `p_brand` tinytext COMMENT '产品品牌',
  `p_count` mediumint(9) DEFAULT NULL COMMENT '产品数量',
  `p_id` varchar(20) DEFAULT NULL COMMENT '产品id',
  `p_name` varchar(20) DEFAULT NULL COMMENT '产品名',
  `p_price` decimal(16,2) DEFAULT NULL COMMENT '产品价格',
  `s_id` varchar(20) DEFAULT NULL COMMENT '店铺id',
  `s_name` varchar(20) DEFAULT NULL COMMENT '店铺名称',
  `total_price` decimal(16,2) DEFAULT NULL COMMENT '总价格',
  PRIMARY KEY (`oId`),
  KEY `idx_sid` (`s_id`),
  KEY `idx_paytime_sid` (`pay_time`,`s_id`) USING BTREE,
  KEY `idx_cid` (`c_id`),
  KEY `idx_paytime_cid_totalprice` (`pay_time`,`c_id`,`total_price`) USING BTREE,
  KEY `idx_sid_paytime` (`s_id`,`pay_time`),
  KEY `idx_sid_paytime_totalprice` (`s_id`,`pay_time`,`total_price`),
  KEY `idx_paytime_totalprice_pbrand` (`p_price`,`total_price`,`pay_time`) USING BTREE,
  KEY `idx_paytime` (`pay_time`),
  KEY `idx_pbrand_paytime` (`p_brand`(10),`pay_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

目标表准

在 Tablestore 中创建表 canal_target_order作为测试表,使其表结构与订单表 order_contract 相同。其表结构如图:

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

Deployer部署

在 Canal 官方 release页,下载 canal.deployer 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。配置 MySQL Binlog 模式以及部署 Canal Deployer 的具体步骤可以参考 Canal 官方文档的 QuickStart

本文创建新的实例,在 Deployer 的 conf 目录下创建文件夹 test_ots。将 conf/example/instance.properties 复制到 test_ots 路径下。然后修改 test_ots 路径下的 instance.properties 配置文件。需要关注如下配置项:

参数

说明

canal.instance.master.address

rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306

数据库域名端口

canal.instance.rds.accesskey

***

本文 MySQL 为阿里云产品 RDS,需填入对应accessKey。若非 RDS 库,此项不用填写。

canal.instance.rds.secretkey

***

本文 MySQL 为阿里云产品 RDS,需填入对应secretkey。若非 RDS 库,此项不用填写。

canal.instance.rds.instanceId

rm-bp15p07134rkvf7z6

本文 MySQL 为阿里云产品 RDS,需填入对应示例 id。若非 RDS 库,此项不用填写。

canal.instance.dbUsername

***

数据库账号用户名

canal.instance.dbPassword

***

数据库账号密码

canal.instance.filter.regex

test_ots\\.[test|order_contract_canal].*

Canal 实例关注的表。通过正则表达式匹配。

这里匹配 test_ots 库下表名以 test 开头或者以 order_contract_canal 开头的表

canal.destinations

test_ots

canal 的实例名称,需要配置文件所在上层路径相同,本例路径为 conf/test_ots /instance.properties,那么实例名为test_ots

ClientAdapter部署

在 Canal 官方 release页,下载 canal.adapter 包。将包解压后,配置 conf 路径下的配置文件,然后就可以通过 bin 路径下的启动脚本启动。包解压后,若 plugin 路径下不存在以 client-adapter.tablestore 开头的 jar 包,说明此安装包不包含 Tablestore 对接部分代码,需要使用 canal-adapter下的 zip 包作为安装包,该安装包与官网安装包部署路径相同,配置、启动方式相同。详细的部署步骤见 ClientAdapter

conf 路径下 application.yml中需要额外关注配置见表。

参数

说明

是否必填

canal.conf:

canalAdapters:instance

test_ots

与depolyer中的destinations保持一致

canal.conf:

canalAdapters:outerAdapters: 

-name:

tablestore

定义适配器类型,填入 tablestore 说明此适配器下游写入 Tablestore

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.endpoint

https://test-20210609.cn-hangzhou.ots.aliyuncs.com

tablestore endpoint

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretId

****

AccessSecretId

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretKey

****

AccessSecretKey

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.instanceName

test-20210609

tablestore 中的 InstanceName

canal.conf: terminateOnException

true

默认为false。若配置为true,则若数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理

完整配置 application.yml 配置如下

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  terminateOnException: true
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://rm-bp15p07134rkvf7z6wo.mysql.rds.aliyuncs.com:3306/test_ots?useUnicode=true
      username: ****
      password: ****
  canalAdapters:
  - instance: test_ots # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: tablestore
        key: ts
        properties:
          tablestore.endpoint: https://test-20210609.cn-hangzhou.ots.aliyuncs.com
          tablestore.accessSecretId: ****
          tablestore.accessSecretKey: ****
          tablestore.instanceName: test-20210609

在 conf/tablestore 路径下,创建 order.yml 文件,填入以下内容。配置表示从源库表 test_ots. order_contract_canal 向目标表 canal_target_order 同步数据。

dataSourceKey: defaultDS
destination: test_ots
groupId: g1
outerAdapterKey: ts
threads: 8
updateChangeColumns: false
dbMapping:
  database: test_ots
  table: order_contract_canal
  targetTable: canal_target_order
  targetPk:
    oId: oId
  targetColumns:
    oId:
    create_time:
    pay_time:
    has_paid:
    c_id:
    c_name:
    p_brand:
    p_count:
    p_id:
    p_name:
    p_price:
    s_id:
    s_name:
    total_price:
  etlCondition: 
  commitBatch: 200 # 批量提交的大小

其中各参数含义见表。

参数

说明

是否必填

dataSourceKey

该任务的源数据库标识,在application.yml中可以找到该标识对应的数据库

destination

canal实例名,与application.yml下的instance相同

groupId

分组id,MQ模式下使用,这里不关心,配置成application.yml中canalAdapters中相同即可

outerAdapterKey

使用的Adapter标识,应与application.yml中outerAdapters下的key值相同

threads

筒数量,默认为1,对应tablestorewriter中的bucket数量

dbMapping.database

源库名

dbMapping.table

源表名

dbMapping.targetTable

目标表

dbMapping.targetPk

主键配置

id: target_id 源表主键:目标表主键。多主键可以配置多个

dbMapping.targetColumns

配置需要同步的列名,以及列映射,可以配置类型转换。

id: target_id$string,表示id字段同步后为target_id字段,且类型映射为string。

id: target_id,表示id字段同步后为target_id字段

id: ,表示id字段同步前后字段名不变,字段类型采用默认映射。

id: $string 功能等同于id: id$string

dbMapping.etlCondition

全量抽取数据时的过滤条件

dbMapping.commitBatch

一次批量RPC请求导入的行数,对应tablestorewriter中的maxBatchRowsCount,默认取writerConfig中的默认值200

updateChangeColumns

行覆盖或行更新。

默认为false,为行覆盖,即记录更新时,使用该记录最新整行值覆盖 Tablestore 中的老记录。若为true,为行更新,即记录更新时,只对变化的字段进行操作。

全量同步数据

使用程序向原始表 order_contract_canal 中插入 1 千万行记录。

同步数据

调用 client-adapter 服务的方法触发同步任务。指令格式为

curl "localhost:8081/etl/{type}/{key}/{task}" -X POST

type 为下游数据库类型;key 是 adapter key;task 为任务配置文件的名称。在本文中,指令为:

curl "localhost:8081/etl/tablestore/ts/order.yml" -X POST

程序会首先中止增量数据传输,然后同步全量历史数据。同步开始后,可以在日志中看到 Adapter 中 TablestoreWriter 的传输日志变化。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

性能测试

在 Tablestore 监控页面查看数据写入速率,首先进入 Tablestore控制台。点击对应实例进入实例详情页。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

点击表 canal_target_order。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

点击监控指标进入监控页面。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

同步任务开始后,在监控页面可以看到数据如图。此时 Canal 所在机器配置为 8 核 16G,order.yml 中 threads 配置为 8。源库记录数在 1千万,每行数据大小约 0.5KB。可以看到在任务开始的时候,并发写入 Tablestore 速率很高,在 2w行/s 左右。而随着任务的进行,写入速率开始下降,这是由于全量导入数据时从源库获取数据时使用 limit offset 导致的,受限于上游数据的获取。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

1千万数据完成写入共耗时 28 分钟。耗时统计见下表。可以看到,在数据导入初期,导入速率相对较快,而数据导入后期,导入效率明显降低。时间统计和控制台中监控数据吻合。

从程序开始到完成导入

使用时间

完成 300w 行导入

3m

完成 400w 行导入

5m

完成 500w 行导入

8m

完成 600w 行导入

11m

完成 700w 行导入

14m

完成 800w 行导入

18m

完成 900w 行导入

22.5m

完成 1000w 行导入

28m

增量同步数据

使用附录中的程序中的接口("/canal/press"),向原始表持续写入数据。

性能测试

输入如下指令调用接口,使用 3 线程写入数据,每个线程每秒写入 4000 行记录。

curl "localhost:8082/canal/press?rps=4000&threads=3"  -X POST

在控制台可以看到数据持续写入,速率约在 1.2w 行/s。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

在当前 8 核 16G 的机器配置下,继续增加并发写入量,写入 1.6w 到 2w 行每秒,测试出的增量同步上限约在 1.5w 行/s,每行记录约 0.5KB。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇

异常处理

ClientAdapter 配置文件application.yml 中 terminateOnException 若不配置或配置为 false,同步程序同步后仍报错,则程序会记录日志,跳过报错数据,继续同步任务。而若 terminateOnException 配置为 true,则同步报错后,程序会中止增量数据同步任务,等待用户介入处理报错。此时,用户可以通过下面接口查看任务的开启、中断状态。命令格式如下:

curl "localhost:8081/syncSwitch/{destination}"

在本文中命令为

curl "localhost:8081/syncSwitch/test_ots"

处理异常后,可以调用如下接口重新启动增量同步任务。

curl "localhost:8081/syncSwitch/test_ots/on" -X PUT

总结

本文简要介绍了 Canal,并且详细的展示了如何使用 Canal 从 MySQL 库向 Tablestore 中同步全量、增量数据。

附录

Canal 测试程序git地址:

https://github.com/aliyun/tablestore-examples

上一篇:基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据处理ETL篇


下一篇:基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇