原来做datax-web,现将datax原来使用开发过程中的一些坑进行记录
一. DataX
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。
二.Features
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
三.System Requirements
- Linux
- JDK(1.8以上,推荐1.8)
- [Python(推荐Python2.6.X) ] (https://www.python.org/downloads/)
- Apache Maven 3.x (Compile DataX)
Quick Start
-
方法一、下载DataX源码,自己编译:DataX源码
(1)、下载DataX源码:
$ git clone https://github.com/waterWang/DataX.git
(2)、通过maven打包:
$ cd {DataX_source_code_home} $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
打包成功,日志显示如下:
打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下:
-
配置示例:从stream读取数据并打印到控制台
-
第一步、创建作业的配置文件json格式(想写datax任务,不知道模板怎么写)
可以通过命令查看配置模板:
>python datax.py -r {YOUR_READER} -w {YOUR_WRITER} # 模板 >python D:\idea-workspace\github\DataX\target\datax\datax\bin\datax.py -r streamreader -w streamwriter #具体例子
-
第二步:启动DataX(已经有datax的json,要运行)
>python datax.py ./xxx.json # 模板 >python D:\idea-workspace\github\DataX\target\datax\datax\bin\datax.py D:\idea-workspace\github\DataX\target\datax\datax\job\job.json #具体例子
同步结束,显示日志如下:
-
小坑
如果本地是Python3环境,运行官方给出的\job\job.json 报错
解決方法:
- core.transport.channel.speed.record和job.setting.speed.record需要同时进行设置或全都不设置
- core.transport.channel.speed.byte和job.setting.speed.byte需要同时进行设置或全都不设置
最终json如下
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 100000
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false,
"encoding": "UTF-8"
}
}
}
]
}
}
原因:
源代码入下
JobContainer.java
if (isByteLimit) {
long globalLimitedByteSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
// 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
Long channelLimitedByteSpeed = this.configuration
.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
}
needChannelNumberByByte =
(int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
needChannelNumberByByte =
needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
}
#===========
if (isRecordLimit) {
long globalLimitedRecordSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
Long channelLimitedRecordSpeed = this.configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
"在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
}
needChannelNumberByRecord =
(int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
needChannelNumberByRecord =
needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
}