flume日志采集框架使用
本次学习使用的全部过程均不在集群上,均在本机环境,供学习参考
先决条件:
flume-ng-1.6.0-cdh5.8.3.tar 去cloudrea下载flume框架,笔者是用cdh5.8.3的套餐
flume的使用环境:
- 采集特定目录到hdfs环境以供分析离线数据
- 监听特定端口的socket流数据
本次将以上两种情况的使用加以记录供以后参考
- 解压 flume-ng-1.6.0-cdh5.8.3.tar
- mv flume-ng-1.6.0-cdh5.8.3 flume
- 准备运行配置文件
//socket流采集 netcat-logger.conf
从网络端口接收数据,下沉到logger 采集配置文件,netcat-logger.conf # example.conf: A single-node Flume configuration # Name the components on this agent
#给那三个组件取个名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
#类型, 从网络端口接收数据,在本机启动, 所以localhost, type=spoolDir采集目录源,目录里有就采
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444 # Describe the sink
a1.sinks.k1.type = logger # Use a channel which buffers events in memory
#下沉的时候是一批一批的, 下沉的时候是一个个eventChannel参数解释:
#capacity:默认该通道中最大的可以存储的event数量
#trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 启动命令:
#告诉flum启动一个agent,指定配置参数, --name:agent的名字,
$ bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console 传入数据:
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK//spooldir配置文件实例 spooldir-hdfs.conf
监视文件夹
启动命令:
bin/flume-ng agent -c ./conf -f ./conf/spooldir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console 测试: 往/Users/willian/Public/flume放文件(mv ././xxxFile /Users/willian/Pulic/flume),但是不要在里面生成文件 ############## # Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
#监听目录,spoolDir指定目录, fileHeader要不要给文件夹前坠名
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /Users/willian/Public/flume
a1.sources.r1.fileHeader = true # Describe the sink
a1.sinks.k1.type = logger # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
可以看到 完成了采集会出现complete后缀
注意事项
- 不能出现重名的文件,不然会报错