一. DataX3.0概览
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
此前已经开源DataX1.0版本,此次介绍为阿里巴巴开源全新版本DataX3.0,有了更多更强大的功能和更好的使用体验。Github主页地址:https://github.com/alibaba/DataX。
二、DataX3.0框架设计
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
- Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
三. DataX3.0插件体系
经过几年积累,DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:
类型 |
数据源 |
Reader(读) |
Writer(写) |
RDBMS 关系型数据库 |
Mysql |
√ |
√ |
|
Oracle |
√ |
√ |
|
SqlServer |
√ |
√ |
|
Postgresql |
√ |
√ |
|
达梦 |
√ |
√ |
阿里云数仓数据存储 |
ODPS |
√ |
√ |
|
ADS |
|
√ |
|
OSS |
√ |
√ |
|
OCS |
√ |
√ |
NoSQL数据存储 |
OTS |
√ |
√ |
|
Hbase0.94 |
√ |
√ |
|
Hbase1.1 |
√ |
√ |
|
MongoDB |
√ |
√ |
无结构化数据存储 |
TxtFile |
√ |
√ |
|
FTP |
√ |
√ |
|
HDFS |
√ |
√ |
DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。详情请看:DataX数据源指南
四、DataX3.0核心架构
DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
核心模块介绍:
- DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
- DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
- 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
- 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
- DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
- DataXJob根据分库分表切分成了100个Task。
- 根据20个并发,DataX计算共需要分配4个TaskGroup。
- 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
五、DataX安装:
(1)、下载DataX源码:
[root@iZ2zeh44pi6rlahxj7s9azZ ~]# git clone git@github.com:alibaba/DataX.git
(2)、通过maven打包:
[root@iZ2zeh44pi6rlahxj7s9azZ ~]# cd {DataX_source_code_home}
[root@iZ2zeh44pi6rlahxj7s9azZ ~]# yum install -y maven
[root@iZ2zeh44pi6rlahxj7s9azZ ~]# mvn -U clean package assembly:assembly -Dmaven.test.skip=true
打包成功,日志显示如下:
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 08:12 min
[INFO] Finished at: 2015-12-13T16:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------
打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下:
[root@iZ2zeh44pi6rlahxj7s9azZ ~]# cd {DataX_source_code_home}
[root@iZ2zeh44pi6rlahxj7s9azZ ~]# ls ./target/datax/datax/
bin conf job lib log log_perf
六、mysql数据传输到oracle
(1)、生成mysql到oracle的配置文件:
[root@iZ2zeh44pi6rlahxj7s9azZ bin]# python datax.py -r mysqlreader -w oraclewriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.
Please refer to the mysqlreader document:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Please refer to the oraclewriter document:
https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": "",
"table": []
}
],
"password": "",
"preSql": [],
"username": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
(2)、传输文件配置
[root@iZ2zeh44pi6rlahxj7s9azZ bin]# vim ../job/myor.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id","qiye","diqu"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://[HOST_NAME]:PORT/[DATABASE_NAME]"],
"table": ["***"]
}
],
"password": "***",
"username": "***",
"where": ""
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"column": ["id","qiye","diqu"],
"connection": [
{
"jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]",
"table": ["***"]
}
],
"password": "***",
"preSql": ["delete from ceshi"],
"username": "***"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
(3)、执行传输过程
[root@iZ2zeh44pi6rlahxj7s9azZ bin]# python datax.py ../job/myor.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.
2017-09-25 20:09:01.200 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2017-09-25 20:09:01.215 [main] INFO Engine - the machine info =>
osInfo: Oracle Corporation 1.8 25.144-b01
jvmInfo: Linux amd64 3.10.0-514.6.2.el7.x86_64
cpu num: 1
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
.
.
.
2017-09-25 20:19:12.557 [job-0] INFO StandAloneJobContainerCommunicator - Total 1419776 records, 31164761 bytes | Speed 54.10KB/s, 2457 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 597.972s | All Task WaitReaderTime 1.983s | Percentage 0.00%
2017-09-25 20:19:32.558 [job-0] INFO StandAloneJobContainerCommunicator - Total 1464832 records, 32195809 bytes | Speed 50.34KB/s, 2252 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 617.847s | All Task WaitReaderTime 2.031s | Percentage 0.00%
2017-09-25 20:19:42.559 [job-0] INFO StandAloneJobContainerCommunicator - Total 1491456 records, 32744862 bytes | Speed 53.62KB/s, 2662 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 628.044s | All Task WaitReaderTime 2.054s | Percentage 0.00%
2017-09-25 20:19:52.561 [job-0] INFO StandAloneJobContainerCommunicator - Total 1516032 records, 33271617 bytes | Speed 51.44KB/s, 2457 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 637.787s | All Task WaitReaderTime 2.076s | Percentage 0.00%
(4)、登录oracle验证传输是否成功
未传输时:
[oracle@iz2zec57gfl6i9vbtdksl1z ~]$ sqlplus ***/***
SQL*Plus: Release 11.2.0.4.0 Production on Mon Sep 25 20:11:44 2017
Copyright (c) 1982, 2013, Oracle. All rights reserved.
Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
With the Partitioning, Oracle Label Security, OLAP, Data Mining,
Oracle Database Vault and Real Application Testing options
SQL> select * from ceshi;
no rows selected
传输后:
[oracle@iz2zec57gfl6i9vbtdksl1z ~]$ sqlplus ***/***
SQL*Plus: Release 11.2.0.4.0 Production on Mon Sep 25 20:15:28 2017
Copyright (c) 1982, 2013, Oracle. All rights reserved.
Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
With the Partitioning, Oracle Label Security, OLAP, Data Mining,
Oracle Database Vault and Real Application Testing options
SQL> select count(*) from ceshi;
COUNT(*)
----------
917504