背景:morphline是一个轻量级的etl工具。除了提供标准化的方法之外,还可以定制化的开发java片段。定制化的java片段会在加载时被作为一个独立的类编译,对源数据作处理。
morphline关于java片段的例子,在配置文件上编写java代码太难了,尤其是在中文输入法下,可能逗号或者引号打错了也浑然不知。
java {
imports : "import java.util.*;"
code: """
// Update some custom metrics - see http://metrics.codahale.com/getting-started/
context.getMetricRegistry().counter("myMetrics.myCounter").inc(1);
context.getMetricRegistry().meter("myMetrics.myMeter").mark(1);
context.getMetricRegistry().histogram("myMetrics.myHistogram").update(100);
com.codahale.metrics.Timer.Context timerContext = context.getMetricRegistry().timer("myMetrics.myTimer").time(); // manipulate the contents of a record field
List tags = record.get("tags");
if (!tags.contains("hello")) {
return false;
}
tags.add("world"); logger.debug("tags: {} for record: {}", tags, record); // log to SLF4J
timerContext.stop(); // measure how much time the code block took
return child.process(record); // pass record to next command in chain
"""
}
在eclipse下开发代码也比较简单。定义一个如下的类,在test()方法里面开发代码段,需要import的包就在上面定义,这样就可以利用eclipse的编译功能来纠错了。然后把import段拷贝到morphline的import字段,把test()里面的内容拷贝到code:”””//[code] ”””里面。
package test;
import java.util.Collections;
import java.util.Iterator;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Fields;
import com.fasterxml.jackson.databind.JsonNode; public class TestMain
{
Record record;
Command child; public boolean test()
{
JsonNode rootNode = (JsonNode) record.getFirstValue(Fields.ATTACHMENT_BODY);
JsonNode jsonNode = rootNode.get("tags");
if (jsonNode.isArray())
{
Iterator<JsonNode> tags = jsonNode.elements();
while (tags.hasNext())
{
JsonNode next = tags.next();
String name = next.get("name").asText();
JsonNode values = next.get("value");
for (JsonNode value : values)
{
record.put("custom_tag", name + "=" + value.asText());
}
}
}
return child.process(record);
}
}