简介
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 + 插件 的模式,目前已开源,代码托管在github。
代码托管地址:
https://github.com/alibaba/DataX
插件的开发可以参考:
https://github.com/alibaba/DataX/wiki/DataX%E6%8F%92%E4%BB%B6%E5%BC%80%E5%8F%91%E5%AE%9D%E5%85%B8
下面是参考《DataX插件开发宝典》,进行插件开发的一个完整的流程。
新建模块
首先从 https://github.com/alibaba/DataX 克隆代码,然后执行如下命令:
mvn clean package -DskipTests assembly:assembly
这个命令会生成datax.tar.gz,我们可以将生成的文件部署到相应的环境,进行数据的导入导出。
后续,开发测试之后,可以使用上面的命令,将我们自己开发的插件集成到DataX中。
然后将代码导入到Eclispe(或者其它IDE)中,然后在这个项目下新建一个maven模块。
接着做一些配置,配置之后就可以进行开发了
配置
DataX使用Maven工具来管理项目,插件和框架使用多模块的方式组织在一起。使用maven-assembly-plugin将所有的模块打包在一起。
在开发的过程中,需要进行两方面的配置
- 插件模块本身的配置:目录和文件要满足约定
- DataX主模块的配置:需要配置2个文件:./pom.xml和./package.xml
插件本身的配置:
建立一个插件模块之后,源代码的结果应该满足如下结构:
pom的配置:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>qvxfilereader</artifactId>
<properties>
<datax-project-version>0.0.1-SNAPSHOT</datax-project-version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
这里要注意的是,
1 增加了对datax-common的依赖,这个依赖是DataX的框架,
2 配置了maven-assembly-plugin,在这里应用了src/main/assembly/package.xml。package.xml定义了打包之后的目录结构,这个目录结果是DataX约定好的。具体如下:
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/qvxfilereader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>qvxfilereader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/qvxfilereader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/qvxfilereader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
除了上面的两个xml文件,还需要两个json文件
plugin.json是对插件本省的描述,在框架根据这个文件来加载插件,例如:
{
"name": "qvxfilereader",
"class": "com.alibaba.datax.plugin.reader.qvxfilereader.QvxFileReader",
"description": "useScene: test. mechanism: use datax framework to transport data from qvx file. warn: The more you know about the data, the less problems you encounter.",
"developer": "dtstack.com"
}
plugin_job_template.json:插件的配置模块,用户在使用插件是,根据这个模版来进行配置。例如:
{
"name": "qvxfilereader",
"parameter": {
"path": [],
"fieldDelimiter": ""
}
}
DataX主模块的配置
pom.xml 中添加本插件的模块名称
模块名称artifactId
<modules>
<module>qvxfilereader</module>
</modules>
package.xml 中添加本插件的打包内容,以让 DataX 能够把该插件纳入整个插件体系。
<fileSet>
<directory>qvxfilereader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
开发
主要参考《DataX插件开发宝典》,这里面讲的比较详细。
下面是插件如何通过RecordSender
往channel
写入数据的伪代码:
public void startRead(RecordSender recordSender) {
Record record=recordSender.createRecord();
record.addColumn(new LongColumn(1));
record.addColumn(new StringColumn("hello,world!"));
recordSender.sendToWriter(record);
recordSender.flush();
}
测试
开发完成之后,可以通过执行命令:
mvn clean package -DskipTests assembly:assembly
来生成DataX,然后进行测试。
因为每次执行这条命令,会把DataX中所有的插件重新编译和打包,速度会比较慢。所以,可以修改DataX中的pom.xml和package.xml,模块只保留common和正在开发中的插件。