可以使用 Canal 将数据传输进入 Tablestore。
需要部署两部分内容,首先部署 canal.deployer,deployer 负责从上游拉取 binlog 数据,记录位点等。然后再部署 canal.adapter 包,这个服务负责对接 deployer 解析过的数据,并且将数据传输到下游数据库中,在本文中即 Tablestore 数据库。链路如图。
Deployer部署
部署步骤
首先部署 canal.deployer 包,这个包里面不存在任何专门为 Tablestore 定制的 jar 包或者配置,因此部署方式也和使用 Canal 对接 MySQL 数据库时的部署方式相同。部署 Canal Deployer 的具体步骤可以参考 Canal 官方文档的 QuickStart。大致步骤如下:
- 开启 MySQL binlog 功能,并且配置 binlog-format 为 ROW 模式。
- 将包解压,可以看到项目路径下的 bin、conf、plugin、lib文件夹
- 修改 conf 路径下的配置文件
- 通过 bin 路径下的脚本启动项目
配置说明
对部署步骤中的第 4 步做一个更具体的说明。无论下游目标库为 MySQL 还是 Tablestore,canal.deployer 部分的配置文件配置方式并没有什么区别。可以参考 Canal 官方文档 QuickStart。本文再进行下更细致的说明。
canal.properties配置
需要关注的配置文件有两处,第一处为 conf 路径下的 canal.properties。在 canal.properties 中,其余配置使用默认项即可,只需要修改
canal.destinations = #{destinations} // 默认值为example,填入给当前canal实例的命名即可
instance.properties配置
另一处需要关注的配置为 conf 路径下的#{destinations}/instance.properties,destinations为当前 canal 实例名,即 canal.properties 配置文件中的 canal.destinations 字段。假设 canal.destinations 的值为 test_ots,那么需要在conf 路径下创建 test_ots 文件夹,并且将 conf/example/instance.properties 该文件复制到 conf/test_ots/ 路径下。然后修改该配置文件。
instance.properties 中需要关注的参数如下。
参数 |
示例值 |
说明 |
canal.instance.master.address |
host:port |
数据库域名端口 |
canal.instance.rds.accesskey |
*** |
本文 MySQL 为阿里云产品 RDS,需填入对应accessKey。若非 RDS 库,此项不用填写。 |
canal.instance.rds.secretkey |
*** |
本文 MySQL 为阿里云产品 RDS,需填入对应secretkey。若非 RDS 库,此项不用填写。 |
canal.instance.rds.instanceId |
rm-bp15p0713f7z6 |
本文 MySQL 为阿里云产品 RDS,需填入对应示例 id。若非 RDS 库,此项不用填写。 |
canal.instance.dbUsername |
*** |
数据库账号用户名 |
canal.instance.dbPassword |
*** |
数据库账号密码 |
canal.instance.filter.regex |
.*\\..* |
Canal 实例关注的表。通过正则表达式匹配。 这里匹配所有库下的所有表 |
canal.destinations |
test_ots |
canal 的实例名称,需要配置文件所在上层路径相同,本例路径为 conf/test_ots /instance.properties,那么实例名为test_ots |
ClientAdapter部署
部署步骤
常规步骤
canal.adapter的部署方式与常规部署对接下游 MySQL 时略有不同。详细的部署步骤见 ClientAdapter。这里也做一下针对 Tablestore 的详细说明。大致步骤:
- 将包解压后,可以看到项目路径下的 bin、conf、plugin、lib文件夹。
- 配置 conf 路径下的配置文件。
- 然后就可以通过 bin 路径下的启动脚本启动。
在第 2 步中,在plugin路径下,如果存在名字以 client-adapter.tablestore 开头的 jar 包,说明部署包中已经包含了 Canal 对接 Tablestore 部分的适配器代码。可以直接使用该部署包。如果不存在名字以 client-adapter.tablestore 开头的 jar 包,则需要使用额外提供的部署包进行部署。
异常处理
若plugin路径下不存在以 client-adapter.tablestore 开头的 jar 包。需要执行以下步骤
- 执行部署步骤中的 2、3、4 步,此时可以在 plugin 路径下看到名字以 client-adapter.tablestore 开头的 jar 包。
配置说明
部署步骤中的第 4 步修改配置文件,需要关心两处配置文件修改。
application.yml配置
首先是配置文件 conf/application.yml。这个文件中的配置项说明可以参考官方文档 ClientAdapter。另外,在 Tablestore 对接中存在一些独有的配置项,在下表中进行了说明。
参数 |
示例值 |
说明 |
是否必填 |
canal.conf: canalAdapters:instance |
test_ots |
与Depolyer中的destinations保持一致 |
是 |
canal.conf: canalAdapters:outerAdapters: -name: |
tablestore |
定义适配器类型,填入 tablestore,说明此适配器下游写入 Tablestore 库。 |
是 |
canal.conf: canalAdapters:outerAdapters: properties:tablestore.endpoint |
Tablestore endpoint |
是 |
|
canal.conf: canalAdapters:outerAdapters: properties:tablestore.accessSecretId |
**** |
AccessSecretId |
是 |
canal.conf: canalAdapters:outerAdapters: properties:tablestore.accessSecretKey |
**** |
AccessSecretKey |
是 |
canal.conf: canalAdapters:outerAdapters: properties:tablestore.instanceName |
test-2009 |
Tablestore 中的 InstanceName |
是 |
canal.conf: terminateOnException |
true |
默认为false。若配置为true,则若数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理 |
否 |
完整 application.yml 配置如下
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 3
timeout:
accessKey:
secretKey:
terminateOnException: true
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources:
defaultDS:
url: jdbc:mysql://rm-bp15po.mysql.rds.aliyuncs.com:3306/test_ots?useUnicode=true
username: ****
password: ****
canalAdapters:
- instance: test_ots # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: tablestore
key: ts
properties:
tablestore.endpoint: https://test-2009.cn-hangzhou.ots.aliyuncs.com
tablestore.accessSecretId: ****
tablestore.accessSecretKey: ****
tablestore.instanceName: test-2009
conf/tablestore路径下配置
然后需要关注 conf/tablestore 路径下的配置文件。如果没有该路径,需要手动创建 conf/tablestore 这个路径。
在 conf/tablestore 路径下,创建一个 yml 文件,文件名自定,填入以下格式的内容。然后根据自己的项目配置修改配置文件。
dataSourceKey: defaultDS
destination: test_ots
groupId: g1
outerAdapterKey: ts
threads: 8
updateChangeColumns: false
dbMapping:
database: test_ots
table: order_contract_canal
targetTable: canal_target_order
targetPk:
oId: oId
targetColumns:
oId:
create_time: createTime$string
pay_time: $string
update_time: updateTime
etlCondition:
commitBatch: 200 # 批量提交的大小
其中各参数含义见表。
参数 |
说明 |
是否必填 |
dataSourceKey |
该任务的源数据库标识,在 application.yml 中 srcDataSources 下可以找到该标识对应的数据库。 |
是 |
destination |
canal 实例名,与 application.yml 下的 canal.conf: canalAdapters:instance参数相同 |
是 |
groupId |
分组 id,MQ 模式下使用,这里不关心,配置成 application.yml 中 canalAdapters 中相同即可。 |
是 |
outerAdapterKey |
使用的 Adapter 标识,应与 application.yml 中 outerAdapters 下的 key 值相同。 |
是 |
threads |
筒数量,默认为 1,对应 tablestorewriter 中的 bucket 数量。 |
是 |
dbMapping.database |
源库名 |
是 |
dbMapping.table |
源表名 |
是 |
dbMapping.targetTable |
目标表 |
是 |
dbMapping.targetPk |
主键配置,格式如下: id: target_id 源表主键:目标表主键。多主键可以配置多个,多主键配置顺序需要与 Tablestore 中的主键顺序相同。 |
是 |
dbMapping.targetColumns |
配置需要同步的列名,以及列映射,可以配置类型转换。有如下 4 种格式。 id: target_id$string,表示id字段同步后为target_id字段,且类型映射为 string; id: target_id,表示id字段同步后为 target_id 字段; id: ,表示 id 字段同步前后字段名不变,字段类型采用默认映射; id: $string 功能等同于 id: id$string 需要注意,在 targetPk 中配置过的主键也需要在这里再配置一次。 |
是 |
dbMapping.etlCondition |
全量抽取数据时的过滤条件 |
否 |
dbMapping.commitBatch |
一次批量 RPC 请求导入的行数,对应 tablestorewriter 中的 maxBatchRowsCount,默认取 writerConfig 中的默认值200 |
否 |
updateChangeColumns |
行覆盖或行更新。 默认为 false,为行覆盖,即记录更新时,使用该记录最新整行值覆盖 Tablestore 中的老记录。若为 true,为行更新,即记录更新时,只对变化的字段进行操作。 |
否 |
Canal 支持源表到目标 Tablestore 表的字段名映射以及字段类型映射(dbMapping.targetColumns 字段中的映射配置)。可以作为目标类型配置在 $ 后面的有
配置项(大小写不敏感) |
目标类型 |
string |
string |
int,integer |
int |
bool,boolean |
bool |
binary |
binary |
double,float,decimal |
double |
若不配置映射,会根据原始字段类型推断目标字段类型。