环境准备
主机名 | IP地址 | 部署服务 |
---|---|---|
study01 | 10.186.65.68 | DTLE、MySQL |
study02 | 10.186.65.71 | DTLE、MySQL |
study03 | 10.186.65.72 | DTLE、MySQL |
Ps:如果不适用容器进行部署,首先安装三台或以上MySQL实例,并开启binlog以及GTID
DTLE概述
dtle (Data-Transformation-le) 是上海爱可⽣信息技术股份有限公司 开发并开源的 CDC ⼯具. 其功能特点是:
-
多种数据传输模式
- ⽀持链路压缩
- ⽀持同构传输和异构传输
- ⽀持跨⽹络边际的传输
-
多种数据处理模式
- ⽀持库/表/⾏级别 数据过滤
-
多种数据通道模式
- ⽀持多对多的数据传输
- ⽀持回环传输
-
多种源/⽬标端
- ⽀持MySQL - MySQL的数据传输
- ⽀持MySQL - Kafka的数据传输
-
集群模式
- 提供可靠的元数据存储
- 可进⾏⾃动任务分配
- ⽀持⾃动故障转移
单向复制/聚合/分散
DTLE支持的常见复制场景如下
-
按数据源/数据目标划分
- 支持1个源端到一个目标端的复制
- 支持多个源端到一个目标端的聚合复制
- 支持一个源端到多个目标的拆分复制
-
按网络类型划分
- 支持网络内部数据传输
- 支持跨网络的数据传输(可使⽤ 链路压缩/链路限流 等功能)
-
按集群规模划分
- 可配置单⼀dtle实例处理单⼀数据通道
- 可配置 dtle集群 处理 多个数据通道
DTLE单项复制
- 下载DTLE的RPM安装包
https://github.com/actiontech/dtle/releases/tag/v3.21.03.0
- 安装RPM包
[root@study01 ~]# rpm -ivh dtle-3.21.03.0.x86_64.rpm --prefix=/data/dtle
- 修改consul配置文件,如果安装时不指定安装路径,默认是在/etc/dtle目录下面
[root@study01 dtle]# pwd
/data/dtle/etc/dtle
[root@study01 dtle]# cat consul.hcl
# Rename for each node
node_name = "consul0" #定义consul名称,多集群名字不能重复
data_dir = "/data/dtle/var/lib/consul"
ui = true
disable_update_check = true
# Address that should be bound to for internal cluster communications
bind_addr = "0.0.0.0"
# Address to which Consul will bind client interfaces, including the HTTP and DNS servers
client_addr = "10.186.65.68" #本机ip地址
advertise_addr = "10.186.65.68" #本机ip地址
ports = {
# Customize if necessary. -1 means disable.
#dns = -1
#server = 8300
#http = 8500
#serf_wan = -1
#serf_lan = 8301
}
limits = {
http_max_conns_per_client = 4096
}
server = true
# For single node
bootstrap_expect = 1 #一台consul
# For 3-node cluster
#bootstrap_expect = 3 #三台consul,部署集群开启
#retry_join = ["127.0.0.1", "127.0.0.2", "127.0.0.3"] # will use default serf port
log_level = "INFO"
log_file = "/data/dtle/var/log/consul/"
- 修改nomad配置文件,如果安装时不指定安装路径,默认是在/etc/dtle目录下面
root@study01 dtle]# ls
consul.hcl nomad.hcl
[root@study01 dtle]# pwd
/data/dtle/etc/dtle
[root@study01 dtle]# cat nomad.hcl
name = "nomad0" # rename for each node 定义nomad名字,部署集群名字不可重复
datacenter = "dc1"
data_dir = "/data/dtle/var/lib/nomad"
plugin_dir = "/data/dtle/usr/share/dtle/nomad-plugin"
log_level = "Info"
log_file = "/data/dtle/var/log/nomad/"
disable_update_check = true
bind_addr = "0.0.0.0" #监听地址
# change ports if multiple nodes run on a same machine
ports {
http = 4646
rpc = 4647
serf = 4648
}
addresses {
# Default to `bind_addr`. Or set individually here.
#http = "127.0.0.1"
#rpc = "127.0.0.1"
#serf = "127.0.0.1"
}
advertise {
http = "10.186.65.68:4646"
rpc = "10.186.65.68:4647"
serf = "10.186.65.68:4648"
}
server {
enabled = true #服务端开启
bootstrap_expect = 1
# Set bootstrap_expect to 3 for multiple (high-availablity) nodes.
# Multiple nomad nodes will join with consul.
}
client {
enabled = true
options = {
"driver.blacklist" = "docker,exec,java,mock,qemu,rawexec,rkt"
}
# Will auto join other server with consul.
}
consul {
# dtle-plugin and nomad itself use consul separately.
# nomad uses consul for server_auto_join and client_auto_join.
# Only one consul can be set here. Write the nearest here,
# e.g. the one runs on the same machine with the nomad server.
address = "10.186.65.68:8500" #客户端开启
}
plugin "dtle" {
config {
log_level = "Info" # repeat nomad log level here
data_dir = "/data/dtle/var/lib/nomad"
nats_bind = "10.186.65.68:8193"
nats_advertise = "10.186.65.68:8193"
# Repeat the consul address above.
consul = "10.186.65.68:8500" #对接consul地址
# By default, API compatibility layer is disabled.
#api_addr = "127.0.0.1:8190" # for compatibility API
nomad_addr = "10.186.65.68:4646" # compatibility API need to access a nomad server
publish_metrics = false
stats_collection_interval = 15
}
}
- 启动nomad以及consul服务
systemctl restart dtle-consul.service
systemctl restart dtle-consul.service
[root@study01 dtle]# ps -ef | grep dtle
dtle 8580 1 1 14:21 ? 00:00:00 /data/dtle/usr/bin/consul agent -config-file=/data/dtle/etc/dtle/consul.hcl
dtle 8660 1 10 14:22 ? 00:00:00 /data/dtle/usr/bin/nomad agent -config /data/dtle/etc/dtle/nomad.hcl
root 8720 2787 0 14:22 pts/0 00:00:00 grep --color=auto dtle
-
登录nomad和consul的Web页面进行查看
- nomad,Web界面地址:http://IP:4646
- consul,Web界面地址:http://IP:8500
-
在源端MySQL实例创建测试库和测试表,并在表中插入数据
mysql> create database test;
mysql> use test;
mysql> create table t1 (id int primary key);
mysql> insert into t1 values(1),(2),(3);
mysql> select * from t1;
- 创建job的json文件
[root@study01 dtle]# cat job.json
{
"Job": {
"ID": "job1", #job名字
"Datacenters": ["dc1"],
"TaskGroups": [{
"Name": "src",
"Tasks": [{
"Name": "src",
"Driver": "dtle",
"Config": {
"Gtid": "",
"ReplicateDoDb": [{
"TableSchema": "test", #复制的库
"Tables": [{
"TableName": "t1" #复制的表
}]
}],
"ConnectionConfig": {
"Host": "10.186.65.68", #源端连接信息
"Port": 3333,
"User": "test",
"Password": "test"
}
}
}]
}, {
"Name": "dest",
"Tasks": [{
"Name": "dest",
"Driver": "dtle",
"Config": {
"ConnectionConfig": {
"Host": "10.186.65.72", #目标端连接信息
"Port": 4444,
"User": "test",
"Password": "test"
}
}
}]
}]
}
}
- 利用curl命令调用nomad客户端的接口,创建这个job
[root@study01 dtle]# curl -XPOST "http://10.186.65.68:4646/v1/jobs" -d @job.json -s | jq
{
"EvalID": "a551d0fc-5de9-9e7b-3673-21b115919646",
"EvalCreateIndex": 151,
"JobModifyIndex": 150,
"Warnings": "",
"Index": 151,
"LastContact": 0,
"KnownLeader": false
}
- -d @指定的是job的json文件的路径,我是在当前路径下执行所以不需要指定绝对路径
- jq命令是用于提取返回结果的内容,类似与linux命令grep,需要安装才可以使用
- 查看创建job的状态
[root@study01 dtle]# curl -XGET "http://10.186.65.68:4646/v1/job/job1" -s | jq '.Status'
- /v1/job/{创建job的ID名字}
- 进入到目标端数据库进行查看数据是否已经复制过去
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| dtle | #有一张二进制的表。用于记录job的GTID号
| information_schema |
| mysql |
| performance_schema |
| sys |
| test | #同步过来的库
| universe |
+--------------------+
mysql> select * from test.t1;
+----+
| id |
+----+
| 1 |
| 2 |
| 3 |
+----+
- 在源端数据库在插入几条数据,在进行测试
mysql> use test;
mysql> insert into t1 values(7),(8),(9);
此时我们在到目标端数据库查看数据是否有增量数据进来
mysql> select * from test.t1;
+----+
| id |
+----+
| 1 |
| 2 |
| 3 |
| 7 |
| 8 |
| 9 |
+----+
HTTP API、nomad 命令⾏⼯具
nomad和consul之间的关系
nomad 本体使⽤consul进⾏多节点注册和发现,dtle nomad 插件使⽤consul进⾏任务元数据储存,也就是说,当我们的nomad agent创建一个job的时候会通过consul进行记录元数据,以及复制位置点,如果我们要删除一个job的时候,不单单只是把job给删除,还需要删除consul中的记录,如果不删除consul中的记录,那么下次创建job的时候默认还是会从consul中记录的位置点开始复制。
HTTP API
实际上我们使用curl命令调用的是nomad agent端的HTTP端的接口,将我们本地的job.json文件提交到nomad agent端
nomad命令行工具使用
nomad工具启动一个job使用的是hcl格式的文件,不可以使用json格式,模板默认放在/usr/share/dtle/scripts下面
- 先使用curl的方式删除job
[root@study01 dtle]# curl -XDELETE 10.186.65.68:4646/v1/job/job1?purge=true
{"EvalID":"c3e69b56-eaab-fa02-afbb-a15e0b395170","EvalCreateIndex":198,"JobModifyIndex":197,"VolumeEvalID":"","VolumeEvalIndex":0,"Index":198,"LastContact":0,"KnownLeader":false}
- 删除job后如果想清空consul记录使用以下命令
[root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"
true
-
使用nomad命令创建一个job
- 复制一份hcl的模板,进行修改
cp usr/share/dtle/scripts/example.job.hcl .
- 修改hcl文件
[root@study01 dtle]# mv example.job.hcl job.hcl [root@study01 dtle]# vim job.hcl #修改复制\源\目标信息
-
使用nomad命令指定这个文件创建job
[root@study01 dtle]# ./usr/bin/nomad job run job.hcl
==> Monitoring evaluation "21c79d55"
Evaluation triggered by job "job1"
Evaluation within deployment: "c5aeae1d"
Allocation "8640b0af" created: node "e1be240c", group "dest"
Allocation "e44c8d29" created: node "e1be240c", group "src"
Evaluation status changed: "pending" -> "complete"
==> Evaluation "21c79d55" finished with status "complete"
- 查询我们刚才创建job的状态
[root@study01 dtle]# ./usr/bin/nomad job status
ID Type Priority Status Submit Date
job1 service 50 running 2021-05-07T16:26:21+08:00
- 在源端插入数据,在目标端验证数据
- 停止job并删除
[root@study01 dtle]# ./usr/bin/nomad job stop -purge job1
==> Monitoring evaluation "dce8f4f3"
Evaluation triggered by job "job1"
Evaluation within deployment: "c5aeae1d"
Evaluation status changed: "pending" -> "complete"
==> Evaluation "dce8f4f3" finished with status "complete"
- 依然需要手动删除consul记录
[root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"
nomad其他命令介绍
- 查看server节点
[root@study01 dtle]# ./usr/bin/nomad server members
Name Address Port Status Leader Protocol Build Datacenter Region
nomad0.global 10.186.65.68 4648 alive true 2 0.11.1 dc1 global
- 查看client节点
[root@study01 dtle]# ./usr/bin/nomad node status
ID DC Name Class Drain Eligibility Status
e1be240c dc1 nomad0 <none> false eligible ready
- 查看某个job
[root@study01 dtle]# ./usr/bin/nomad job status {jobname}
- 查看nomad版本
[root@study01 dtle]# ./usr/bin/nomad version
Nomad v0.11.1 (b43457070037800fcc8442c8ff095ff4005dab33)
- 查看某⼀节点的dtle插件版本
[root@study01 dtle]# ./usr/bin/nomad node status -verbose e1be240c | grep dtle
dtle true true Healthy 2021-05-07T14:22:17+08:00
driver.dtle = 1
driver.dtle.full_version = 3.21.03.0-3.21.03.x-2df8ad7
driver.dtle.version = 3.21.03.0
- 此时nomad命令作为HTTP客⼾端连接nomad agent, 如果agent不在默认地址,则需要指定 例如:--address=http://127.0.0.1:4646
[root@study01 dtle]# ./usr/bin/nomad node status --address=http://10.186.65.68:4646
ID DC Name Class Drain Eligibility Status
e1be240c dc1 nomad0 <none> false eligible ready
MySQL汇聚复制
准备两个源端MySQL一个目标端MySQL,先记录GTID,再创建测试库和测试表
- src1数据库操作
mysql> show master status\G
Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-140
mysql> use test;
mysql> create table t2(id int);
mysql> insert into t2 values(1),(2),(3);
mysql> select * from test.t2;
+------+
| id |
+------+
| 1 |
| 2 |
| 3 |
+------+
- src2数据库操作
mysql> show master status \G
Executed_Gtid_Set: efc79686-af16-11eb-bbaa-02000aba4147:1-135
mysql> use test;
mysql> create table t2(id int);
mysql> insert into t2 values(4),(5),(6);
- 创建src1的配置文件
[root@study01 dtle]# cat job_src1.hcl
job "job1" {
datacenters = ["dc1"]
group "src" {
task "src" {
driver = "dtle"
config {
ReplicateDoDb = [{
TableSchema = "test"
Tables = [{
TableName = "t2"
}]
}]
DropTableIfExists = false
Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-140"
ChunkSize = 2000
ConnectionConfig = {
Host = "10.186.65.68"
Port = 3333
User = "test"
Password = "test"
}
}
}
}
group "dest" {
task "dest" {
driver = "dtle"
config {
ConnectionConfig = {
Host = "10.186.65.72"
Port = 4444
User = "test"
Password = "test"
}
# For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
#KafkaConfig = {
# Topic = "kafka1"
# Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
# Converter = "json"
#}
}
}
}
reschedule {
# By default, nomad will unlimitedly reschedule a failed task.
# We limit it to once per 30min here.
attempts = 1
interval = "30m"
unlimited = false
}
}
- 创建src2的配置文件
[root@study01 dtle]# cat job_src2.hcl
job "job2" {
datacenters = ["dc1"]
group "src" {
task "src" {
driver = "dtle"
config {
ReplicateDoDb = [{
TableSchema = "test"
Tables = [{
TableName = "t2"
}]
}]
DropTableIfExists = false
Gtid = "efc79686-af16-11eb-bbaa-02000aba4147:1-135"
ChunkSize = 2000
ConnectionConfig = {
Host = "10.186.65.71"
Port = 5555
User = "test"
Password = "test"
}
}
}
}
group "dest" {
task "dest" {
driver = "dtle"
config {
ConnectionConfig = {
Host = "10.186.65.72"
Port = 4444
User = "test"
Password = "test"
}
# For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
#KafkaConfig = {
# Topic = "kafka1"
# Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
# Converter = "json"
#}
}
}
}
reschedule {
# By default, nomad will unlimitedly reschedule a failed task.
# We limit it to once per 30min here.
attempts = 1
interval = "30m"
unlimited = false
}
}
- 两个文件不同的地方有,job名称,GTID,源端连接地址
- 创建job
[root@study01 dtle]# ./usr/bin/nomad job run job_src1.hcl
==> Monitoring evaluation "28ae70e4"
Evaluation triggered by job "job1"
Evaluation within deployment: "95f9a76b"
Allocation "d07787a4" created: node "e1be240c", group "dest"
Allocation "f46d9291" created: node "e1be240c", group "src"
Evaluation status changed: "pending" -> "complete"
==> Evaluation "28ae70e4" finished with status "complete"
[root@study01 dtle]# ./usr/bin/nomad job run job_src2.hcl
==> Monitoring evaluation "1d2da1f7"
Evaluation triggered by job "job2"
Evaluation within deployment: "716b11a5"
Allocation "2dd7fe5c" created: node "e1be240c", group "src"
Allocation "724ec296" created: node "e1be240c", group "dest"
Evaluation status changed: "pending" -> "complete"
==> Evaluation "1d2da1f7" finished with status "complete"
- 目标端数据库验证数据
mysql> select * from test.t2;
+------+
| id |
+------+
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
+------+
- 在任意一个源端写入数据,然后到目标端查看是否增量数据
mysql> insert into t2 values (7);
Query OK, 1 row affected (0.01 sec)
mysql> select * from test.t2;
+------+
| id |
+------+
| 1 |
| 2 |
| 3 |
| 7 |
+------+
4 rows in set (0.01 sec)
- 验证目标端数据库的数据
mysql> select * from test.t2;
+------+
| id |
+------+
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
+------+
7 rows in set (0.01 sec)
MySQL分散复制
准备三台MySQL实例,一台源端,两台目标端。根据条件,分别复制到不同的目标端数据库
- 首先记录下源端数据库当前的GTID
mysql> show master status \G
Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-143
- 创建测试表,写入一些数据
mysql> use test;
mysql> create table t3(id int);
mysql> insert into t3 values(1),(2);
- 创建dst1配置文件
[root@study01 dtle]# cat job_dst1.hcl
job "dst1" {
datacenters = ["dc1"]
group "src" {
task "src" {
driver = "dtle"
config {
ReplicateDoDb = [{
TableSchema = "test"
Tables = [{
TableName = "t3"
Where = "id<10"
}]
}]
DropTableIfExists = false
Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-143"
ChunkSize = 2000
ConnectionConfig = {
Host = "10.186.65.68"
Port = 3333
User = "test"
Password = "test"
}
}
}
}
group "dest" {
task "dest" {
driver = "dtle"
config {
ConnectionConfig = {
Host = "10.186.65.71"
Port = 5555
User = "test"
Password = "test"
}
# For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
#KafkaConfig = {
# Topic = "kafka1"
# Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
# Converter = "json"
#}
}
}
}
reschedule {
# By default, nomad will unlimitedly reschedule a failed task.
# We limit it to once per 30min here.
attempts = 1
interval = "30m"
unlimited = false
}
}
- 创建dst2配置文件
两个配置文件不同的地方有,job名字,where条件,以及目标端的连接信息
- 创建job
[root@study01 dtle]# ./usr/bin/nomad job run job_dst1.hcl
[root@study01 dtle]# ./usr/bin/nomad job run job_dst2.hcl
[root@study01 dtle]# ./usr/bin/nomad status
ID Type Priority Status Submit Date
dst1 service 50 running 2021-05-07T20:13:05+08:00
dst2 service 50 running 2021-05-07T20:13:10+08:00
-
验证数据,小于10的会在71这台数据库,大于等于10的会在72这台数据库
-
在源端插入数据,然后分别到目标端进行查看
mysql> insert into t3 values(9),(10);
mysql> select * from test.t3;
+------+
| id |
+------+
| 1 |
| 2 |
| 9 |
| 10 |
+------+
- 到71上查看数据
mysql> select * from test.t3;
+------+
| id |
+------+
| 1 |
| 2 |
| 9 |
+------+
- 到72上查看数据
mysql> select * from test.t3;
+------+
| id |
+------+
| 10 |
+------+