flume的配置无非就是四步:1、创建一个配置文件 2、在其中配置source,sink,Channel 的各项参数 3、连接各个组件 4、调用启动命令
配置参考官网http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
1、针对NetCat的配置
1.1选用NetCat TCP Source
这个source可以打开一个端口号,监听端口号收到的消息!将消息的每行,封装为一个event!
配置必要的参数
1.2选用Logger Sink
配置必要参数
1.3 选用Memory Channel
将event存储在内存中的队列中!一般适用于高吞吐量的场景,但是如果agent故障,会损失阶段性的数据!
配置必要参数
1.4编写配置文件
# 命名每个组件 a1代表agent的名称 #a1.sources代表a1中配置的source,多个使用空格间隔 #a1.sinks代表a1中配置的sink,多个使用空格间隔 #a1.channels代表a1中配置的channel,多个使用空格间隔 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = hadoop103 a1.sources.r1.port = 44444 # 配置sink a1.sinks.k1.type = logger a1.sinks.k1.maxBytesToLog=100 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 # 绑定和连接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
1.5启动命令
bin/flume-ng agent -c conf/ -n a1 -f flumeagents/netcatSource-loggersink.conf -Dflume.root.logger=DEBUG,console #参数说明: --conf/-c:表示配置文件存储在conf/目录 --name/-n:表示给agent起名为a1 --conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。 -Dflume.root.logger=INFO,console :-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。
日志级别包括:log、info、warn、error。console是将结果打印到控制台,方便测试
2、针对读取日志文件的配置
2.1.1选用Exec Source(因为在异常情况下,Exec Source无法把从客户端读取的event进行缓存,有丢失数据的风险的,建议使用 Spooling Directory Source, Taildir Source来替换ExecSource!)
Exec Source在启动后执行一个linux命令
配置必要参数
示例:
# 配置source a1.sources.r1.type = exec a1.sources.r1.command = tail -f /opt/module/hive/logs/hive.log
2.1.2 Spooling Directory Source(自动收集文件)
SpoolingDirSource在监控一个目录中新放入的文件的数据,一旦发现,就数据封装为event! 在目录中,已经传输完成的数据,会使用重命名或删除来标识这些文件已经传输完成!
适用于:已经在一个目录中生成了大量的离线日志,且日志不会再进行写入和修改!
必要配置
可选配置
示例
# 配置 spoolsource a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/sun9/work
2.1.3 TailDirSource(实时更新文件)
TailDirSource以接近实时的速度监控文件中写入的新行!
TailDirSource检测文件中写入的新行,并且将每个文件tail的位置记录在一个JSON的文件中!即便agent挂掉,重启后,source从上次记录的位置继续执行tail操作!
用户可以通过修改Position文件的参数,来改变source继续读取的位置!如果postion文件丢失了,那么source会重新从每个文件的第一行开始读取(重复读)!
必须配置
可选配置
示例
# 配置source a1.sources.r1.type = TAILDIR #多个文件用空格分开 a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /home/atguigu/a.txt a1.sources.r1.filegroups.f2 = /home/atguigu/b.txt a1.sources.r1.positionFile=/home/atguigu/taildir_position.json
2.2选用HDFS Sink
HDFS Sink负责将数据写到HDFS。
-
目前支持创建 text和SequnceFile文件!
-
以上两种文件格式,都可以使用压缩
-
文件可以基于时间周期性滚动或基于文件大小滚动或基于events的数量滚动
-
可以根据数据产生的时间戳或主机名对数据进行分桶或分区
-
上传的路径名可以包含 格式化的转义序列,转义序列会在文件/目录真正上传时被替换,如:hdfs://hadoop102:9000/flume/%Y%m%d/%H%M
-
如果要使用这个sink,必须已经安装了hadoop,这样flume才能使用Jar包和hdfs通信
必要配置
推荐配置
文件滚动策略
文件的类型和压缩类型:
目录的滚动策略:
时间戳设置:
注意: 所有和时间相关的转义序列,都要求event的header中有timestamp的属性名,值为时间戳
示例:
# 配置sink a1.sinks.k1.type = hdfs #转义序列 a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H%M #上传文件的前缀 a1.sinks.k1.hdfs.filePrefix = logs- #滚动目录 一分钟滚动一次目录 a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 1 a1.sinks.k1.hdfs.roundUnit = minute #是否使用本地时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true #配置文件滚动 a1.sinks.k1.hdfs.rollInterval = 30 a1.sinks.k1.hdfs.rollSize = 134217700 a1.sinks.k1.hdfs.rollCount = 0 #使用文件格式存储数据 a1.sinks.k1.hdfs.fileType=DataStream
3、针对多路复用的配置
需求如下,同一个数据,既要上传到HDFS上,也要保存到本地,这就涉及到了多个flume串联的问题
3.1 flume之间传输需选用Avro Sink (Avro Sink和Avro Source是搭配使用的!)
sink将event以avro序列化的格式发送到另外一台机器的指定进程!
必要配置
# 配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname=hadoop102 a1.sinks.k1.port=12345
3.2Avro Source
source读取avro格式的数据,反序列化为event对象!
必要配置
示例
# 配置source a1.sources.r1.type = avro a1.sources.r1.bind = hadoop102 a1.sources.r1.port = 12345
注意:在接收数据的Agent里有俩个Channel,这时还需要配置使用复制的channel选择器,此选择器会选中所有的channel,每个channel复制一个event(可以省略,默认)
a1.sources.r1.selector.type = replicating
3.3写入本地 File Roll Sink
将event写入到本地磁盘!
必要配置
可选配置
示例
# 配置sink a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory=/home/atguigu/flume a1.sinks.k1.sink.rollInterval=600
四、针对故障转移及负载均衡的配置
实质是一个Channel对应俩个Sink的配置,这里当然也会用到Avro Source ,Avro Sink ,但最关键的是Sink选择器
4.1Failover Sink Processor(故障转移)
故障转移的sink处理器! 这个sink处理器会维护一组有优先级的sink!默认挑选优先级最高(数值越大)的sink来处理数据!故障的sink会放入池中冷却一段时间,恢复后,重新加入到存活的池中,此时在live pool(存活的池)中选优先级最高的,接替工作!
必要配置
可选配置
示例:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000
4.2Load balancing Sink Processor(负载均衡)
使用round_robinor
random两种算法,让多个激活的sink间的负载均衡(多个sink轮流干活)!
必要配置
可选配置
示例:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.selector = random