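I recently needed to use this tool (RTSync) in a project to synchronize incremental data from Oracle to GBase 8a MPP.
Deployment environment
Following the requirements in the manual, I used three servers to deploy ZooKeeper and Kafka, with IP addresses 192.168.2.201, 192.168.2.202, and 192.168.2.203. RTSync can share a server with ZooKeeper and Kafka, so I deployed it on 201.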
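Environment and tool installation
- Install ZooKeeper on all three servers; plenty of tutorials are available online.
- Install Kafka on all three servers; again, online tutorials can be followed.
- Install RTSync on 201 under /opt/, in the directory /opt/RTSync.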
Tool configuration
- First, configure the ZooKeeper and Kafka instances used by RTSync. As described in the manual, modify these three properties in config_kafka_ora.properties:
bootstrap.servers=192.168.2.201:9092,192.168.2.202:9092,192.168.2.203:9092
zookeeper.connect=192.168.2.201:2181,192.168.2.202:2181,192.168.2.203:2181
topic.name=test01
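Each of these properties is a plain comma-separated list of host:port pairs with no spaces. As a minimal sketch, assuming the three servers above with Kafka on 9092 and ZooKeeper on 2181, the lines can be generated from one host list, and the same broker string reused in the GBase 8a MPP create kafka consumer statement used later. The function names are hypothetical helpers, not part of RTSync.

```python
# Hypothetical helper (not part of RTSync): generate the three
# config_kafka_ora.properties lines and the matching GBase 8a MPP
# "create kafka consumer" statement from a single host list.
# Assumes Kafka listens on 9092 and ZooKeeper on 2181 on every host.

HOSTS = ["192.168.2.201", "192.168.2.202", "192.168.2.203"]
KAFKA_PORT = 9092
ZK_PORT = 2181


def join_endpoints(hosts, port):
    """Return 'host:port' pairs joined by commas, with no spaces."""
    return ",".join(f"{host}:{port}" for host in hosts)


def build_properties(hosts, topic):
    """Return the three properties as key=value lines."""
    props = {
        "bootstrap.servers": join_endpoints(hosts, KAFKA_PORT),
        "zookeeper.connect": join_endpoints(hosts, ZK_PORT),
        "topic.name": topic,
    }
    return "\n".join(f"{key}={value}" for key, value in props.items())


def build_consumer_sql(consumer, topic, hosts):
    """Return the GBase 8a MPP statement that consumes `topic` from the same brokers."""
    return (
        f"create kafka consumer {consumer} transaction topic {topic} "
        f"brokers '{join_endpoints(hosts, KAFKA_PORT)}';"
    )


if __name__ == "__main__":
    print(build_properties(HOSTS, "test01"))
    print(build_consumer_sql("test", "test01", HOSTS))
```

Deriving both outputs from one list keeps the consumer's broker list from drifting away from bootstrap.servers when a server is added or replaced.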
- Then configure RTSync's configuration file, config_task.xml:
<server syncMode = "increment" dataFormatType = "PUREDATA" id = "server1" mqType = "kafka" queueName = "ora" isHighAvailable = "false" dataRecoveryMode = "file" >
<manager ip = "192.168.2.201" port = "9432" heartbeatPort = "9000" httpPort = "8080" isTableHotPatch = "true" />
<source ip = "192.168.2.201" path = "/opt/RTSync" readParseAdapter = "adapter" user = "root" password = " *** " queueSize = "10000" openMonitor = "true" monitorInterval = "300" rpcPort = "9191" sshPort = "22" dbObjToUpperCase = "false" isConvertSingleQuote = "true" queuePollTimeOut = "600" isEmptyStrPkEqualsNull = "true" isAllowInsertPkNull = "true" />
<target ip = "192.168.2.201" path = "/opt/RTSync" writeDataAdapter = "adapter" user = "root" password = " *** " errorishandle = "true" sendDataBySocket = "true" />
<mappings>
<source-target id = "sync1" >
<db>
<sourcedb
charset = "UTF8"
type = "ORACLE"
startLSN = "0"
fetchSize = "500"
oracleScnStep = "50000"
timestampWithFraction = "false"
maxRecordsPerRead = "200"
maxSizeOfPerRecord = "1024"
timeOut = "2"
driver = "oracle.jdbc.OracleDriver"
url = "jdbc:oracle:thin:test/test@//192.168.2.108:1521/orcl"
user = "TEST"
password = " *** "
catalog = "test"
parallel = "3" >
</sourcedb>
<targetdb
charset = "UTF8"
type = "GCLUSTER"
commitSize = "10000"
queueSize = "200000"
user = "gbase"
password = " *** "
driver = "com.gbase.jdbc.Driver"
catalog = "test"
timeOut = "2"
url = "jdbc:gbase://192.168.2.105:5258/test?useOldAliasMetadataBehavior=true&amp;rewriteBatchedStatements=true" >
</targetdb>
<tables isInclude = "true" >
<table deleteMode = "NORMAL" sourceTableName = "T1" sourcePkColName = "" targetTableName = "t1" targetPkColName = "" />
</tables>
</db>
</source-target>
</mappings>
</server>
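For testing, I created a test user named test and a table t1 in its schema. Note that Oracle stores table names in uppercase by default, so the table name must also be written in uppercase in this config file (sourceTableName = "T1").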
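Two details in config_task.xml are easy to get wrong: the file must be well-formed XML (a literal & inside the GBase JDBC URL has to be written as &amp;), and, as noted above, sourceTableName must be uppercase. A minimal sketch using Python's standard xml.etree.ElementTree can check both before RTSync is started. The function names and the script name check_config.py are hypothetical, and the uppercase rule assumes the Oracle tables were created with unquoted names.

```python
import sys
import xml.etree.ElementTree as ET


def is_well_formed(xml_text):
    """Return True if xml_text parses as XML (a bare '&' in an attribute does not)."""
    try:
        ET.fromstring(xml_text)
        return True
    except ET.ParseError:
        return False


def lowercase_source_tables(xml_text):
    """Return sourceTableName values that contain lowercase letters.

    Assumption: the Oracle tables were created with unquoted names, so Oracle
    stores them in uppercase and the config must use uppercase as well.
    """
    root = ET.fromstring(xml_text)
    # iter("table") matches <table> elements only, not <tables>.
    names = (table.get("sourceTableName", "") for table in root.iter("table"))
    return [name for name in names if name != name.upper()]


if __name__ == "__main__":
    # Usage (hypothetical): python check_config.py /path/to/config_task.xml
    with open(sys.argv[1], "rb") as f:  # bytes, so an XML encoding declaration is honored
        text = f.read()
    if not is_well_formed(text):
        sys.exit("config_task.xml is not well-formed XML")
    for name in lowercase_source_tables(text):
        print(f"sourceTableName {name!r} should probably be {name.upper()!r}")
```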
Starting synchronization
- Start ZooKeeper and Kafka.
- Start RTSync by running sh /opt/RTSync/RTSyncManagerServer.sh start.
- Create a Kafka consumer in GBase 8a MPP to consume the data in Kafka (test01 is the topic.name set in config_kafka_ora.properties). Run the following statement in 8a MPP:
create kafka consumer test transaction topic test01 brokers '192.168.2.201:9092,192.168.2.202:9092,192.168.2.203:9092';
- Start the Kafka consumer in GBase 8a MPP by running:
start kafka consumer test;
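Before starting RTSync and the consumer, it can help to confirm that ZooKeeper and Kafka are actually listening. The sketch below uses only Python's standard library and simply attempts a TCP connection to each host:port, assuming ZooKeeper on 2181 and Kafka on 9092 on the three servers, as in config_kafka_ora.properties. An open port only shows that something accepts connections; it does not prove the service is healthy.

```python
import socket

# Assumed endpoints: ZooKeeper on 2181 and Kafka on 9092 on each server.
ENDPOINTS = [
    (host, port)
    for host in ("192.168.2.201", "192.168.2.202", "192.168.2.203")
    for port in (2181, 9092)
]


def is_listening(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


def report(endpoints):
    """Map 'host:port' to True/False for each endpoint."""
    return {f"{host}:{port}": is_listening(host, port) for host, port in endpoints}


if __name__ == "__main__":
    for endpoint, ok in report(ENDPOINTS).items():
        print(endpoint, "OK" if ok else "NOT REACHABLE")
```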
Testing synchronization
With the sync tool now running, insert a row in Oracle to test it:
insert into t1 values(1, 'asdf');
commit;
Then run a query in GBase 8a MPP:
select * from t1;
Result:
+------+------+
| col1 | col2 |
+------+------+
|    1 | asdf |
+------+------+
The data has been synchronized.
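The manual check above can also be scripted, which is handy when repeating the test or getting a rough idea of replication lag. A minimal Python sketch: wait_for_sync polls a query function until the expected row appears or a timeout expires. query_target is a hypothetical placeholder; how you connect to GBase 8a MPP from Python (driver, credentials) is not covered here and is left as an assumption, as is the assumption that rows come back as tuples.

```python
import time


def wait_for_sync(query_target, expected_row, timeout=60.0, interval=1.0):
    """Poll until expected_row is returned by query_target; return seconds waited.

    query_target: hypothetical zero-argument callable that runs
    "select * from t1" on GBase 8a MPP and returns a list of row tuples.
    Raises TimeoutError if the row has not appeared within `timeout` seconds.
    """
    start = time.monotonic()
    while True:
        if expected_row in query_target():
            return time.monotonic() - start
        if time.monotonic() - start >= timeout:
            raise TimeoutError(f"{expected_row!r} not found after {timeout} s")
        time.sleep(interval)


if __name__ == "__main__":
    # Stand-in for a real GBase query: the row "arrives" on the third poll.
    polls = []

    def fake_query():
        polls.append(1)
        return [(1, "asdf")] if len(polls) >= 3 else []

    waited = wait_for_sync(fake_query, (1, "asdf"), timeout=5, interval=0.1)
    print(f"row visible after {len(polls)} polls ({waited:.1f} s)")
```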
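Summary
Overall, setting up the environment is fairly simple; most of the work is finding installation tutorials for ZooKeeper and Kafka online. The more tedious part is that RTSync has a large number of configuration options, but fortunately the manual covers them.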