1. DataX Templates
Method 1: print a DataX config file template
python bin/datax.py -r mysqlreader -w hdfswriter
Method 2: refer to the official documentation: https://github.com/alibaba/DataX/blob/master/README.md
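The same template command works for the reverse direction covered in section 3 below; the plugin names are taken from the reader/writer names used in the configs later in this article:
python bin/datax.py -r hdfsreader -w mysqlwriter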
2. Syncing MySQL Data to HDFS
2.1 MySQLReader: TableMode
Declare the data to be synced with the table, column, and where attributes.
mysql_to_hdfs_T.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "region_id",
                            "area_code",
                            "iso_code",
                            "iso_3166_2"
                        ],
                        "where": "id>=3",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "table": [
                                    "base_province"
                                ]
                            }
                        ],
                        "password": "123456",
                        "splitPk": "",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
MySQLReader (TableMode) parameter format:
HDFSWriter parameter format:
Note that HDFS Writer does not provide a nullFormat parameter, so you cannot customize how null values are written to HDFS files. By default, HDFS Writer stores null as an empty string (''), while Hive's default null storage format is \N. As a result, loading the files synced by DataX into a Hive table later will cause problems.
Solutions
- Modify the DataX HDFS Writer source code and add logic for a custom null storage format: https://blog.csdn.net/u010834071/article/details/105506580
- When creating the table in Hive, declare the null storage format as an empty string (''):
DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
    `id`         STRING COMMENT 'ID',
    `name`       STRING COMMENT 'Province name',
    `region_id`  STRING COMMENT 'Region ID',
    `area_code`  STRING COMMENT 'Area code',
    `iso_code`   STRING COMMENT 'Legacy ISO-3166-2 code, used for visualization',
    `iso_3166_2` STRING COMMENT 'Current ISO-3166-2 code, used for visualization'
) COMMENT 'Province table'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/base_province/';
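Once the DataX job has written its output to /base_province, this external table can be queried directly (Hive reads the gzip-compressed text files transparently). For illustration, a quick check that empty fields now surface as NULL:
SELECT * FROM base_province LIMIT 5;
SELECT count(*) FROM base_province WHERE iso_3166_2 IS NULL;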
Setting parameters:
The fault-tolerance ratios are all set to 0.
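The sample job above only configures speed.channel. DataX also accepts a job-level errorLimit block for dirty-record tolerance; if you want to spell out the zero-tolerance choice, the setting section could look like this (a sketch using the standard record/percentage fields):
"setting": {
    "speed": {
        "channel": 1
    },
    "errorLimit": {
        "record": 0,
        "percentage": 0
    }
}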
Submit the job and test
- Create the target directory on HDFS first
hadoop fs -mkdir /base_province
(Alternatively, the DataX source code can be modified so the job creates the directory itself.)
- Run the command to import the MySQL data into HDFS
python bin/datax.py job/mysql_to_hdfs_T.json
- Check the output on HDFS (the files are gzip-compressed, hence zcat)
hadoop fs -cat /base_province/* | zcat
2.2 MySQLReader: QuerySQLMode
Declare the data to be synced with a single SQL query.
mysql_to_hdfs_sql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
MySQLReader (QuerySQLMode) parameter format:
Submit the job and test
- Delete all existing files under /base_province/
hadoop fs -rm -r -f /base_province/*
- Run the command
python bin/datax.py job/mysql_to_hdfs_sql.json
- Check the HDFS files
hadoop fs -cat /base_province/* | zcat
3. Syncing HDFS Data to MySQL
hdfs_to_mysql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "defaultFS": "hdfs://hadoop102:8020",
                        "path": "/base_province",
                        "column": [
                            "*"
                        ],
                        "fileType": "text",
                        "compress": "gzip",
                        "encoding": "UTF-8",
                        "nullFormat": "\\N",
                        "fieldDelimiter": "\t"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "table": [
                                    "test_province"
                                ],
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"
                            }
                        ],
                        "column": [
                            "id",
                            "name",
                            "region_id",
                            "area_code",
                            "iso_code",
                            "iso_3166_2"
                        ],
                        "writeMode": "replace"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
HDFSReader parameter format:
MySQLWriter parameter format:
Submit the job and test
- Create the test_province table in MySQL
DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province`
(
    `id`         bigint(20)  NOT NULL,
    `name`       varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `region_id`  varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `area_code`  varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `iso_code`   varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
(The table can also be created with a GUI tool such as Navicat.)
- Run the command
python bin/datax.py job/hdfs_to_mysql.json
- Refresh and check the data in MySQL
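A quick sanity check from the MySQL side, for example (the gmall database name comes from the jdbcUrl above):
SELECT COUNT(*) FROM gmall.test_province;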
4. Passing Parameters to DataX
Offline sync jobs are scheduled to run every day, so the target path on HDFS usually contains a date level to keep each day's data apart. In other words, the target path changes from day to day, which means the value of the HDFS Writer path parameter in the DataX config file must be dynamic.
Usage:
Reference the parameter in the JSON config file as ${param}, and pass its value when submitting the job with -p"-Dparam=value".
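In a real daily schedule the date is normally computed rather than typed by hand. A minimal wrapper sketch (assuming the test_parameter.json job file shown below, and that the dated target directory must exist before the job runs):
dt=$(date -d '-1 day' +%F)
hadoop fs -mkdir -p /base_province/$dt
python bin/datax.py job/test_parameter.json -p"-Ddt=$dt"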
test_parameter.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province/${dt}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
Submit the job and test
- Create the dated path on HDFS first (it must match the dt value passed below)
hadoop fs -mkdir /base_province/2020-06-14
- Run the command
python bin/datax.py job/test_parameter.json -p"-Ddt=2020-06-14"
- Check the result on HDFS
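As in the earlier steps, the gzip-compressed output under the dated path can be viewed with (the date follows from the dt value passed above):
hadoop fs -cat /base_province/2020-06-14/* | zcat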