flume介绍

flume

1.flume是什么

Flume:
    ** Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、传输、聚合的系统。
    ** Flume仅仅运行在linux环境下
    ** flume.apache.org(Documentation--Flume User Guide)
Flume体系结构(Architecture):
Source: 用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel
Channel:连接 source 和 sink的数据传输通道
Sink:     从Channel收集数据,将数据写到目标源,可以是下一个Source也可以是HDFS或者HBase

2.flume安装

----flume安装-----------------------------

1、解压(建议安装到cdh目录里)

2、改名,并修改flume-env.sh
$ mv flume-env.sh.template flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.7.0_67

3、使用flume-ng命令
$ bin/flume-ng 
--conf         指定配置目录
--name         指定Agent的名称
--conf-file    指定具体的配置文件

3.案例:

需求:使用flume监控某个端口,把从端口写入的数据输出为logger

1、复制
$ cp -a flume-conf.properties.template flume-telnet.conf

2、修改flume-telnet.conf

# Name the components on this agent
# a1为代理(中介)实例名,任意命名,agent分三部分
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# netcat是用于调试和检查网络的工具包,windows和linux(redhat)均可用,需要安装
a1.sources.r1.type = netcat    
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# 可以在文档Flume Sinks--Logger Sink部分查找
# 往日志文件里面写
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# 内存channel
a1.channels.c1.type = memory
# channel里存放的最大event数
a1.channels.c1.capacity = 1000
# 每个事务支持的最大event数
a1.channels.c1.transactionCapacity = 100

# 绑定source和sink到channel
# 注意:这里有's'
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

*** 配置文件的使用:
a) 命名
b) 配置source、sink、channel
c) 关联

---------------------

测试:
*** 安装telnet
$ su -
# yum -y install telnet

*** 启动flume,'-D'设置日志级别和输出源
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/flume-telnet.conf -Dflume.root.logger=INFO,console    //把日志结果输出到控制台

*** 打开另外一个窗口
$ netstat -an|grep 44444    --检查是否有程序(flume)在监听44444端口
$ telnet localhost 44444    --连接本机的44444端口,telnet是访问这个端口的客户端
然后随意输入字符串...

PS:
a) 退出telnet:'ctrl+]',然后输入quit。
b) 若flume-ng无法退出,则打开一个新的窗口,jps(或netstat -antp|grep 44444)查找pid,使用 kill -9
    
需求:实时抽取新生成的日志文件内容 -->  追加到HDFS上对应文件的末尾
      本例使用flume去监控某个文件,将新增添的内容抽取到其他地方,如HDFS
      本例监控的是apache的日志文件 /var/log/httpd/access_log

----安装Apache服务器-------

$ su -
# yum -y install httpd
# service httpd start
# service httpd status
** 编辑主页,/var/www/html是Apache web服务器根目录
# vi /var/www/html/index.html
随意输入内容...
** 打开浏览器,http://192.168.2.200访问网页

** 授权
# chmod 755 /var/log/httpd/

** 动态监看日志变化,刷新页面可以触发日志生成
# su - tom
$ tail -f /var/log/httpd/access_log    --'-F'和'-f'效果相同

----------------------------

$ cp -a flume-telnet.conf flume-apache.conf 

a2.sources = r2
a2.channels = c2
a2.sinks = k2

# define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /var/log/httpd/access_log
# '-c'表示命令行,必需写
a2.sources.r2.shell = /bin/bash -c

# define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# define sinks
#启用设置多级目录,这里按"年月日/时"2级目录,每1小时生成一个文件夹
a2.sinks.k2.type = hdfs
#目录会自动生成
a2.sinks.k2.hdfs.path=hdfs://192.168.2.200:8020/flume/%Y%m%d/%H
# 文件前缀
a2.sinks.k2.hdfs.filePrefix = accesslog
#启用按时间生成文件夹
a2.sinks.k2.hdfs.round=true
#设置round值:1,单位:小时  
a2.sinks.k2.hdfs.roundValue=1
a2.sinks.k2.hdfs.roundUnit=hour
#使用本地时间戳,如:用来命名文件
a2.sinks.k2.hdfs.useLocalTimeStamp=true

