1. Logger Sink
Logs events at a specified level (e.g. INFO, DEBUG, ERROR); typically used for testing and debugging.
Requires a log4j configuration file in the directory given by the --conf argument.
By design, the logger sink limits the logged event body to 16 bytes so the console is not flooded with output. If you need the full content for debugging, use another sink, for example the file_roll sink, which writes events to the local file system.
Configuration options:
| Option | Description |
| channel | The channel to read from |
| type | logger |
| Note | A log4j configuration file must exist in the directory given by --conf; the log4j settings can also be passed on the command line with -Dflume.root.logger=INFO,console |
Configuration example:
# Components of agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=s1
# Describe/configure r1 of a1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44444
# Describe s1 of a1
a1.sinks.s1.type=logger
# Describe c1 of a1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
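For completeness (the file name and paths below are assumptions), the agent can be started and exercised the same way as in the later examples:
Run from the bin directory: ./flume-ng agent -n a1 -c ../conf/ -f ../data/loggersink.conf -Dflume.root.logger=INFO,console
In a new ssh window, run: nc localhost 44444 and type a line of text; the event should be printed (with the body truncated to 16 bytes) in the agent's console output.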
2. File Roll Sink
Stores events on the local file system.
Rolls to a new file at a fixed interval; each file holds the events collected during that interval.
Configuration options:
| Option | Description |
| channel | The channel to read from |
| type | file_roll |
| sink.directory | The directory where the files are stored |
| sink.rollInterval | 30 (default). Roll to a new log file every 30 seconds; 0 disables rolling, so all data is written to a single file |
Configuration example:
# Components of agent a1
a1.sources=r1
a1.sinks=s1
a1.channels=c1
# Describe/configure source r1 of a1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888
# Describe the sink
a1.sinks.s1.type=file_roll
a1.sinks.s1.sink.directory=/home/work/rolldata
# Roll to a new file every 60 seconds
a1.sinks.s1.sink.rollInterval=60
# Describe the memory channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
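Assuming the configuration above is saved as rollsink.conf under the data directory (the file name is an assumption), it can be started and tested as follows; the sink.directory (/home/work/rolldata here) should be created beforehand:
Run from the bin directory: ./flume-ng agent -n a1 -c ../conf/ -f ../data/rollsink.conf -Dflume.root.logger=INFO,console
In a new ssh window, run: nc localhost 8888 and send a few lines; new files appear under /home/work/rolldata, one per roll interval.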
3. HDFS Sink
This sink writes events to the Hadoop Distributed File System (HDFS).
It currently supports creating text files and sequence files.
Both formats support compression.
Files can be rolled based on elapsed time, data size, or number of events.
It can also bucket/partition data by attributes such as a timestamp or the machine where the event originated.
The HDFS directory path may contain formatting escape sequences that the sink replaces to generate the directory/file name used to store the events.
Using this sink requires Hadoop to be installed, so that Flume can use the Hadoop jars to communicate with HDFS.
Configuration options:
| Option | Description |
| channel | The channel to read from |
| type | hdfs |
| hdfs.path | HDFS directory path (e.g. hdfs://namenode/flume/webdata/) |
| hdfs.inUseSuffix | .tmp (default). Suffix appended to files that Flume is still writing |
| hdfs.rollInterval | 30 (default). Number of seconds to wait before rolling the current file (0 = never roll based on time interval) |
| hdfs.rollSize | 1024 (default). File size to trigger a roll, in bytes (0 = never roll based on file size) |
| hdfs.rollCount | 10 (default). Number of events written to a file before it is rolled (0 = never roll based on number of events) |
| hdfs.fileType | SequenceFile (default). File format: currently SequenceFile, DataStream or CompressedStream |
Configuration file: hdfssource.conf
# Components of agent a1
a1.sources=r1
a1.sinks=s1
a1.channels=c1
# Describe/configure source r1 of a1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888
# Describe the sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop101:9000/flume
a1.sinks.s1.hdfs.fileType=DataStream
# Describe the memory channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
Run from the bin directory: ./flume-ng agent -n a1 -c ../conf/ -f ../data/hdfssource.conf -Dflume.root.logger=INFO,console
In a new ssh window, run: nc hadoop101 8888
Extension question: if the active NameNode fails and HDFS fails over to the standby, how does the Flume HDFS sink follow the switch?
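One common answer (a sketch, not tied to a specific Flume version): let the HDFS client handle the failover by pointing hdfs.path at the HA nameservice ID instead of a single NameNode, with the cluster's core-site.xml and hdfs-site.xml on Flume's classpath. The nameservice name mycluster below is an assumption:
a1.sinks.s1.type=hdfs
# Refer to the HA nameservice defined in hdfs-site.xml instead of hdfs://hadoop101:9000
a1.sinks.s1.hdfs.path=hdfs://mycluster/flume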
4. Avro Sink
The Avro sink is the basis for multi-hop flows, fan-out flows (one to many), and fan-in flows (many to one).
Configuration options:
| Option | Description |
| channel | The channel to read from |
| type | avro |
| hostname | The hostname or IP address of the destination (the next-hop Avro source) |
| port | The port the destination is listening on |
Multi-hop flow:
Configuration file: multi.conf (on hadoop101)
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Configuration file: multi.conf (on hadoop102)
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Configuration file: multi.conf (on hadoop103)
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Start the agents in reverse order: first the one on hadoop103, then hadoop102, and finally hadoop101.
Run from the bin directory: ./flume-ng agent -n a1 -c ../conf/ -f ../data/multi.conf -Dflume.root.logger=INFO,console
On hadoop101, open a new ssh window and run: nc hadoop101 8090
Fan-in: shanru.conf
Configuration on hadoop101
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Configuration on hadoop102
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 8090
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Configuration on hadoop103
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Start in reverse order: start the flume agent on hadoop103 first; hadoop101 and hadoop102 can then be started in either order.
Run from the bin directory: ./flume-ng agent -n a1 -c ../conf/ -f ../data/shanru.conf -Dflume.root.logger=INFO,console
On hadoop101, open a new ssh window and run: nc hadoop101 8090 (hadoop102 can be tested the same way)
Fan-out: shanchu.conf
Configuration on hadoop101
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 8090
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 8090
a1.sources.s1.channels = c1,c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Configuration on hadoop102
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Configuration on hadoop103
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Start in reverse order: start the flume agents on hadoop103 and hadoop102 first, then the one on hadoop101.
Run from the bin directory: ./flume-ng agent -n a1 -c ../conf/ -f ../data/shanchu.conf -Dflume.root.logger=INFO,console
On hadoop101, open a new ssh window and run: nc hadoop101 8090
Multiplexing (routing) mode: multiplexing.conf, a modification of the fan-out setup.
In this mode you can specify routing rules; the channel selector distributes events to channels according to those rules.
Configuration options:
| Option | Description |
| selector.type | multiplexing, i.e. routing mode |
| selector.header | The name of the event header to inspect |
| selector.mapping.* | The mapping rules (header value to channel) |
| selector.default | The channel to use when no mapping rule matches |
The configuration on hadoop101 is as follows; hadoop102 and hadoop103 use the same configuration as in the fan-out example.
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.s1.type = http
a1.sources.s1.port = 8090
a1.sources.s1.selector.type = multiplexing
a1.sources.s1.selector.header = class
a1.sources.s1.selector.mapping.big1902 = c1
a1.sources.s1.selector.mapping.big1903 = c2
# If class is neither big1902 nor big1903, events go to the default channel c2 (i.e. hadoop103)
a1.sources.s1.selector.default = c2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 8090
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 8090
a1.sources.s1.channels = c1,c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Start in reverse order: start the flume agents on hadoop103 and hadoop102 first, then the one on hadoop101.
Run from the bin directory: ./flume-ng agent -n a1 -c ../conf/ -f ../data/multiplexing.conf -Dflume.root.logger=INFO,console
On hadoop101, open a new ssh window and run:
curl -X POST -d '[{"headers":{"class":"big1902"},"body":"This is 1902"}]' http://0.0.0.0:8090 -- sent to hadoop102 via c1
curl -X POST -d '[{"headers":{"class":"big1903"},"body":"This is 1903"}]' http://0.0.0.0:8090 -- sent to hadoop103 via c2
curl -X POST -d '[{"headers":{"class":"big1900"},"body":"This is 1900"}]' http://0.0.0.0:8090 -- no rule matches, sent to hadoop103 via the default channel c2
Interceptors: timein.conf
1. Configuration on hadoop101, with the interceptor type set to timestamp (adds a timestamp header to each event):
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8090
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.s1.type=logger
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
Run from the bin directory: ./flume-ng agent -n a1 -c ../conf/ -f ../data/timein.conf -Dflume.root.logger=INFO,console
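To check that the interceptor is working (a quick test under the configuration above): in a new ssh window, run nc localhost 8090 and type any line of text; the logger sink should print the event with a timestamp entry in its headers, e.g. something like {timestamp=...} (the exact value depends on the send time).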
2. Store the data on HDFS, partitioned into one directory per day:
The configuration is as follows:
# Components of agent a1
a1.sources=r1
a1.sinks=s1
a1.channels=c1
# Describe/configure source r1 of a1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp
# Describe the sink; %Y-%m-%d is expanded from the event's timestamp header
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop101:9000/flume/time=%Y-%m-%d
a1.sinks.s1.hdfs.fileType=DataStream
# Describe the memory channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
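Assuming this configuration is saved as timehdfs.conf under the data directory (the file name is an assumption):
Run from the bin directory: ./flume-ng agent -n a1 -c ../conf/ -f ../data/timehdfs.conf -Dflume.root.logger=INFO,console
In a new ssh window, run: nc hadoop101 8888 and send a few lines; the events should be written under a per-day directory such as /flume/time=2021-06-01 (the date is only an example).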
3. search_replace - search and replace
searchin.conf, which replaces the digits 0-9 with *, is configured as follows:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = http
a1.sources.s1.port = 8090
a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type=search_replace
a1.sources.s1.interceptors.i1.searchPattern=[0-9]
a1.sources.s1.interceptors.i1.replaceString=*
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
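A quick test, following the curl pattern used earlier (the body text is just an example): start the agent with this configuration and POST an event containing digits; in the logger output the digits should show up as *.
curl -X POST -d '[{"headers":{},"body":"order 12345"}]' http://0.0.0.0:8090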
Regex filtering: regex_filter
regexin.conf is configured as follows:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = http
a1.sources.s1.port = 8090
a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type=regex_filter
a1.sources.s1.interceptors.i1.regex=.*[0-9].*
# true: events matching the regex are dropped; false: events that do NOT match are dropped (default false)
a1.sources.s1.interceptors.i1.excludeEvents=true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
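A quick test (the body texts are just examples): with excludeEvents=true, an event whose body matches the regex (contains a digit) should be dropped, while an event without digits should still be printed by the logger sink.
curl -X POST -d '[{"headers":{},"body":"contains 123"}]' http://0.0.0.0:8090 -- dropped
curl -X POST -d '[{"headers":{},"body":"no digits here"}]' http://0.0.0.0:8090 -- logged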
5. Failover Sink Processor
1. Maintains a prioritized list of sinks and guarantees that as long as one sink is available, events will be processed.
2. The failure-handling mechanism assigns a cooldown period to a failed sink; the sink is retried only after the cooldown has elapsed.
3. Each sink can be given a priority; the higher the number, the higher the priority.
4. If a sink fails while sending an event, the sink with the next highest priority is tried next.
5. If no priorities are specified, the order is determined by the order in which the sinks are configured: sinks configured earlier take precedence over those configured later.
6. To use it, define a sink group with a failover processor and assign a priority to every sink (see the configuration sketch below).
7. Priorities must be unique.
8. The maxpenalty property sets an upper limit on the backoff (cooldown) time for a failed sink.
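A minimal configuration sketch for such a sink group, assuming an agent a1 that already has two sinks k1 and k2 bound to channels (all names and values are assumptions):
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# Higher number = higher priority, so k1 is tried first
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
# Upper limit of the backoff (cooldown) for a failed sink, in milliseconds
a1.sinkgroups.g1.processor.maxpenalty = 10000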
6. Load Balancing Sink Processor
1. Provides the ability to load-balance traffic over multiple sinks.
2. Maintains an indexed list of active sinks over which the load is distributed.
3. Supports round-robin and random selection; the default is round-robin and can be changed via configuration (see the sketch below).
4. Custom selection logic can be plugged in by extending AbstractSinkSelector.
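A minimal sketch, again assuming an agent a1 with two sinks k1 and k2 already defined (names and values are assumptions):
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# Selection strategy: round_robin (default) or random
a1.sinkgroups.g1.processor.selector = round_robin
# Back off failed sinks instead of retrying them immediately
a1.sinkgroups.g1.processor.backoff = true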