flume拦截器

本项目自定义了两种拦截器:ETL 拦截器和日志类型区分拦截器。
ETL 拦截器主要用于过滤时间戳不合法或 Json 数据不完整的日志。
日志类型区分拦截器主要用于将启动日志和事件日志区分开来,方便发往 Kafka 的不同 Topic。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the custom Flume interceptors (LogETLInterceptor, LogTypeInterceptor).
  The resulting jar is meant to be copied into Flume's own lib directory.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ldy</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>


    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
            <!--
              The Flume agent already ships flume-ng-core and its transitive libraries in
              its lib directory. "provided" keeps them on the compile classpath but out of
              the jar-with-dependencies artifact, so deploying the jar into flume/lib does
              not put duplicate copies of Flume's classes on the agent classpath.
            -->
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Compile for Java 8. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Additionally build a *-jar-with-dependencies.jar during the package phase. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>

                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

自定义ETL拦截器:

package com.ldy.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 1 获取数据
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        // 2 判断数据类型并向Header中赋值
     //log中有个en(event_name)字段表明该日志是不是启动日志类型
if (log.contains("start")) { //3 校验数据 if (LogUtils.validateStart(log)){ return event; } }else { if (LogUtils.validateEvent(log)){ return event; } } // 4 返回校验结果 return null; } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> interceptors = new ArrayList<>(); for (Event event : events) { Event intercept1 = intercept(event); if (intercept1 != null){ interceptors.add(intercept1); } } return interceptors; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new LogETLInterceptor(); } @Override public void configure(Context context) { } } }

 

自定义Utils(ETL的清洗工具类):

package com.ldy.flume.interceptor;
import org.apache.commons.lang.math.NumberUtils;

/**
 * Format checks used by the ETL interceptor to decide whether a raw log line is usable.
 *
 * <p>The checks are purely structural (field count, timestamp shape, surrounding braces);
 * the JSON payload itself is not parsed.
 */
public class LogUtils {

    /** Number of characters the server timestamp must have. */
    private static final int SERVER_TIME_LENGTH = 13;

    /**
     * Checks the format of an event log line.
     *
     * <p>Expected layout: {@code serverTime|json}, for example:
     * <pre>
     * 1613956146081|{"cm":{"mid":"1","uid":"1","t":"1613861466406", ...},
     *                "ap":"app",
     *                "et":[{"ett":"1613872348614","en":"newsdetail","kv":{...}},
     *                      {"ett":"1613873812304","en":"loading","kv":{...}}]}
     * </pre>
     *
     * @param log the raw log line; may be {@code null}
     * @return {@code true} when the line splits on '|' into exactly two fields, the first
     *         being 13 digits and the second, once trimmed, starting with '{' and ending
     *         with '}'; {@code false} otherwise, including for {@code null}
     */
    public static boolean validateEvent(String log) {
        // Same null handling as validateStart, instead of failing with a NullPointerException.
        if (log == null) {
            return false;
        }

        // 1. Split on '|'.
        String[] logContents = log.split("\\|");

        // 2. There must be exactly a timestamp field and a payload field.
        if (logContents.length != 2) {
            return false;
        }

        // 3. Check the server timestamp.
        String serverTime = logContents[0];
        if (serverTime.length() != SERVER_TIME_LENGTH || !isDigits(serverTime)) {
            return false;
        }

        // 4. Check the JSON payload.
        return looksLikeJsonObject(logContents[1]);
    }

    /**
     * Checks the format of a start log line.
     *
     * <p>Expected layout: a single JSON object, for example:
     * <pre>
     * {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2",
     *  "g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","md":"HTC-5","mid":"993",
     *  "nw":"WIFI","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"}
     * </pre>
     *
     * @param log the raw log line; may be {@code null}
     * @return {@code true} when the trimmed line starts with '{' and ends with '}';
     *         {@code false} otherwise, including for {@code null}
     */
    public static boolean validateStart(String log) {
        if (log == null) {
            return false;
        }

        return looksLikeJsonObject(log);
    }

    /** Returns whether the text, ignoring surrounding whitespace, is wrapped in braces. */
    private static boolean looksLikeJsonObject(String text) {
        String trimmed = text.trim();
        return trimmed.startsWith("{") && trimmed.endsWith("}");
    }

    /** Returns whether the text is non-empty and consists solely of digit characters. */
    private static boolean isDigits(String text) {
        if (text.isEmpty()) {
            return false;
        }
        for (int i = 0; i < text.length(); i++) {
            if (!Character.isDigit(text.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}

 

自定义Type拦截器:

package com.ldy.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Flume interceptor that tags every event with the topic it belongs to.
 *
 * <p>An event consists of headers and a body. The body holds the log text; a start log
 * carries the field {@code "en":"start"}. The {@code topic} header written here records
 * the log type so that start logs and event logs can be sent to different topics.
 */
public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // Nothing to set up.
    }

    /**
     * Sets the {@code topic} header of one event according to its body text.
     *
     * @param event the event to tag
     * @return the same event, with its {@code topic} header set
     */
    @Override
    public Event intercept(Event event) {
        // Decode the body as UTF-8 text.
        String text = new String(event.getBody(), Charset.forName("UTF-8"));

        // Bodies containing "start" go to the start topic, all others to the event topic.
        Map<String, String> headerMap = event.getHeaders();
        String topic = text.contains("start") ? "topic_start" : "topic_event";
        headerMap.put("topic", topic);

        return event;
    }

    /**
     * Tags every event of a batch.
     *
     * @param events the events to tag
     * @return a new list containing the tagged events in input order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> tagged = new ArrayList<>();
        for (Event current : events) {
            tagged.add(intercept(current));
        }
        return tagged;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    /** Builder through which Flume instantiates this interceptor. */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
            // This interceptor has no configurable properties.
        }
    }
}

 

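最后将项目打包,把生成的 jar 包放到 Flume 安装目录的 lib 目录下。

然后在 Flume 的 job 配置文件中为 source 配置拦截器即可生效,type 需要填写 Builder 内部类的全限定名,例如:a1.sources.r1.interceptors.i1.type = com.ldy.flume.interceptor.LogETLInterceptor$Builder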

上一篇:Linux下安装Flume


下一篇:Spark Streaming处理Flume数据练习