Flume 1.6.0
Flume简介
Apache Flume 是一个分布式,高可用的数据收集系统。它可以从不同的数据源收集数据,经过聚合后发送到存储系统中,通常用于日志数据的收集。Flume 分为 NG 和 OG (1.0 之前) 两个版本,NG 在 OG 的基础上进行了完全的重构,是目前使用最为广泛的版本。下面的介绍均以 NG 为基础。
外部数据源以特定格式向 Flume 发送 events
(事件),当 source
接收到 events
时,它将其存储到一个或多个 channel
,channe
会一直保存 events
直到它被 sink
所消费。sink
的主要功能从 channel
中读取 events
,并将其存入外部存储系统或转发到下一个 source
,成功后再从 channel
中移除 events
。
1、 基本概念
1. Event
Event
是 Flume NG 数据传输的基本单元。类似于 JMS 和消息系统中的消息。一个 Event
由标题和正文组成:前者是键/值映射,后者是任意字节数组。
2. Source
数据收集组件,从外部数据源收集数据,并存储到 Channel 中。
3. Channel
Channel
是源和接收器之间的管道,用于临时存储数据。可以是内存或持久化的文件系统:
-
Memory Channel
: 使用内存,优点是速度快,但数据可能会丢失 (如突然宕机); -
File Channel
: 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。
4. Sink
Sink
的主要功能从 Channel
中读取 Event
,并将其存入外部存储系统或将其转发到下一个 Source
,成功后再从 Channel
中移除 Event
。
5. Agent
是一个独立的 (JVM) 进程,包含 Source
、 Channel
、 Sink
等组件。
2、组件种类
Flume 中的每一个组件都提供了丰富的类型,适用于不同场景:
- Source 类型 :内置了几十种类型,如
Avro Source
,Thrift Source
,Kafka Source
,JMS Source
; - Sink 类型 :
HDFS Sink
,Hive Sink
,HBaseSinks
,Avro Sink
等; - Channel 类型 :
Memory Channel
,JDBC Channel
,Kafka Channel
,File Channel
等。
Flume安装
1、解压、加入环境变量
完成后使用flume-ng version
可以查看版本。
>flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f
2、修改配置文件
进入conf目录,将配置文件复制一份。
cp flume-env.sh.template flume-env.sh
修改其中的JAVA_HOME
和 FLUME_CLASSPATH
。
(不修改也可,会找环境中的jdk)
export JAVA_HOME=/usr/local/jdk
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
# Note that the Flume conf directory is always included in the classpath.
FLUME_CLASSPATH="/usr/local/bigdata/flume"
案例一
监听文件,将数据采集到控制台。
编写配置文件,将/tmp/log.txt作为source。console作为sink.
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
#将sources与channels进行绑定
a1.sources.s1.channels = c1
#配置sink
a1.sinks.k1.type = logger
#将sinks与channels进行绑定
a1.sinks.k1.channel = c1
#配置channel类型
a1.channels.c1.type = memory
执行agent,处理数据。
flume-ng agent --conf conf --conf-file ../file/exec-memory-logger.properties --name a1 -Dflume.root.logger=INFO,console
案例二
监听文件夹,将文件上传至hdfs
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources属性
a1.sources.s1.type =spooldir
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
#将sources与channels进行绑定
a1.sources.s1.channels =c1
#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#将sinks与channels进行绑定
a1.sinks.k1.channel = c1
#配置channel类型
a1.channels.c1.type = memory
执行监听。
flume-ng agent \
--conf-file /usr/local/bigdata/file/flume-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console