DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能
github地址 https://github.com/alibaba/DataX
在github上找到 Quick Start 部分
https://github.com/alibaba/DataX/blob/master/userGuid.md
1.安装
JDK 1.8.0_151
Python 2.7.18
下载后解压
2.启动
进入bin目录
python datax.py {YOUR_JOB.json}
3.简单输出
(1)创建作业的配置文件(json格式)
stream.json
{ "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "sliceRecordCount": 2, "column": [ { "type": "long", "value": "2" }, { "type": "string", "value": "hello,world-DataX" } ] } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 3 } } } }
(2)启动DataX
python datax.py ./stream.json
输出
…… 2021-02-04 16:58:42.891 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started 2 hello,world-DataX 2 hello,world-DataX 2021-02-04 16:58:42.893 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[2] attemptCount[1] is started 2 hello,world-DataX 2 hello,world-DataX 2 hello,world-DataX 2 hello,world-DataX 2021-02-04 16:58:42.993 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[102]ms 2021-02-04 16:58:42.993 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[1] is successed, used[105]ms 2021-02-04 16:58:42.993 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[2] is successed, used[101]ms 2021-02-04 16:58:42.994 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks. 2021-02-04 16:58:52.887 [job-0] INFO StandAloneJobContainerCommunicator - Total 6 records, 108 bytes | Speed 10B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00% 2021-02-04 16:58:52.888 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks. 2021-02-04 16:58:52.889 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work. 2021-02-04 16:58:52.889 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do post work. 2021-02-04 16:58:52.889 [job-0] INFO JobContainer - DataX jobId [0] completed successfully. 2021-02-04 16:58:52.891 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: D:\data\datax\hook 2021-02-04 16:58:52.893 [job-0] INFO JobContainer - [total cpu info] => averageCpu | maxDeltaCpu | minDeltaCpu -1.00% | -1.00% | -1.00% [total gc info] => NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s 2021-02-04 16:58:52.894 [job-0] INFO JobContainer - PerfTrace not enable! 2021-02-04 16:58:52.894 [job-0] INFO StandAloneJobContainerCommunicator - Total 6 records, 108 bytes | Speed 10B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00% 2021-02-04 16:58:52.896 [job-0] INFO JobContainer - 任务启动时刻 : 2021-02-04 16:58:42 任务结束时刻 : 2021-02-04 16:58:52 任务总计耗时 : 10s 任务平均流量 : 10B/s 记录写入速度 : 0rec/s 读出记录总数 : 6 读写失败总数 : 0
4.获取mysql数据本地打印
mysqltest.json
{ "job": { "setting": { "speed": { "channel":1 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "connection": [ { "querySql": [ "select id,name from sys_user limit 3;" ], "jdbcUrl": [ "jdbc:mysql://localhost:3306/dream" ] } ] } }, "writer": { "name": "streamwriter", "parameter": { "print": true, "encoding": "UTF-8" } } } ] } }
输出
…… 2021-02-04 17:00:24.237 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select id,name from sys_user limit 3; ] jdbcUrl:[jdbc:mysql://localhost:3306/dream?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]. 33 abc 1 admin 25 caocao 2021-02-04 17:00:24.291 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[102]ms 2021-02-04 17:00:24.291 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks. 2021-02-04 17:00:34.187 [job-0] INFO StandAloneJobContainerCommunicator - Total 3 records, 19 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00% 2021-02-04 17:00:34.188 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks. 2021-02-04 17:00:34.189 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work. 2021-02-04 17:00:34.189 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work. 2021-02-04 17:00:34.189 [job-0] INFO JobContainer - DataX jobId [0] completed successfully. 2021-02-04 17:00:34.191 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: D:\data\datax\hook 2021-02-04 17:00:34.194 [job-0] INFO JobContainer - [total cpu info] => averageCpu | maxDeltaCpu | minDeltaCpu -1.00% | -1.00% | -1.00% [total gc info] => NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s 2021-02-04 17:00:34.194 [job-0] INFO JobContainer - PerfTrace not enable! 2021-02-04 17:00:34.195 [job-0] INFO StandAloneJobContainerCommunicator - Total 3 records, 19 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00% 2021-02-04 17:00:34.196 [job-0] INFO JobContainer - 任务启动时刻 : 2021-02-04 17:00:23 任务结束时刻 : 2021-02-04 17:00:34 任务总计耗时 : 10s 任务平均流量 : 1B/s 记录写入速度 : 0rec/s 读出记录总数 : 3 读写失败总数 : 0
5.mysql 不同库表数据迁移
migration.json
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": ["id","name"], "connection": [ { "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/dream" ], "table": ["sys_user"] } ], "password": "root", "username": "root", "where": "id>29" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": ["id","name"], "connection": [ { "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/dream1", "table": ["user"] } ], "password": "root", "username": "root", "preSql": [], "session": [], "writeMode": "insert" } } } ], "setting": { "speed": { "channel": "1" } } } }
输出
…… 2021-02-04 17:10:17.061 [job-0] INFO StandAloneJobContainerCommunicator - Total 4 records, 30 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00% 2021-02-04 17:10:17.061 [job-0] INFO JobContainer - 任务启动时刻 : 2021-02-04 17:10:06 任务结束时刻 : 2021-02-04 17:10:17 任务总计耗时 : 10s 任务平均流量 : 3B/s 记录写入速度 : 0rec/s 读出记录总数 : 4 读写失败总数 : 0
查看 dream1的 user数据已经插入
其他参数可以查看github上mysqlreader和mysqlwriter下的doc