DataX工具的使用

一、DataX框架

1、Datax3.0设计框架

DataX工具的使用

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

2、DataX3.0核心架构

DataX工具的使用

核心模块介绍:

1)DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

2)DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

3)切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

4)每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

5)DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

3、DataX脚本初试

3.1 Reader

1)MySQL Reader

方法一:简单模式,直接写需要查询的字段

"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "Username",
        "password": "Password",
        "column": ["id", "name"],        //查询表字段
        "splitPk": "id",                        //切分键
        "connection": [{
            "table": ["tableName"],                //表名
            "jdbcUrl": ["jdbc:mysql://172.16.4.37:3306/db1"]    //jdbcurl连接串
        }]
}
}

方法二:querySQL模式,可自定义编写SQL内容

"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "Username",
        "password": "Password",
        "connection": [{
            "querySql": ["select id,k,c,pad from sbtest1 where all_query_conditions"],    //查询语句,此处可写复杂查询
            "jdbcUrl": ["jdbc:mysql://172.16.4.37:3306/db2"]    //jdbcurl连接串
        }]
    }
}

2)odpsreader

"reader": {
    "name": "odpsreader",
    "parameter": {
        "accessId": "accessId",
        "accessKey": "accessKey",
        "project": "targetProjectName",  //odps项目名
        "table": "tableName",        //odps表名
        "partition": [                 //分区信息
        "all_query_conditions"],
        "column": [                     //查询表字段
        "customer_id", "nickname"],
        "packageAuthorizedProject": "yourCurrentProjectName",
        "splitMode": "record",
        "odpsServer": "http://xxx/api" //odps的endpoint
}
}

3.2 Writer

1)MySQL writer

"writer": {
    "name": "mysqlwriter",
    "parameter": {
        "writeMode": "insert",    //数据写入方式
        "username": "Username",
        "password": "Password",
        "column": ["id", "k", "c", "pad"],    //写入字段
        "session": ["set session innodb_lock_wait_timeout=1000"],    //session操作
        "preSql": ["delete from tableName "],        //前置操作SQL
        "connection": [{
            "jdbcUrl": "jdbc:mysql://172.16.4.37:3306/db2",
            "table": ["tableName"]
        }]
    }
}

2)odpswriter

"writer": {
    "name": "odpswriter",
    "parameter": {
        "accessId": "accessId",
        "accessKey": "accessKey",
        "accountType": "aliyun",
        "column": ["id", "k", "c", "pad"],
        "odpsServer": "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", //endpoint
        "partition": "all_insert_conditions",        //分区条件
        "project": "ProjectName",
        "table": "tableName",
        "truncate": true        //在操作前是否先进行分区删除
    }
}

3.3 Setting

"setting": {
    "errorLimit": {
        "record": ""    //允许脏数据记录数
    },
    "speed": {
        "concurrent": 2,    //最大并发数
        "throttle": true,    //是否限速
        "mbps": 1        //具体限速大小
    }
}

3.4 如何通过脚本实现每日自动化调度

1、数据脚本demo

将原开发脚本中all_query_conditions作为查询条件作为每日替换目标,将all_insert_conditions作为每日写入分区的条件作为每日替换,可查考上述给出的一些模版

2、编写具体数据传输条件的数据集成调度脚本(以myql2odps为例)

脚本查询、写入分区条件定制

#vim imp_data.sh
#根据$1识别不同的数据集成任务名,根据$2识别分区,若有多个分区可继续使用$3、$4。使用一个调度脚本满足多个数据集成任务的数据传输条件
interface_name=$1
interface_name_pt=${interface_name}$2$3$4    #为不同的数据集成脚本任务编写demo模版

#拷贝脚本demo为具体的具体的执行脚本
cp /home/admin/scripts/datax-${interface_name}-demo.json /home/admin/config/datax-${interface_name_pt}-dev.json

#查询条件、写入表分区条件判断
in_conditions=''
if [ ${interface_name} == 'mysqlqury2odps1' ]; then      //根据数据集成任务名$1判断reader条件
    check_time=$2' '$3$4
    check_unixtime=`date -d "${check_time}" +%s`
    in_conditions='pt='$2' and _timestamp='${check_unixtime}
elif [ ${interface_name} == 'mysqlqury2odps2' ]; then
    in_conditions='pt='$2' and hh='$3
fi
echo $in_conditions

out_conditions='ds='$2',hh='$3',mm='$4
if [ ! -n "$4" ]; then
    out_conditions='ds='$2',hh='$3',mm=00'
fi
echo $out_conditions

#参数替换-查询条件
sed -i "s/all_query_conditions/$in_conditions/g" /home/admin/config/datax-${interface_name_pt}-dev.json

#参数替换-分区条件
sed -i "s/all_insert_conditions/$out_conditions/g" /home/admin/config/datax-${interface_name_pt}-dev.json

###命令执行
/opt/datax/bin/datax.py /home/admin/config/datax-${interface_name_pt}-dev.json
#rm /home/admin/gaode_data/config/datax-${interface_name_pthh}-dev.json

3、任务执行

bash imp_data.sh ${interface_name} ${pt} ${hh} ${ss}

4、实现每日自动定时调度

1)若仅是用ECS做数据同步,配置contab进行调度

2)若该ECS作为DataWorks作为自定义资源组进行调度,那可将任务执行命令写在shell脚本中,每个$1配置为一个shell任务,并为$2$3$4配置自定义参数即可实现每日定时调度

文章参考:

https://github.com/alibaba/DataX/blob/master/introduction.md

上一篇:MySQL如何对order by优化?


下一篇:JAVA 几种引用类型学习