使用 Canal 向 Tablestore 导入数据

可以使用 Canal 将数据传输进入 Tablestore。

需要部署两部分内容,首先部署 canal.deployer,deployer 负责从上游拉取 binlog 数据,记录位点等。然后再部署 canal.adapter 包,这个服务负责对接 deployer 解析过的数据,并且将数据传输到下游数据库中,在本文中即 Tablestore 数据库。链路如图。

使用 Canal 向 Tablestore 导入数据

Deployer部署

部署步骤

首先部署 canal.deployer 包,这个包里面不存在任何专门为 Tablestore 定制的 jar 包或者配置,因此部署方式也和使用 Canal 对接 MySQL 数据库时的部署方式相同。部署 Canal Deployer 的具体步骤可以参考 Canal 官方文档的 QuickStart。大致步骤如下:

  1. 开启 MySQL binlog 功能,并且配置 binlog-format 为 ROW 模式。
  2. 在 Canal 官方 release页,下载 canal.deployer 包。
  3. 将包解压,可以看到项目路径下的 bin、conf、plugin、lib文件夹
  4. 修改 conf 路径下的配置文件
  5. 通过 bin 路径下的脚本启动项目

配置说明

对部署步骤中的第 4 步做一个更具体的说明。无论下游目标库为 MySQL 还是 Tablestore,canal.deployer 部分的配置文件配置方式并没有什么区别。可以参考 Canal 官方文档 QuickStart。本文再进行下更细致的说明。

canal.properties配置

需要关注的配置文件有两处,第一处为 conf 路径下的 canal.properties。在 canal.properties 中,其余配置使用默认项即可,只需要修改 

canal.destinations = #{destinations}   // 默认值为example,填入给当前canal实例的命名即可

instance.properties配置

另一处需要关注的配置为 conf 路径下的#{destinations}/instance.properties,destinations为当前 canal 实例名,即 canal.properties 配置文件中的 canal.destinations 字段。假设 canal.destinations 的值为 test_ots,那么需要在conf 路径下创建 test_ots 文件夹,并且将 conf/example/instance.properties 该文件复制到 conf/test_ots/ 路径下。然后修改该配置文件。

instance.properties 中需要关注的参数如下。

参数

示例值

说明

canal.instance.master.address

host:port

数据库域名端口

canal.instance.rds.accesskey

***

本文 MySQL 为阿里云产品 RDS,需填入对应accessKey。若非 RDS 库,此项不用填写。

canal.instance.rds.secretkey

***

本文 MySQL 为阿里云产品 RDS,需填入对应secretkey。若非 RDS 库,此项不用填写。

canal.instance.rds.instanceId

rm-bp15p0713f7z6

本文 MySQL 为阿里云产品 RDS,需填入对应示例 id。若非 RDS 库,此项不用填写。

canal.instance.dbUsername

***

数据库账号用户名

canal.instance.dbPassword

***

数据库账号密码

canal.instance.filter.regex

.*\\..*

Canal 实例关注的表。通过正则表达式匹配。

这里匹配所有库下的所有表

canal.destinations

test_ots

canal 的实例名称,需要配置文件所在上层路径相同,本例路径为 conf/test_ots /instance.properties,那么实例名为test_ots

ClientAdapter部署

部署步骤

常规步骤

canal.adapter的部署方式与常规部署对接下游 MySQL 时略有不同。详细的部署步骤见 ClientAdapter。这里也做一下针对 Tablestore 的详细说明。大致步骤:

  1. 在 Canal 官方 release页,下载 canal.adapter 包。
  2. 将包解压后,可以看到项目路径下的 bin、conf、plugin、lib文件夹。
  3. 配置 conf 路径下的配置文件。
  4. 然后就可以通过 bin 路径下的启动脚本启动。

在第 2 步中,在plugin路径下,如果存在名字以 client-adapter.tablestore 开头的 jar 包,说明部署包中已经包含了 Canal 对接 Tablestore 部分的适配器代码。可以直接使用该部署包。如果不存在名字以 client-adapter.tablestore 开头的 jar 包,则需要使用额外提供的部署包进行部署。

使用 Canal 向 Tablestore 导入数据

异常处理

若plugin路径下不存在以 client-adapter.tablestore 开头的 jar 包。需要执行以下步骤

  1. 拉取 canal-adapter下的 zip 包。用此 zip 包替代从 release页下载 canal.adapter 包。
  2. 执行部署步骤中的 2、3、4 步,此时可以在 plugin 路径下看到名字以 client-adapter.tablestore 开头的 jar 包。

配置说明

部署步骤中的第 4 步修改配置文件,需要关心两处配置文件修改。

