在Java工程中新建类InterceptorDemo(自定义Flume拦截器),内容如下:
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
/**
 * Flume interceptor that stamps every event with a {@code type} header.
 *
 * <p>The header value is {@code "kb07"} when the event body (decoded as UTF-8)
 * contains the text {@code "kb07"}, and {@code "kb09"} otherwise. Events are
 * modified in place and never dropped.
 *
 * <p>The interceptor keeps no per-instance state: each batch call returns a
 * freshly allocated list, so a list handed back from one call is never
 * altered by a later call.
 */
public class InterceptorDemo implements Interceptor {

    /** Name of the header written onto every event. */
    private static final String TYPE_HEADER = "type";

    /** Marker searched for in the body; also the header value when found. */
    private static final String TYPE_KB07 = "kb07";

    /** Header value used when the marker is absent from the body. */
    private static final String TYPE_KB09 = "kb09";

    /** Nothing to set up; the interceptor is stateless. */
    @Override
    public void initialize() {
    }

    /**
     * Tags a single event with the {@code type} header.
     *
     * @param event the event to inspect; its header map is mutated in place
     * @return the same {@code event} instance, now carrying the header
     */
    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        Map<String, String> headers = event.getHeaders();
        // Decode with an explicit charset so the result does not depend on
        // the JVM's platform default encoding.
        String bodyText = new String(body, StandardCharsets.UTF_8);
        headers.put(TYPE_HEADER, bodyText.contains(TYPE_KB07) ? TYPE_KB07 : TYPE_KB09);
        return event;
    }

    /**
     * Tags every event of a batch with the {@code type} header.
     *
     * @param events the batch to process; the list itself is not modified
     * @return a new list holding the tagged events in their original order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        // A new list per call: reusing one shared list would wipe a result
        // that an earlier caller may still be holding.
        List<Event> tagged = new ArrayList<>(events.size());
        for (Event event : events) {
            tagged.add(intercept(event));
        }
        return tagged;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /** Builder that creates {@link InterceptorDemo} instances. */
    public static class Builder implements Interceptor.Builder {

        /**
         * @return a new {@link InterceptorDemo}
         */
        @Override
        public Interceptor build() {
            return new InterceptorDemo();
        }

        /**
         * No settings are read from the context.
         *
         * @param context the configuration context (ignored)
         */
        @Override
        public void configure(Context context) {
        }
    }
}
点击右侧的maven,先双击clean,再双击package打jar包
之后将jar包导入虚拟机,路径如下
/opt/flume160/lib
在虚拟机中的flume目录下执行命令:
flume-ng agent --name ictdemo2 --conf ./conf/ --conf-file ./conf/jobkb09/netcat-flume-interceptor2-hdfs.conf -Dflume.root.logger=INFO,console
在虚拟机的另一个窗口执行以下命令:telnet localhost 7777
输入
aaaaaaakb09bbbbbbbbbbbbbbbbbbb
bbbbbbbbbkb07cccccccccccccccc
ccccccccckb09cddddddddddddddddd
aaaaaaakb09ddddddddddddddd
bbbbbbbbbkb07rrrrrrrrrrrrrrrrrrr
ccccccccckb092222222222222222
aaaaaaakb0944444444444444444
bbbbbbbbbkb07666666666666666666
ccccccccckb09sssssssssssssssss
aaaaaaakb09wwwwwwwwwwwwww
bbbbbbbbbkb07sssssssssssssss
ccccccccckb09mmmmmmmmmmmmm
在浏览器中打开192.168.134.104:50070,可以看到HDFS上生成了如下文件