# 缓冲到hdfs之前,用以写文件的事件的最大数
a2.sinks.k2.hdfs.batchSize=1000
a2.sinks.k2.hdfs.fileType=DataStream
a2.sinks.k2.hdfs.writeFormat=Text

#解决文件过多过小的问题(若是使用默认配置,会生成很多个小文件)
#每600秒生成一个文件
a2.sinks.k2.hdfs.rollInterval=600
#当文件达到128000000字节时,会创建一个新文件
#实际环境中如果一个文件块128M,那么这里一般设置成127M(127*1024*1024)
a2.sinks.k2.hdfs.rollSize=128000000
#设置文件的生成和events数无关
a2.sinks.k2.hdfs.rollCount=0
#需要设置为1,否则当有副本复制时,就重新生成文件,上面三条则会失效
a2.sinks.k2.hdfs.minBlockReplicas=1

# bind the sources and sinks to the channels
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

测试:
a) 启动CDH Hadoop
$ sbin/start-dfs.sh ; sbin/start-yarn.sh ; sbin/mr-jobhistory-daemon.sh start historyserver
b) 启动Apache
# service httpd start
c) 启动flume
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file conf/flume-apache.conf
d) 刷新http://192.168.2.200
   监看web日志:$ tail -f /var/log/httpd/access_log 
   监看HDFS:   $ bin/hdfs dfs -tail -f /flume/20170519/10/accesslog.1495161507253.tmp
利用flume监控某个目录(/home/tom/log),把里面回滚好的文件实时抽取到HDFS平台。

$ mkdir /home/hadoop/log
$ cd log
$ cp /var/log/httpd/access_log access_log.1
$ cp /var/log/httpd/access_log access_log.2
需求:抽取文件access_log.1和access_log.2

$ mkdir /opt/cdh-5.3.6/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint
$ mkdir /opt/cdh-5.3.6/apache-flume-1.5.0-cdh5.3.6-bin/checkdata

$ cp -a flume-apache.conf  flume-dir.conf

a3.sources = r3
a3.channels = c3
a3.sinks = k3

# define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /home/tom/log
# 使用正则表达式指定忽略的文件
# '.'表示除了'\r\n'以外的任意字符,'*'表示0-n个
a3.sources.r3.ignorePattern = ^.*\_log$

# define channels
# 通过临时文件进行转存(即把数据缓存到一个临时文件中,然后一起flush),速度慢,但数据相对安全
# 这里使用memory channel也可以
a3.channels.c3.type = file
# checkpoint文件存放的地方,checkpoint里存储着元数据信息,比如哪些文件被抽取过,哪些还没有...
a3.channels.c3.checkpointDir = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint
# 临时文件存放的地方
a3.channels.c3.dataDirs = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkdata

# define sinks
#启用设置多级目录,这里按"年月日/时"2级目录,每1小时生成一个文件夹
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path=hdfs://192.168.122.128:8020/flume2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = accesslog
#启用按时间生成文件夹
a3.sinks.k3.hdfs.round=true
a3.sinks.k3.hdfs.roundValue=1
a3.sinks.k3.hdfs.roundUnit=hour
#使用本地时间戳  
a3.sinks.k3.hdfs.useLocalTimeStamp=true

a3.sinks.k3.hdfs.batchSize=1000
a3.sinks.k3.hdfs.fileType=DataStream
a3.sinks.k3.hdfs.writeFormat=Text

#解决文件过多过小问题
#每600秒生成一个文件
a3.sinks.k3.hdfs.rollInterval=600
a3.sinks.k3.hdfs.rollSize=128000000
#设置文件的生成和events数无关
a3.sinks.k3.hdfs.rollCount=0
#设置成1,否则当有副本复制时就重新生成文件,上面三条则会失去效果
a3.sinks.k3.hdfs.minBlockReplicas=1

# bind the sources and sinks to the channels
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

测试:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file conf/flume-dir.conf
去http://192.168.2.200:50070查看即可
** 进入log/,可以看到,带后缀的表示抽取完成
$ ls
access_log.1.COMPLETED  access_log.2.COMPLETED

