开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

开源 ETL 工具 DataX 实践,从mysql 到不同结构的另一个mysql的全量同步和批量更新

链接: datax官方项目地址 查看全量同步 查看批量更新

实践步骤:

参照官方文档,采用方法一部署

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

如果点击下载没反应,手动复制地址,把http换成https
开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

下载解压完成,运行自检脚本

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

File “datax.py”, line 114 print readerRef 。因为我电脑安装的是python3 ,脚本里是python2语法

修改下 datax.py 中 114行后面的print print xx 改为 print(xx)

try except Exception, e: 改为 try except Exception as e

修改完datax.py后再次运行

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

成功!

参考官方文档,编写一个从一个mysql到另一个mysql的全量同步
先准备数据库

1、建两张表 datax_src和datax_target 表 ,就两个字段,其中第二个字段名称不一样 ,案列演示 从datax_src读取数据,写入到datax_target中

-- datax_src 表  字段 id、src_name
CREATE TABLE `datax_src` (
  `id` bigint NOT NULL,
  `src_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- datax_target 表  字段 id、target_name
CREATE TABLE `datax_target` (
  `id` bigint NOT NULL,
  `target_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

2、创建一个函数,向datax_src插入5000条数据

CREATE DEFINER=`root`@`%` FUNCTION `AUTO_INSERT`() RETURNS int
    DETERMINISTIC
BEGIN
	DECLARE index_num int DEFAULT 0;
		WHILE index_num < 5000 
			DO
				SET index_num = index_num + 1;
				INSERT INTO datax_src VALUES (index_num,CONCAT('name',index_num));
		END WHILE;
	RETURN 0;
END
编写job json文件

按照 文档中的 mysqlreader 、mysqlwriter 的示例修改

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

1、修改mysqlreader的示例 从datax_src同步抽取数据到本地:
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "src_name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "datax_src"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://127.0.0.1:3307/datax"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

将json文件放在job文件夹中,运行

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

抛异常了:

DataX无法连接对应的数据库,可能原因是:1) 配置的ip/port/database/jdbc错误,无法连接。2) 配置的username/password错误,鉴权失败。请和DBA确认该数据库的连接信息是否正确

可是密码明明是正确的。然后想着修改json文件换了一个数据库,再执行,成功了!
发现区别就是mysql的版本不一样 mysql5.0.27的执行成功了, 失败这个库是mysql8.0.22。
所以看了下datax目录结构,发现连接器位置如下图。 找了下连接器jar包,把reader和writer文件夹中的mysql-connector5换成了8(这里只截图了reader)

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

再次执行json文件job:成功执行

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

2、修改mysqlwriter的示例 ,结合mysqlreader ,把读写合并,
完整的 json文件如下 ,具体参数的含义。官方文档中有详细介绍
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "src_name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "datax_src"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://127.0.0.1:3307/datax"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "target_name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from target_name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "datax_target"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

执行!成功!!

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

查看 datax_target 表: 数据已成功写入

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

以上是全表数据同步

参考官方文档,再编写一个从一个mysql到另一个mysql的批量更新

下面通过自定义querySql实现一下部分更新的操作

1、更新datax_src表的部分数据,更新id小于10的9条数据

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

datax 的 job 的 json文件如下: 加了一个 querySql 参数,并把 writer 中的 writeMode 换成 update

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "src_name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
								 "querySql": [
                                    "select id,src_name from datax_src where id < 10;"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://127.0.0.1:3307/datax"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "update",
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "target_name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from datax_target"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "datax_target"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

执行,成功

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

查看下datax_target中的数据:也已被成功更新

开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新

上一篇:netcore Datax Web项目 docker打包


下一篇:datax(21):编写自己的Transformer