背景
SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中。
开启cdc的源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可以捕获这部分数据。
CDC的使用条件
1.SQL server 2008及以上的企业版、开发版和评估版;
2.需要开启代理服务(作业)。
3.CDC需要业务库之外的额外的磁盘空间。
4.CDC的表需要主键或者唯一主键。
图1:Sqlserver CDC原理
ADB4PG Sink使用条件
- 需要提前使用建表语句,在ADB4PG端建表,系统不会自动创建(如果有需要可以加这部分功能)
- 每张表需要有主键或唯一主键
- 当前支持的数据格式:INTEGER,BIGINT,SMALLINT,NUMERIC,DECIMAL,REAL,DOUBLEPERICISION,BOOLEAN,DATE,TIMESTAMP,VARCHAR
环境准备
SQLServer环境准备
- 已有自建SQLServer或云上RDS实例(示例使用云上RDS SQLServer实例)
- 已有windows环境,并安装SSMS(SQL Server Management Studio),部分命令需要在SSMS执行
SQLServer环境建表
-- 创建源表
create database connect
GO
use connect
GO
create table t1
(
a int NOT NULL PRIMARY KEY,
b BIGINT,
c SMALLINT,
d REAL,
e FLOAT,
f DATETIME,
g VARCHAR
);
-- 开启db级的cdc
exec sp_rds_cdc_enable_db
-- 验证数据库是否开启cdc成功
select * from sys.databases where is_cdc_enabled = 1
-- 对源表开启cdc
exec sp_cdc_enable_table @source_schema='dbo', @source_name='t1', @role_name=null;
ADB4PG端创建目标表
CREATE DATABASE connect;
create table t1
(
a int NOT NULL PRIMARY KEY,
b BIGINT,
c SMALLINT,
d REAL,
e FLOAT,
f TIMESTAMP,
g VARCHAR
);
Kafka环境准备
安装Kafka Server
1. 下载kafka安装包,并解压
SQL Server Source Connect目前只支持2.1.0及以上版本的Kafka Connect,故需要安装高版本kafka,实例使用kfakf-2.11-2.1.0。 http://kafka.apache.org/downloads?spm=a2c4g.11186623.2.19.7dd34587dwy89h#2.1.0
2. 编辑$KAFKA_HOME/config/server.properties
修改以下参数
...
## 为每台broker配置一个唯一的id号
broker.id=0
...
## log存储地址
log.dirs=/home/gaia/kafka_2.11-2.1.0/logs
## kafka集群使用的zk地址
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
...
3. 启动kafka server
bin/kafka-server-start.sh config/server.properties
安装Kafka Connect
1. 修改kafka connect配置文件
修改$KAFKA_HOME/config/connect-distributed.properties
## kafka server地址
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
## 为kafka connector选定一个消费group id
group.id=
## 安装插件的地址,每次kafka connector启动时会动态加载改路径下的jar包,可以将每个插件单独放到一个子路径
plugin.path=
安装需要的kafka-connect插件
1. 将插件jar包放在我们在前面已经配置过的配置的plugin.path路径下
sqlserver-source-connector
oss-sink-connector, 需要使用代码自行编译,注意在pom修改依赖的kafka及scala版本号
adb4pg-jdbc-sink-connector,需要下载以下jar包及对应ADB For PG的JDBC驱动
https://yq.aliyun.com/attachment/download/?spm=a2c4e.11153940.0.0.70ed10daVH6ZQO&id=7282
2. 编辑配置文件
# CDC connector的配置文件 sqlserver-cdc-source.json
▽
{
"name": "sqlserver-cdc-source",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "server1",
"database.hostname" : "database hostname",
"database.port" : "1433",
"database.user" : "xxxx",
"database.password" : "xxxxxx",
"database.dbname" : "connect",
"schemas.enable" : "false",
"mode":"incrementing",
"incrementing.column.name":"a",
"database.history.kafka.bootstrap.servers" : "kafka-broker:9092",
"database.history.kafka.topic": "server1.dbo.t1",
"value.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}
# oss sink的配置文件 oss-sink.json
{
"name":"oss-sink",
"config": {
"name":"oss-sink",
"topics":"server1.dbo.testdata",
"connector.class":"com.aliyun.oss.connect.kafka.OSSSinkConnector",
"format.class":"com.aliyun.oss.connect.kafka.format.json.JsonFormat",
"flush.size":"1",
"tasks.max":"4",
"storage.class":"com.aliyun.oss.connect.kafka.storage.OSSStorage",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor":"Record",
"oss.bucket":"traffic-csv",
"partition.duration.ms":"10000",
"path.format":"YYYY-MM-dd-HH",
"locale":"US",
"timezone":"Asia/Shanghai",
"rotate.interval.ms":"30000"
}
}
有关oss sinker更详尽的配置,见文档 https://github.com/aliyun/kafka-connect-oss
## adb4pg-jdbc-sink配置文件
{
"name":"adb4pg-jdbc-sink",
"config": {
"name":"adb4pg-jdbc-sink",
"topics":"server1.dbo.t1",
"connector.class":"io.confluent.connect.jdbc.Adb4PgSinkConnector",
"connection.url":"jdbc:postgresql://gp-8vb8xi62lohhh2777o.gpdb.zhangbei.rds.aliyuncs.com:3432/connect",
"connection.user":"xxx",
"connection.password":"xxxxxx",
"col.names":"a,b,c,d,e,f,g",
"col.types":"integer,bigint,smallint,real,doublepericision,timestamp,varchar",
"pk.fields":"a",
"target.tablename":"t1",
"tasks.max":"1",
"auto.create":"false",
"table.name.format":"t1",
"batch.size":"1"
}
}
由于OSS sinker使用了hdfs封装的FileSystem,需要将OSS相关的信息维护到$KAFKA_HOME/config/core-site.xml文件中
<configuration>
<property>
<name>fs.oss.endpoint</name>
<value>xxxxxxx</value>
</property>
<property>
<name>fs.oss.accessKeyId</name>
<value>xxxxxxx</value>
</property>
<property>
<name>fs.oss.accessKeySecret</name>
<value>xxxxxxx</value>
</property>
<property>
<name>fs.oss.impl</name>
<value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
</property>
<property>
<name>fs.oss.buffer.dir</name>
<value>/tmp/oss</value>
</property>
<property>
<name>fs.oss.connection.secure.enabled</name>
<value>false</value>
</property>
<property>
<name>fs.oss.connection.maximum</name>
<value>2048</value>
</property>
</configuration>
3. 启动已经配置好的kafka-connector插件
启动及删除connect任务命令
## 启动命令
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sqlserver-cdc-source.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @adb4pg-jdbc-sink.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oss-sink.json
## 删除命令
curl -s -X DELETE http://localhost:8083/connectors/sqlserver-cdc-source
curl -s -X DELETE http://localhost:8083/connectors/adb4pg-jdbc-sink
curl -s -X DELETE http://localhost:8083/connectors/oss-sink
在ADB For PG获取更新数据
SQLServer插入赠/更/删数据记录
insert into t1(a,b,c,d,e,f,g) values(1, 2, 3, 4, 5, convert(datetime,'24-12-19 10:34:09 PM',5), 'h');
在kafka topic获取更新结果
先确认是否生成了kafka-connect所需的topic信息
bin/kafka-topics.sh --zookeeper zk_address --list
如截图,connect-configs, connect-offsets, connect-status为kafka-connect用来存储任务数据更新状态的topic。schema-changes-inventory是维护sqlserver表结构的topic。
可以通过kafka consloe-consumer上获取到的topic信息,以确认cdc数据正确被采集到kafka topic
bin/kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 --topic server1.dbo.t1
在ADB For PG上查询同步过来的数据
注意:因为是不同数据库之间的同步,时区设置的不同可能会导致同步结果产生时区偏移,需要在两侧数据库做好设置。