拦截器主要分为两种:ETL 拦截器和日志类型区分拦截器。ETL 拦截器主要用于过滤时间戳不合法或 JSON 数据不完整的日志。
日志类型区分拦截器主要用于将启动日志和事件日志区分开来,方便发往 Kafka 的不同 Topic。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the custom Flume interceptor module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of the interceptor artifact. -->
    <groupId>com.ldy</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Flume core: supplies the org.apache.flume Event / Context / Interceptor types
             that the interceptor classes compile against. -->
        <!-- NOTE(review): no <scope> is given, so this is compile scope and the
             jar-with-dependencies built below will bundle Flume itself. If the jar is
             dropped into flume/lib, "provided" scope may be more appropriate - confirm. -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile with Java 8 source and target levels. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Additionally build a "jar-with-dependencies" archive during the package phase. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
自定义ETL拦截器:
package com.ldy.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Flume interceptor that drops malformed log events (the ETL cleaning step).
 *
 * <p>The event body is decoded as UTF-8 text. A body containing the substring
 * {@code start} is checked with {@link LogUtils#validateStart(String)}; every other
 * body is checked with {@link LogUtils#validateEvent(String)}. Events that fail the
 * check are dropped. This interceptor never modifies headers or bodies.
 */
public class LogETLInterceptor implements Interceptor {

    /** Charset used to decode event bodies; a constant avoids a by-name lookup per event. */
    private static final Charset LOG_CHARSET = StandardCharsets.UTF_8;

    @Override
    public void initialize() {
        // Nothing to set up.
    }

    /**
     * Validates a single event.
     *
     * @param event the event whose body is checked
     * @return the same {@code event} instance when its body passes validation,
     *         or {@code null} when the event should be dropped
     */
    @Override
    public Event intercept(Event event) {
        // 1. Decode the body.
        String log = new String(event.getBody(), LOG_CHARSET);

        // 2. Pick the validator by log type. Start-up logs carry "en":"start" (en = event name).
        // NOTE(review): this is a plain substring match, so an event log that merely
        // contains the text "start" somewhere is validated as a start-up log - confirm
        // that this is acceptable for the real data.
        boolean valid = log.contains("start")
                ? LogUtils.validateStart(log)
                : LogUtils.validateEvent(log);

        // 3. Keep valid events, drop the rest.
        return valid ? event : null;
    }

    /**
     * Validates a batch of events.
     *
     * @param events the events to check
     * @return a new list holding only the events that passed validation, in their original order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> accepted = new ArrayList<>(events.size());
        for (Event event : events) {
            Event checked = intercept(event);
            if (checked != null) {
                accepted.add(checked);
            }
        }
        return accepted;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    /** Builder used to instantiate {@link LogETLInterceptor}; it takes no configuration. */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configurable properties.
        }
    }
}
自定义Utils(ETL的清洗工具类):
package com.ldy.flume.interceptor; import org.apache.commons.lang.math.NumberUtils; public class LogUtils { //检测event数据格式 public static boolean validateEvent(String log) { // 样本: 服务器时间 | json /* 1613956146081| {"cm": {"ln":"-69.2","sv":"V2.3.0","os":"8.1.1","g":"G0MLZ2K4@gmail.com","mid":"1","nw":"4G","l":"pt","vc":"10","hw":"1080*1920","ar":"MX","uid":"1","t":"1613861466406","la":"11.9","md":"sumsung-4","vn":"1.0.9","ba":"Sumsung","sr":"V"}, "ap":"app", "et":[{"ett":"1613872348614", "en":"newsdetail", "kv":{"entry":"3","goodsid":"0","news_staytime":"24","loading_time":"0","action":"2","showtype":"1","category":"9","type1":""}}, {"ett":"1613873812304", "en":"loading", "kv":{"extend2":"","loading_time":"6","action":"3","extend1":"","type":"1","type1":"325","loading_way":"2"}}, ]} */ // 1 在|切割 String[] logContents = log.split("\\|"); // 2 校验 if(logContents.length != 2){ return false; } //3 校验服务器时间 if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){ return false; } // 4 校验json if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){ return false; } return true; } //检测start数据格式 public static boolean validateStart(String log) { /* {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2", "extend1 ":"","g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","la":"-43.4", "ln":"-98.3","lo ading_time":"10","md":"HTC-5","mid":"993","nw":"WIFI","open_ad_type":"1", "os":"8.2. 1","sr":"D","sv":"V2.9.0","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"} */ if (log == null){ return false; } // 校验json if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){ return false; } return true; } }
自定义Type拦截器:
package com.ldy.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Flume interceptor that tags each event with the topic it belongs to.
 *
 * <p>An event has two parts, headers and body. The body holds the log text; a
 * start-up log carries the field {@code "en":"start"}. This interceptor writes a
 * {@code topic} header ({@code topic_start} or {@code topic_event}) so that the two
 * log types can be sent to different topics. The body is left untouched.
 */
public class LogTypeInterceptor implements Interceptor {

    /** Charset used to decode event bodies; a constant avoids a by-name lookup per event. */
    private static final Charset LOG_CHARSET = StandardCharsets.UTF_8;

    /** Header key that carries the target topic. */
    private static final String TOPIC_HEADER = "topic";

    /** Topic assigned to start-up logs. */
    private static final String START_TOPIC = "topic_start";

    /** Topic assigned to every other log. */
    private static final String EVENT_TOPIC = "topic_event";

    @Override
    public void initialize() {
        // Nothing to set up.
    }

    /**
     * Sets the {@code topic} header of a single event according to its body.
     *
     * @param event the event to tag; its header map is modified in place
     * @return the same {@code event} instance
     */
    @Override
    public Event intercept(Event event) {
        // 1. Decode the body.
        String log = new String(event.getBody(), LOG_CHARSET);

        // 2. Classify and record the result in the headers.
        // NOTE(review): this is a plain substring match, so an event log that merely
        // contains the text "start" somewhere is routed to topic_start - confirm
        // that this is acceptable for the real data.
        Map<String, String> headers = event.getHeaders();
        headers.put(TOPIC_HEADER, log.contains("start") ? START_TOPIC : EVENT_TOPIC);

        return event;
    }

    /**
     * Tags every event in a batch.
     *
     * @param events the events to tag
     * @return a new list holding the same event instances in their original order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> tagged = new ArrayList<>(events.size());
        for (Event event : events) {
            tagged.add(intercept(event));
        }
        return tagged;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    /** Builder used to instantiate {@link LogTypeInterceptor}; it takes no configuration. */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configurable properties.
        }
    }
}
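最后用 Maven 打包(mvn package),把生成的 jar 包放到 flume/lib 目录下。
只要在 Flume 的 job 配置文件中把拦截器的 type 配置为对应 Builder 的全类名(例如 com.ldy.flume.interceptor.LogETLInterceptor$Builder),拦截器就能生效了。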