Flume入门

一、Flume概述

1.1 定义

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

Flume在实际开发中主要的作用就是,实时的读取服务器本地磁盘的数据,将数据写入到HDFS中。

1.2 Flume架构

Flume入门

 

  • Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。Agent主要有3个部分组成,Source、Channel、Sink

  • Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

  • Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

  • Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

  • Flume自带两种Channel:Memory ChannelFile Channel

  • Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

  • File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

  • Event:传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。Flume入门

     

     

二、Flume下载安装

安装步骤:

1、将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下,并解压到/opt/moudle/目录下

$ tar -zxf apache-flume-1.9.0-bin.tar.gz -C /opt/module/

#修改名字
mv apache-flume-1.9.0-bin flume

2、将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件

$ mv flume-env.sh.template flume-env.sh
$ vi flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144

3、移除jar,避免冲突

$ rm -rf  lib/guava-11.0.2.jar

4、修改log4j配置文件

$ vim log4j.properties 

#将默认log路径配置:flume.log.dir=./logs修改
flume.log.dir=/opt/module/flume/logs

最后,将flume分发集群。

三、Flume日志采集配置实操

  按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。

  此处选择TaildirSource和KafkaChannel,并配置日志校验拦截器。原因如下:

  • TailDirSource相比ExecSource、SpoolingDirectorySource的优势:

    TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

    ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

    SpoolingDirectorySource监控目录,支持断点续传。

  • 采用Kafka Channel,省去了Sink,提高了效率。

配置具体步骤:

1、在hadoop102节点的Flume的job目录下创建file_to_kafka.conf

$ mkdir job
$ vim job/file_to_kafka.conf

2、配置以下内容

#为各组件命名
a1.sources = r1
a1.channels = c1

#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
#读取的数据地址 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.* a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
#拦截器的配置文件 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder #配置kafka channel,省略了sink直接连接kafka a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092 # 输出到kafka 的topic_log主题上 a1.channels.c1.kafka.topic = topic_log #默认通过channel输出事件,但因为没有了sink,而是直接连接的kafka,所有要设置为false a1.channels.c1.parseAsFlumeEvent = false #绑定source和channel以及sink和channel的关系 a1.sources.r1.channels = c1

3、配置flume拦截器

打开IDEA,

创建Maven工程flume-interceptor,

创建包:com.atguigu.flume.interceptor,

在pom.xml文件中添加如下配置:

 

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

 

在com.atguigu.flume.interceptor包下创建JSONUtils类:

package com.atguigu.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

/**
 * @Readme :判断是否为json
 */
public class JSONUtils {
    public static boolean isJSONValidate(String log){
        try {
            JSON.parse(log);
            return true;
        }catch (JSONException e){
            return false;
        }
    }
}

在com.atguigu.flume.interceptor包下创建ETLInterceptor 类

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

/**
 * @Readme
 * 1、继承Interceptor接口
 * 2、实现四个方法,public List<Event> intercept(List<Event> list)这个最重要
 * 3、静态内部类build实现Interceptor.Builder接口(建造者模式)
 */
public class ETLInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //需求:过滤Event中的数据是否是json格式
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        if (JSONUtils.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        //将处理之后为null的event删除掉
        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }
        return list;
    }
    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }

    }

    @Override
    public void close() {

    }
}

最后,打包并将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

四、Flume日志采集测试

 

上一篇:P5290 [十二省联考2019]春节十二响


下一篇:rabbitmq工作模式---路由模式