一、Flume概述
1.1 定义
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。
Flume在实际开发中主要的作用就是,实时的读取服务器本地磁盘的数据,将数据写入到HDFS中。
1.2 Flume架构
-
Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。Agent主要有3个部分组成,Source、Channel、Sink。
-
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。
-
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。
-
Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
-
Flume自带两种Channel:Memory Channel和File Channel。
-
Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
-
File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
-
Event:传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。
二、Flume下载安装
-
Flume官网地址:http://flume.apache.org/
- 下载地址:http://archive.apache.org/dist/flume/
安装步骤:
1、将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下,并解压到/opt/moudle/目录下
$ tar -zxf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
#修改名字
mv apache-flume-1.9.0-bin flume
2、将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
$ mv flume-env.sh.template flume-env.sh $ vi flume-env.sh export JAVA_HOME=/opt/module/jdk1.8.0_144
3、移除jar,避免冲突
$ rm -rf lib/guava-11.0.2.jar
4、修改log4j配置文件
$ vim log4j.properties #将默认log路径配置:flume.log.dir=./logs修改 flume.log.dir=/opt/module/flume/logs
最后,将flume分发集群。
三、Flume日志采集配置实操
按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。
此处选择TaildirSource和KafkaChannel,并配置日志校验拦截器。原因如下:
-
TailDirSource相比ExecSource、SpoolingDirectorySource的优势:
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
-
采用Kafka Channel,省去了Sink,提高了效率。
配置具体步骤:
1、在hadoop102节点的Flume的job目录下创建file_to_kafka.conf
$ mkdir job $ vim job/file_to_kafka.conf
2、配置以下内容
#为各组件命名 a1.sources = r1 a1.channels = c1 #描述source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1
#读取的数据地址 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.* a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
#拦截器的配置文件 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder #配置kafka channel,省略了sink直接连接kafka a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092 # 输出到kafka 的topic_log主题上 a1.channels.c1.kafka.topic = topic_log #默认通过channel输出事件,但因为没有了sink,而是直接连接的kafka,所有要设置为false a1.channels.c1.parseAsFlumeEvent = false #绑定source和channel以及sink和channel的关系 a1.sources.r1.channels = c1
3、配置flume拦截器
打开IDEA,
创建Maven工程flume-interceptor,
创建包:com.atguigu.flume.interceptor,
在pom.xml文件中添加如下配置:
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
在com.atguigu.flume.interceptor包下创建JSONUtils类:
package com.atguigu.flume.interceptor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONException; /** * @Readme :判断是否为json */ public class JSONUtils { public static boolean isJSONValidate(String log){ try { JSON.parse(log); return true; }catch (JSONException e){ return false; } } }
在com.atguigu.flume.interceptor包下创建ETLInterceptor 类
package com.atguigu.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; /** * @Readme * 1、继承Interceptor接口 * 2、实现四个方法,public List<Event> intercept(List<Event> list)这个最重要 * 3、静态内部类build实现Interceptor.Builder接口(建造者模式) */ public class ETLInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { //需求:过滤Event中的数据是否是json格式 byte[] body = event.getBody(); String log = new String(body, StandardCharsets.UTF_8); if (JSONUtils.isJSONValidate(log)) { return event; } else { return null; } } @Override public List<Event> intercept(List<Event> list) { //将处理之后为null的event删除掉 Iterator<Event> iterator = list.iterator(); while (iterator.hasNext()){ Event next = iterator.next(); if(intercept(next)==null){ iterator.remove(); } } return list; } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new ETLInterceptor(); } @Override public void configure(Context context) { } } @Override public void close() { } }
最后,打包并将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。
四、Flume日志采集测试