flume监控本地文件并上传至HDFS

1. flume将本地日志按时间读取上传到hdfs上,编辑配置文件  file_wfbmall_log_hdfs.conf,其内容如下

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
// positionFile 检查点文件路径
a1.sources.r1.positionFile = //opt/soft/flume/conf/job/log/taildir_position.json
// 指定filegroups,可以有多个,以空格分隔;(TailSource可以同时监控tail多个目录中的文件)
a1.sources.r1.filegroups = f1
// 指定监控的文件目录f1,匹配的文件信息
a1.sources.r1.filegroups.f1 = /opt/data/wfbmall/16/wfbmall.log   #数据如有新增 则生成新的hdfs文件,本文件配置的为10分钟生成一个新文件
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
#a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/data/logs//%y-%m-%d/
a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/data/logs/
a1.sinks.k1.hdfs.filePrefix = events
#控制文件夹的滚动频率
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10      #10分钟生成一个新的HDFS文件
a1.sinks.k1.hdfs.roundUnit = minute
#控制文件的滚动频率
a1.sinks.k1.hdfs.rollInterval = 300  #按照时间滚动文件,为0时表示不按照时间滚动文件,并不是0秒滚动;
a1.sinks.k1.hdfs.rollSize = 0        #文件大小多大时滚动(单位是字节);
a1.sinks.k1.hdfs.rollCount = 100000  #写入文件的时间数达到多少时滚动
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
#a1.sinks.k1.hdfs.codeC = lzo
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1  

进行调用配置文件,到flume的安装目录下执行,(如写全目录容易出现slf4j的jar包冲突的报错,而导致进程无法正常执行)

[root@master flume]# bin/flume-ng agent --conf conf --conf-file ./conf/job/uenpay/file_wfbmall_log_hdfs.conf --name a1

 flume监控本地文件并上传至HDFS

 

 

 

 

flume监控本地文件并上传至HDFS

上一篇:phptorm 集成git和gitlab和一些命令


下一篇:php站点ajax请求返回数据异常处理的经历