再次生成一个日志文件,会发现其会被立即抽取
$ cp access_log.1.COMPLETED access_log.3
$ ls
access_log.1.COMPLETED  access_log.3.COMPLETED    access_log.2.COMPLETED
在同一个服务器启动三个agent:
agent1:用于实时监控/var/log/httpd/access_log

** flume-apache.conf

# 配置agent1
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

# define sources
agent1.sources.r1.type = exec
# 注意:执行flume命令的用户对/var/log/httpd/access_log文件一定要有可读权限
agent1.sources.r1.command = tail -F /var/log/httpd/access_log
agent1.sources.r1.shell = /bin/bash -c

# define channels
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# define sinks
# 一种序列号技术
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = 192.168.2.200
agent1.sinks.k1.port = 4545

# bind the sources and sinks to the channels
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1

测试:
启动Apache

启动agent1:
$ bin/flume-ng agent --conf conf/ --name agent1 --conf-file conf/flume-apache.conf
$ tail -F /var/log/httpd/access_log
刷新网页,查看变化

------------------

agent2:用于实时监控/opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log
$ mkdir logs
$ vi conf/hive-log4j.properties
hive.log.dir=/opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs

** flume-hive.conf

# 配置agent2
agent2.sources = r2
agent2.channels = c2
agent2.sinks = k2

# define sources
agent2.sources.r2.type = exec
agent2.sources.r2.command = tail -F /opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log
agent2.sources.r2.shell = /bin/bash -c

# define channels
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100

# define sinks
agent2.sinks.k2.type = avro
agent2.sinks.k2.hostname = 192.168.2.200
agent2.sinks.k2.port = 4545

# bind the sources and sinks to the channels
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2

测试:
启动agent2:
$ bin/flume-ng agent --conf conf/ --name agent2 --conf-file conf/flume-hive.conf
$ tail -F /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
进入hive,随便执行几条语句,查看日志变化
hive> show databases;
...

-------------------

agent3:用于实时监控收集agent1和agent2传递过来的数据

** flume-collector.conf

# 配置agent3
agent3.sources = r3
agent3.channels = c3
agent3.sinks = k3

# define sources
agent3.sources.r3.type = avro
agent3.sources.r3.bind = 192.168.2.200
agent3.sources.r3.port = 4545

# define channels
agent3.channels.c3.type = memory
agent3.channels.c3.capacity = 1000
agent3.channels.c3.transactionCapacity = 100

# define sinks
# 启用设置多级目录,这里按"年月日"时 2级目录,每个小时生成一个文件夹
agent3.sinks.k3.type = hdfs
agent3.sinks.k3.hdfs.path=hdfs://192.168.2.200:8020/flume3/%Y%m%d/%H
agent3.sinks.k3.hdfs.filePrefix = accesslog

# 启用按小时生成文件夹
agent3.sinks.k3.hdfs.round=true 
agent3.sinks.k3.hdfs.roundValue=1
agent3.sinks.k3.hdfs.roundUnit=hour  
agent3.sinks.k3.hdfs.useLocalTimeStamp=true

agent3.sinks.k3.hdfs.batchSize=1000
agent3.sinks.k3.hdfs.fileType=DataStream
agent3.sinks.k3.hdfs.writeFormat=Text

# 解决文件过多过小的问题
# 每600秒生成一个文件
agent3.sinks.k3.hdfs.rollInterval=600
agent3.sinks.k3.hdfs.rollSize=128000000
# 设置文件的生成和events数无关
agent3.sinks.k3.hdfs.rollCount=0
# 设置成1,否则当有副本复制时就会重新生成文件,上面三条则会失效
agent3.sinks.k3.hdfs.minBlockReplicas=1

# bind the sources and sinks to the channels
agent3.sources.r3.channels = c3
agent3.sinks.k3.channel = c3


测试:
启动agent3:
$ bin/flume-ng agent --conf conf/ --name agent3 --conf-file conf/flume-collector.conf
进入CDH Hadoop,监控日志变化,注意:路径要修改(监控.temp文件效果会明显点)
$ bin/hdfs dfs -tail -f /flume3/20161220/11/accesslog.1482203839459

 

上一篇:Flume


下一篇:新闻网大数据实时分析可视化系统项目——8、Flume数据采集准备