application.yml配置

首先是配置文件 conf/application.yml。这个文件中的配置项说明可以参考官方文档 ClientAdapter。另外,在 Tablestore 对接中存在一些独有的配置项,在下表中进行了说明。

参数

示例值

说明

是否必填

canal.conf:

canalAdapters:instance

test_ots

与Depolyer中的destinations保持一致

canal.conf:

canalAdapters:outerAdapters: 

-name:

tablestore

定义适配器类型,填入 tablestore,说明此适配器下游写入 Tablestore 库。

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.endpoint

https://test-2009.cn-hangzhou.ots.aliyuncs.com

Tablestore endpoint

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretId

****

AccessSecretId

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretKey

****

AccessSecretKey

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.instanceName

test-2009

Tablestore 中的 InstanceName

canal.conf: terminateOnException

true

默认为false。若配置为true,则若数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理

完整 application.yml 配置如下

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  terminateOnException: true
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://rm-bp15po.mysql.rds.aliyuncs.com:3306/test_ots?useUnicode=true
      username: ****
      password: ****
  canalAdapters:
  - instance: test_ots # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: tablestore
        key: ts
        properties:
          tablestore.endpoint: https://test-2009.cn-hangzhou.ots.aliyuncs.com
          tablestore.accessSecretId: ****
          tablestore.accessSecretKey: ****
          tablestore.instanceName: test-2009

conf/tablestore路径下配置

然后需要关注 conf/tablestore 路径下的配置文件。如果没有该路径,需要手动创建 conf/tablestore 这个路径。

在 conf/tablestore 路径下,创建一个 yml 文件,文件名自定,填入以下格式的内容。然后根据自己的项目配置修改配置文件。

dataSourceKey: defaultDS
destination: test_ots
groupId: g1
outerAdapterKey: ts
threads: 8
updateChangeColumns: false
dbMapping:
  database: test_ots
  table: order_contract_canal
  targetTable: canal_target_order
  targetPk:
    oId: oId
  targetColumns:
    oId:
    create_time: createTime$string
    pay_time: $string
    update_time: updateTime
  etlCondition: 
  commitBatch: 200 # 批量提交的大小

其中各参数含义见表。

参数

说明

是否必填

dataSourceKey

该任务的源数据库标识,在 application.yml 中 srcDataSources 下可以找到该标识对应的数据库。

destination

canal 实例名,与 application.yml 下的

canal.conf: canalAdapters:instance参数相同

groupId

分组 id,MQ 模式下使用,这里不关心,配置成 application.yml 中 canalAdapters 中相同即可。

outerAdapterKey

使用的 Adapter 标识,应与 application.yml 中 outerAdapters 下的 key 值相同。

threads

筒数量,默认为 1,对应 tablestorewriter 中的 bucket 数量。

dbMapping.database

源库名

dbMapping.table

源表名

dbMapping.targetTable

目标表

dbMapping.targetPk

主键配置,格式如下:

id: target_id 源表主键:目标表主键。多主键可以配置多个,多主键配置顺序需要与 Tablestore 中的主键顺序相同。

dbMapping.targetColumns

配置需要同步的列名,以及列映射,可以配置类型转换。有如下 4 种格式。

id: target_id$string,表示id字段同步后为target_id字段,且类型映射为 string;

id: target_id,表示id字段同步后为 target_id 字段;

id: ,表示 id 字段同步前后字段名不变,字段类型采用默认映射;

id: $string 功能等同于 id: id$string

需要注意,在 targetPk 中配置过的主键也需要在这里再配置一次。

dbMapping.etlCondition

全量抽取数据时的过滤条件

dbMapping.commitBatch

一次批量 RPC 请求导入的行数,对应 tablestorewriter 中的 maxBatchRowsCount,默认取 writerConfig 中的默认值200

updateChangeColumns

行覆盖或行更新。

默认为 false,为行覆盖,即记录更新时,使用该记录最新整行值覆盖 Tablestore 中的老记录。若为 true,为行更新,即记录更新时,只对变化的字段进行操作。

Canal 支持源表到目标 Tablestore 表的字段名映射以及字段类型映射(dbMapping.targetColumns 字段中的映射配置)。可以作为目标类型配置在 $ 后面的有

配置项(大小写不敏感)

目标类型

string

string

int,integer

int

bool,boolean

bool

binary

binary

double,float,decimal

double

若不配置映射,会根据原始字段类型推断目标字段类型。

上一篇:Postfix+Dovecot+LAMP+Extmail搭建web邮件系统(三)


下一篇:三个理由告诉你,企业为什么需要大数据