使用DTS同步MySQL增量数据到Tablestore

摘要

从MySQL到Tablestore的全量数据导出可以参考同系列文章《数据同步-从MySQL到Tablestore》,本文主要介绍将MySQL的增量数据同步到Tablestore的一种方式——使用阿里集团的数据传输服务DTS的数据订阅功能做增量数据的读取以及改写。
注意:DTS数据订阅服务支持多种数据库环境,老版现不支持MySQL8.0,使用sdk进行消费;新版新增了分组消费概念,需要使用Kafka客户端消费订阅数据。本文以RDS(MySQL 5.7)订阅为例,使用sdk完成增量数据订阅与改写。

原理介绍

使用DTS同步MySQL增量数据到Tablestore

导出步骤

1.源、目的数据库资源

源数据库:

RDS(新建实例)/实例[pingsheng]/数据库[pingstest]/表[to_tablestore]
数据表结构如图
使用DTS同步MySQL增量数据到Tablestore

目的数据库:

Tablestore(新建实例)/实例[pingsheng]/表[from_rds]
数据表结构如图
使用DTS同步MySQL增量数据到Tablestore

2.云账号资源

准备具有源、目的数据库读写权限的一组云账号AK

3.DTS数据订阅

创建订阅通道参考,选择上述源数据库实例为数据源配置订阅信息
使用DTS同步MySQL增量数据到Tablestore
选择需要订阅的数据表
使用DTS同步MySQL增量数据到Tablestore
通过数据源预检查后,数据订阅配置完成,进入初始化阶段大约需要等待十分钟。初始化完成后,数据订阅状态变为“正常”即可以开始消费增量数据。增量数据的消费点从界面可以看到,支持动态调整参考文档
使用DTS同步MySQL增量数据到Tablestore
从控制台的“订阅数据”可以看到已经拉取到的部分展示数据
使用DTS同步MySQL增量数据到Tablestore
从DTS拉取到的增量数据是经过解析和再封装的,增添了一些解释参数,订阅数据的各字段含义参考

4.订阅数据的解析与改写

从DTS读取MySQL增量数据

下载DTS的SDK,在本地(ECS)进行编译,参考
使用DTS同步MySQL增量数据到Tablestore
在数据订阅“更多”中下载示例代码,替换掉AK信息、订阅ID,编译启动程序尝试获取增量数据,测试rds数据表中若无增量,会每隔1s收到一条“heartbeat”心跳记录
使用DTS同步MySQL增量数据到Tablestore
尝试在源数据表insert、update数据,会打印出以Opt:begin开头,包含Opt:insert、update,以Opt:commit结尾的多行数据。修改代码仅保留改写数据需要的操作类型“Opt”和行信息的前后镜像“FieldList”

public void notify(List<ClusterMessage> messages) throws Exception {
  for (ClusterMessage message : messages) {
    // debug
    System.out.println(message.getRecord().getOpt());
    System.out.println(message.getRecord().getFieldList());
    //you must call ackAsConsumed when you consume the data
    message.ackAsConsumed();
  }
}

//BEGIN
//[]
//UPDATE
//[Field name: pk1  //依次输出各列的前、后镜像
//Field type: 3
//Field length: 2
//Field value: 83
//,Field name: pk1
//Field type: 3
//Field length: 2
//Field value: 80
//, Field name: pk2
//Field type: 3
//Field length: 1
//Field value: 3
//, Field name: pk2
//Field type: 3
//Field length: 1
//Field value: 3
//, Field name: v1
//Field type: 3
//Field length: 2
//Field value: 47
//, Field name: v1
//Field type: 3
//Field length: 2
//Field value: 50
//]
//COMMIT
//[]

将增量数据写入Tablestore

下载Tablestore的SDK ,本地(ECS)进行编译参考
调用单行数据操作,将增、删、改的行写入Tablestore参考

//PutRow
private static void putRow(SyncClient client, String pkValue, MyColumnValue columnvalue) {
    // 构造主键
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();
    RowPutChange rowPutChange = new RowPutChange(TABLE_NAME, primaryKey);
    //加入属性列
    rowPutChange.addColumn(new Column("v1", columnvalue.getv1()));
    rowPutChange.addColumn(new Column("v2", columnvalue.getv2()));
    client.putRow(new PutRowRequest(rowPutChange));
}
//DeleteRow
private static void deleteRow(SyncClient client, String pkValue) {
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();
    RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, primaryKey);
    client.deleteRow(new DeleteRowRequest(rowDeleteChange));
}

注意:涉及主键的Update,需要查分成Delete+Put两步操作
使用DTS同步MySQL增量数据到Tablestore
->
使用DTS同步MySQL增量数据到Tablestore

源码参考

下载

上一篇:C++程序设计-第14周数组上机实践项目


下一篇:CentOS安装Oracle 11g R2(x86_64)