一、DataX框架
1、Datax3.0设计框架
- Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
2、DataX3.0核心架构
核心模块介绍:
1)DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
2)DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
3)切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
4)每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
5)DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
3、DataX脚本初试
3.1 Reader
1)MySQL Reader
方法一:简单模式,直接写需要查询的字段
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "Username",
"password": "Password",
"column": ["id", "name"], //查询表字段
"splitPk": "id", //切分键
"connection": [{
"table": ["tableName"], //表名
"jdbcUrl": ["jdbc:mysql://172.16.4.37:3306/db1"] //jdbcurl连接串
}]
}
}
方法二:querySQL模式,可自定义编写SQL内容
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "Username",
"password": "Password",
"connection": [{
"querySql": ["select id,k,c,pad from sbtest1 where all_query_conditions"], //查询语句,此处可写复杂查询
"jdbcUrl": ["jdbc:mysql://172.16.4.37:3306/db2"] //jdbcurl连接串
}]
}
}
2)odpsreader
"reader": {
"name": "odpsreader",
"parameter": {
"accessId": "accessId",
"accessKey": "accessKey",
"project": "targetProjectName", //odps项目名
"table": "tableName", //odps表名
"partition": [ //分区信息
"all_query_conditions"],
"column": [ //查询表字段
"customer_id", "nickname"],
"packageAuthorizedProject": "yourCurrentProjectName",
"splitMode": "record",
"odpsServer": "http://xxx/api" //odps的endpoint
}
}
3.2 Writer
1)MySQL writer
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert", //数据写入方式
"username": "Username",
"password": "Password",
"column": ["id", "k", "c", "pad"], //写入字段
"session": ["set session innodb_lock_wait_timeout=1000"], //session操作
"preSql": ["delete from tableName "], //前置操作SQL
"connection": [{
"jdbcUrl": "jdbc:mysql://172.16.4.37:3306/db2",
"table": ["tableName"]
}]
}
}
2)odpswriter
"writer": {
"name": "odpswriter",
"parameter": {
"accessId": "accessId",
"accessKey": "accessKey",
"accountType": "aliyun",
"column": ["id", "k", "c", "pad"],
"odpsServer": "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", //endpoint
"partition": "all_insert_conditions", //分区条件
"project": "ProjectName",
"table": "tableName",
"truncate": true //在操作前是否先进行分区删除
}
}
3.3 Setting
"setting": {
"errorLimit": {
"record": "" //允许脏数据记录数
},
"speed": {
"concurrent": 2, //最大并发数
"throttle": true, //是否限速
"mbps": 1 //具体限速大小
}
}
3.4 如何通过脚本实现每日自动化调度
1、数据脚本demo
将原开发脚本中all_query_conditions作为查询条件作为每日替换目标,将all_insert_conditions作为每日写入分区的条件作为每日替换,可查考上述给出的一些模版
2、编写具体数据传输条件的数据集成调度脚本(以myql2odps为例)
脚本查询、写入分区条件定制
#vim imp_data.sh
#根据$1识别不同的数据集成任务名,根据$2识别分区,若有多个分区可继续使用$3、$4。使用一个调度脚本满足多个数据集成任务的数据传输条件
interface_name=$1
interface_name_pt=${interface_name}$2$3$4 #为不同的数据集成脚本任务编写demo模版
#拷贝脚本demo为具体的具体的执行脚本
cp /home/admin/scripts/datax-${interface_name}-demo.json /home/admin/config/datax-${interface_name_pt}-dev.json
#查询条件、写入表分区条件判断
in_conditions=''
if [ ${interface_name} == 'mysqlqury2odps1' ]; then //根据数据集成任务名$1判断reader条件
check_time=$2' '$3$4
check_unixtime=`date -d "${check_time}" +%s`
in_conditions='pt='$2' and _timestamp='${check_unixtime}
elif [ ${interface_name} == 'mysqlqury2odps2' ]; then
in_conditions='pt='$2' and hh='$3
fi
echo $in_conditions
out_conditions='ds='$2',hh='$3',mm='$4
if [ ! -n "$4" ]; then
out_conditions='ds='$2',hh='$3',mm=00'
fi
echo $out_conditions
#参数替换-查询条件
sed -i "s/all_query_conditions/$in_conditions/g" /home/admin/config/datax-${interface_name_pt}-dev.json
#参数替换-分区条件
sed -i "s/all_insert_conditions/$out_conditions/g" /home/admin/config/datax-${interface_name_pt}-dev.json
###命令执行
/opt/datax/bin/datax.py /home/admin/config/datax-${interface_name_pt}-dev.json
#rm /home/admin/gaode_data/config/datax-${interface_name_pthh}-dev.json
3、任务执行
bash imp_data.sh ${interface_name} ${pt} ${hh} ${ss}
4、实现每日自动定时调度
1)若仅是用ECS做数据同步,配置contab进行调度
2)若该ECS作为DataWorks作为自定义资源组进行调度,那可将任务执行命令写在shell脚本中,每个$1配置为一个shell任务,并为$2$3$4配置自定义参数即可实现每日定时调度
文章参考:
https://github.com/alibaba/DataX/blob/master/introduction.md