开源 ETL 工具 DataX 实践,从mysql 到不同结构的另一个mysql的全量同步和批量更新
链接: datax官方项目地址 查看全量同步 查看批量更新
实践步骤:
参照官方文档,采用方法一部署
如果点击下载没反应,手动复制地址,把http换成https
下载解压完成,运行自检脚本
File “datax.py”, line 114 print readerRef 。因为我电脑安装的是python3 ,脚本里是python2语法
修改下 datax.py 中 114行后面的print print xx 改为 print(xx)
try except Exception, e: 改为 try except Exception as e
修改完datax.py后再次运行
成功!
参考官方文档,编写一个从一个mysql到另一个mysql的全量同步
先准备数据库
1、建两张表 datax_src和datax_target 表 ,就两个字段,其中第二个字段名称不一样 ,案列演示 从datax_src读取数据,写入到datax_target中
-- datax_src 表 字段 id、src_name
CREATE TABLE `datax_src` (
`id` bigint NOT NULL,
`src_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- datax_target 表 字段 id、target_name
CREATE TABLE `datax_target` (
`id` bigint NOT NULL,
`target_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
2、创建一个函数,向datax_src插入5000条数据
CREATE DEFINER=`root`@`%` FUNCTION `AUTO_INSERT`() RETURNS int
DETERMINISTIC
BEGIN
DECLARE index_num int DEFAULT 0;
WHILE index_num < 5000
DO
SET index_num = index_num + 1;
INSERT INTO datax_src VALUES (index_num,CONCAT('name',index_num));
END WHILE;
RETURN 0;
END
编写job json文件
按照 文档中的 mysqlreader 、mysqlwriter 的示例修改
1、修改mysqlreader的示例 从datax_src同步抽取数据到本地:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1131310577",
"column": [
"id",
"src_name"
],
"splitPk": "id",
"connection": [
{
"table": [
"datax_src"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3307/datax"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print":true
}
}
}
]
}
}
将json文件放在job文件夹中,运行
抛异常了:
DataX无法连接对应的数据库,可能原因是:1) 配置的ip/port/database/jdbc错误,无法连接。2) 配置的username/password错误,鉴权失败。请和DBA确认该数据库的连接信息是否正确
可是密码明明是正确的。然后想着修改json文件换了一个数据库,再执行,成功了!
发现区别就是mysql的版本不一样 mysql5.0.27的执行成功了, 失败这个库是mysql8.0.22。
所以看了下datax目录结构,发现连接器位置如下图。 找了下连接器jar包,把reader和writer文件夹中的mysql-connector5换成了8(这里只截图了reader)
再次执行json文件job:成功执行
2、修改mysqlwriter的示例 ,结合mysqlreader ,把读写合并,
完整的 json文件如下 ,具体参数的含义。官方文档中有详细介绍
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1131310577",
"column": [
"id",
"src_name"
],
"splitPk": "id",
"connection": [
{
"table": [
"datax_src"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3307/datax"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "1131310577",
"column": [
"id",
"target_name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from target_name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
"table": [
"datax_target"
]
}
]
}
}
}
]
}
}
执行!成功!!
查看 datax_target 表: 数据已成功写入
以上是全表数据同步
参考官方文档,再编写一个从一个mysql到另一个mysql的批量更新
下面通过自定义querySql实现一下部分更新的操作
1、更新datax_src表的部分数据,更新id小于10的9条数据
datax 的 job 的 json文件如下: 加了一个 querySql 参数,并把 writer 中的 writeMode 换成 update
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1131310577",
"column": [
"id",
"src_name"
],
"splitPk": "id",
"connection": [
{
"querySql": [
"select id,src_name from datax_src where id < 10;"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3307/datax"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "update",
"username": "root",
"password": "1131310577",
"column": [
"id",
"target_name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from datax_target"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
"table": [
"datax_target"
]
}
]
}
}
}
]
}
}
执行,成功
查看下datax_target中的数据:也已被成功更新