背景:公司属于教育公司,自研一款线上教育app。由于疫情,导致公司业务扩大,数据量剧增。于是公司打算自研一套数据中台。本人有幸负责公司数据采集这一块项目。
解决的问题:根据埋点数据会产生一条json日志写到服务器指定的目录下。因此我需要采集到数据传入kafka之中,所以Flume组件成了必选项。本次主要介绍flume基于时间戳的拦截器
package com.tuoqing.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class TimestampInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 获取事件的值并转换成UTF_8的格式
byte[] body = event.getBody();
String log = new String(body, StandardCharsets.UTF_8);
// 将获取的字符串转换成对象
JSONObject jsonObject = JSON.parseObject(log);
// 获取对象中key值为"ts"的字段
if (jsonObject.containsKey("ts")){
String ts = jsonObject.getString("ts");
event.getHeaders().put("timestamp",ts);
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TimestampInterceptor();
}
@Override
public void configure(Context context) {
}
}
}