前面2篇文章,已经看完+学习完transform的内容,今天继续编写一个自己的transformer;
一、环境
- win10
- DataX 3.0(从我的datax分支打包而来)
- job.json使用datax的样例json,源文件在xxx\DataX\core\src\main\job\中,打包编译后在xxx\DataX\target\datax\datax\job下。本文略做修改,主要修改2出,是否打印和记录行数
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 59880504,
"type": "long"
},
{
"value": "2010-05-04 00:00:00",
"type": "string" // 原来是date
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 1 // 原来是100000,修改为1行,看到效果就行
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true, //原来是false,不打印,为了在控制台看到效果,修改为true
"encoding": "UTF-8"
}
}
}
]
}
}
二、效果
先看下不加transformer时候的运行效果
加上自定义的DateTransformer(主要实现时间类型字符串格式转换)
配置后运行效果:
三、实现步骤
- 写一个类DateTransformer继承Transformer;
- 里面构造方法设置一个唯一标识符,用来代表本transformer;
- 重写evaluate()方法,根据自己实现情况定义paras参数个数及含义
- 将DateTransformer注册到TransformerRegistry中;
四、具体代码
编写DateTransformer类,写构造方法,写evaluate方法
package com.alibaba.datax.core.transport.transformer;
import static com.alibaba.datax.core.transport.transformer.TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER;
import static com.alibaba.datax.core.transport.transformer.TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.transformer.Transformer;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
/**
* @author water
* @desc 自己定义一个时间类型转换的类
*/
public class DateTransformer extends Transformer {
/**
* 通过设置name,标注出唯一的标识符,代表被类
*/
public DateTransformer() {
setTransformerName("dx_date");
}
/**
* @param record Record
* @param paras Object 第一个参数是 colIndex。 第二个参数是原时间格式,第三个是目标时间格式
* @return Record
*/
@Override
public Record evaluate(Record record, Object... paras) {
int colIndex;
String oldPattern;
String newPattern;
try {
if (paras.length != 3) {
throw new RuntimeException("dx_date paras must be 3");
}
colIndex = (Integer) paras[0];
oldPattern = (String) paras[1];
newPattern = (String) paras[2];
} catch (Exception e) {
throw DataXException.asDataXException(TRANSFORMER_ILLEGAL_PARAMETER,
"paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
}
Column column = record.getColumn(colIndex);
try {
String oriValue = column.asString();
//如果字段为空,跳过处理
if (oriValue == null) {
return record;
}
SimpleDateFormat oldSdf = new SimpleDateFormat(oldPattern);
Date date = oldSdf.parse(oriValue);
SimpleDateFormat newSdf = new SimpleDateFormat(newPattern);
String newValue = newSdf.format(date);
record.setColumn(colIndex, new StringColumn(newValue));
} catch (Exception e) {
throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
}
return record;
}
}
将DateTransformer注册到TransformerRegistry中
TransformerRegistry类中
static {
/**
* add native transformer
* local storage and from server will be delay load.
* 官方默认注册了 5 个方法,分别是截取字符串、填补、替换、过滤、groovy 代码段(后面会详细介绍)
*/
registTransformer(new SubstrTransformer());
registTransformer(new PadTransformer());
registTransformer(new ReplaceTransformer());
registTransformer(new FilterTransformer());
registTransformer(new GroovyTransformer());
//将自己写的transformer注册进来
registTransformer(new DateTransformer());
}
so easy,哪里需要写哪里~
注:
-
对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;
-
所有代码都已经上传到github(master分支和dev),可以免费白嫖