flume简介及netcat样例

一、简介

Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。它具有基于流数据的简单灵活的架构、可靠的可靠性机制和许多故障转移和恢复机制,以及强大的容错性。它使用简单的可扩展数据模型,允许在线分析数据。很多大数据分析系统都通过flume来获取数据的输入。

Flume最早是Cloudera提供的日志收集系统,后来成为Apache下的项目,目前已经是Apache下的*项目。

Flume有两个版本,一个是老的Flume 0.9X版本,统称为Flume-og;另一个是经过重大重构后的版本Flume1.X,统称为Flume-ng。在本文中,我们使用的是最新的Flume-ng版本,版本号是Apache Flume 1.9.0。

Flume的官方下载网站是http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

Flume是用java开发的,其运行依赖java环境。Flume可以在windows和linux下运行,本文示例所用的是linux环境。

二、核心概念

Event:消息的基本单位,由header和body组成。

Agent:一个独立JVM进程,负责将一端外部来源产生的消息转 发到另一端外部的目的地。在分布式部署中,每个节点部署一个Agent。多个Agent就组成了Flume的分布式系统。

(一)Agent(代理)

Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方,如下图示例:

flume简介及netcat样例

上图是Agent的一个最基本的结构,一个Agent中的source、channel、sink数量可以有多个,它们可相互组合使用,比如一个source可以对应多个channel,一个channel也可对应多个sink。它们的关系在配置文件中配置。后面会举例介绍。

下面接着介绍Agent的这三个核心组件的含义.

(二)Source(源)

Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。 Flume提供了很多内置的Source, 支持 Avro, log4j, syslog 和 http post(body为json格式),可以让应用程序同已有的Source直接打交道,如AvroSource,  SyslogTcpSource。 如果内置的Source无法满足需要, Flume还支持自定义Source。

(三)Channel(通道)

Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直 到Sink处理完该事件。

Flume提供了多种内置的Channel类型,如MemoryChannel和FileChannel,也支持用户自定义Channel。

(四)Sink(接收器)

Sink从Channel中取出事件,然后将数据发到别处,可以向文件系统、数据库、 hadoop存数据, 也可以是其他flume agent的Source。Flume也提供了多种内置的Sink。

三、使用场景

Flume在英文中的意思是水道, 但Flume更像可以随意组装的消防水管,下面根据官方文档,展示几种Flow。

(一)多个agent顺序连接

flume简介及netcat样例

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent 的数量,因为数据流经的路径变长了,出现故障将影响整个Flow上的Agent收集服务。

(二)多个Agent的数据汇聚到同一个Agent

flume简介及netcat样例

这种情况应用的场景比较多,比如要收集Web网站的用户行为日志, Web网站为了可用性使用的负载集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。

(三)多级流

flume简介及netcat样例

Flume还支持多级流,什么多级流?结合在云开发中的应用来举个例子,当syslog, java, nginx、 tomcat等混合在一起的日志流开始流入一个agent后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。

(四)load balance功能

flume简介及netcat样例

上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上 ,这样起到负荷分担的效果。

四、快速起步

下面我们通过一个简单的实例来演示如何使用Flume,从而加深对Flume基本概念的理解。

(一)版本安装

从flume官网下载二进制的安装文件apache-flume-1.9.0-bin.tar.gz,解压后得到的目录结构如下:

flume简介及netcat样例

 

 flume的可执行程序(实际是个shell脚本)位于bin目录下,文件名是flume-ng。我们可以运行flume-ng version测试下当前环境是否正常,如:

flume简介及netcat样例

 

 如果能正常执行,显示上面的类似信息,说明当前环境是正常的(比如java环境)。上面测试并不需要修改flume的任何配置文件。

(二)配置文件

启动一个flume agent需要指定一个配置文件,该配置文件用于配置该agent的相关参数信息,如source,channel,sink等设置信息。配置文件是普通的键值对格式的文本文件。在安装目录下的conf目录下有样例文件可参考。

下面我们通过一个简单的例子来说明如何设置agent的配置文件。

每个flume agent都需要指定一个名称,配置文件的前三个参数都是以如下内容开始,如:

