Flume实战值时间戳的拦截器

背景:公司属于教育公司,自研一款线上教育app。由于疫情,导致公司业务扩大,数据量剧增。于是公司打算自研一套数据中台。本人有幸负责公司数据采集这一块项目。

解决的问题:根据埋点数据会产生一条json日志写到服务器指定的目录下。因此我需要采集到数据传入kafka之中,所以Flume组件成了必选项。本次主要介绍flume基于时间戳的拦截器

package com.tuoqing.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class TimestampInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //  获取事件的值并转换成UTF_8的格式
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        //  将获取的字符串转换成对象
        JSONObject jsonObject = JSON.parseObject(log);
        //  获取对象中key值为"ts"的字段
        if (jsonObject.containsKey("ts")){
            String ts = jsonObject.getString("ts");
            event.getHeaders().put("timestamp",ts);
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{


        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

 

上一篇:C语言单链表


下一篇:单向链表的查删改功能,以及约瑟夫环,相交链表的第一个相交节点的查找等相关问题