1. flume将本地日志按时间读取上传到hdfs上,编辑配置文件 file_wfbmall_log_hdfs.conf,其内容如下
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = TAILDIR // positionFile 检查点文件路径 a1.sources.r1.positionFile = //opt/soft/flume/conf/job/log/taildir_position.json // 指定filegroups,可以有多个,以空格分隔;(TailSource可以同时监控tail多个目录中的文件) a1.sources.r1.filegroups = f1 // 指定监控的文件目录f1,匹配的文件信息 a1.sources.r1.filegroups.f1 = /opt/data/wfbmall/16/wfbmall.log #数据如有新增 则生成新的hdfs文件,本文件配置的为10分钟生成一个新文件 a1.sources.r1.fileHeader = true a1.sources.ri.maxBatchCount = 1000 # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 #a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/data/logs//%y-%m-%d/ a1.sinks.k1.hdfs.path = hdfs://master:9000/flume/data/logs/ a1.sinks.k1.hdfs.filePrefix = events #控制文件夹的滚动频率 a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 #10分钟生成一个新的HDFS文件 a1.sinks.k1.hdfs.roundUnit = minute #控制文件的滚动频率 a1.sinks.k1.hdfs.rollInterval = 300 #按照时间滚动文件,为0时表示不按照时间滚动文件,并不是0秒滚动; a1.sinks.k1.hdfs.rollSize = 0 #文件大小多大时滚动(单位是字节); a1.sinks.k1.hdfs.rollCount = 100000 #写入文件的时间数达到多少时滚动 a1.sinks.k1.hdfs.batchSize = 100 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 #a1.sinks.k1.hdfs.codeC = lzo a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
进行调用配置文件,到flume的安装目录下执行,(如写全目录容易出现slf4j的jar包冲突的报错,而导致进程无法正常执行)
[root@master flume]# bin/flume-ng agent --conf conf --conf-file ./conf/job/uenpay/file_wfbmall_log_hdfs.conf --name a1