a1.sources=<list of sources>
a1.channels=<list of channel>
a1.sinks=<list of sinks>
上面的配置信息中,a1就是要启动的flume agent的名称,a1.sources的值指定该agent的source组件,一个agent可以配置多个source,所以a1.sources的值是个列表;同样a1.channels用于指定该agent的channel信息;a1.sinks用于指定该agent的sink信息。举例:
a1.sources=s1
a1.channels=c1
a1.sinks=k1

上面信息定义了一个名为a1的flume agent,它有一个名为s1的source(源)、一个名为c1的channel(通道),以及一个名为k1的sink(接收器)。

配置了agent的三个核心组件source,channel,sink后(实际只是指定了一个名称),下面就要接着配置这三个组件的具体信息,如类型等。

下面我们先配置source信息,例子如下:

a1.sources.s1.type=netcat
a1.sources.s1.bind=0.0.0.0
a1.sources.s1.port=12345
a1.sources.s1.channels=c1

上面的信息设置源s1的类型(type参数)为netcat,这是flume内置的一种源类型,它的作用是监听来自socket中的数据,需要指定监听的ip地址和端口号,bind参数设置Ip地址,0.0.0.0表示监听任何地址,port参数设置端口号,这里设置的端口号是12345。对每个源,还需要配置其对应的channel,一个源可以对应多个channel,所以属性名是复数channels,我们这里只对应一个channel,即上面配置的通道c1。

我们再配置channel的信息,例子如下:

a1.channels.c1.type = memory

上面配置c1通道为内存通道,通道数据既可以保存在内存中,也可以在文件中,其它采用默认值。

最后配置sink信息,例子如下:

a1.sinks.k1.type = logger
a1.sinks.k1.channel= c1
上面的配置中type指定sink的类型,这里的logger是flume内置的一种sink类型,主要用于调试,它会使用log4j将所有info级别的日志记录下来,这些日志来自于配置好的通道。channel 参数用于指定sink从哪个channel获取数据。一个sink只能从一个channel获取数据。

(三)启动Agent

配置好要启动的flume agent所需要的配置文件后,就可以启动agent了,即启动一个java进程。

假设我们上面的配置文件名称为hello.conf,保存到flume解压目录的conf目录下,该配置文件中完整的内容如下:

a1.sources=s1
a1.channels=c1
a1.sinks=k1
a1.sources.s1.type=netcat
a1.sources.s1.bind=0.0.0.0
a1.sources.s1.port=12345
a1.sources.s1.channels=c1
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sinks.k1.channel= c1

下面我们执行flume-ng脚本来启动agent。命令格式如下:

[root@diagbot01 bin]# ./flume-ng agent -n a1 -c conf -f ../conf/testNG/netcatNG.conf -Dflume.root.logger=INFO,console

上面的命令,各选项的含义如下:

1)flume-ng是可执行脚本,位于bin目录下

2)agent参数表示这是启动一个agent。

3)-n 参数用于指定本agent的名称,这里是a1,这样flume就会根据这个名称到配置文件中获取相关的配置信息。

4)-c 参数用于指定log4j的配置文件所在的目录,这里是conf目录

5)-f参数用于指定flume agent配置文件的名称

6)-Dflume.root.logger参数的作用是覆盖conf/log4j.properties中的信息,让采集的信息在控制台上输出。

执行上面命令后,会打印很多信息,该flume agent会等待socket客户端的连接和数据输入,这时我们利用操作系统的nc命令来往该agent发送信息。我们打开另外一个控制台界面,输入如下信息:

[root@diagbot01 ~]# nc localhost 12345
hello kwz
OK
you are cool
OK

上面我们运行nc命令连接flume,然后输入两行信息。

这时我们切换到运行flume agent的控制台界面,发现控制台上输出了如下信息,这正是nc程序发出的信息,如下:

20/04/24 13:46:23 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 6B 77 7A hello kwz }
20/04/24 13:47:42 INFO sink.LoggerSink: Event: { headers:{} body: 79 6F 75 20 61 72 65 20 63 6F 6F 6C you are cool }

上面只是一个简单的例子,从这个例子中可以看出,使用flume并不是很复杂,在规划好启动多少agent之后。后面的主要工作是设置每个agent的配置信息,最基本的就是source,channel和sink信息。

五、小结

本文是对flume组件的一个简单入门学习,主要是对flume的核心概念进行了介绍,并通过一个简单的例子演示了如何使用flume。

上一篇:Cassandra对读写请求的处理机制


下一篇:netcat 命令学习