Flume Configuration Overview
Configuration overview
In short:
A source receives incoming data.
A sink sends data onward.
A channel buffers data between the two.
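To make the wiring concrete, here is a minimal sketch of how a single agent ties the three components together. The agent name a1 and the component names r1/c1/k1 are arbitrary labels and the types are placeholders; complete working examples follow later in this document.
# minimal wiring sketch
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# every component gets a type plus type-specific properties
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
# a source may feed several channels; a sink drains exactly one channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1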
Below are the component types that will come up later.
1. Source component types (receive data sent from somewhere)
Netcat Source
Accepts request data from a netcat-style client; commonly used for testing and development
Exec Source
Runs a given Unix command and uses the command's output as the data source
Spooling Directory Source
Watches a given directory for new files and parses events out of each new file as it appears
Kafka Source
Reads data from a Kafka cluster
Sequence Generator Source
A sequence generator: a counter starting at 0 that increments by 1 up to Long.MAX_VALUE
Avro Source
Accepts request data from an Avro client, similar to Netcat Source
Usually used to build Flume collection tiers and to collect data sent over RPC
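The Spooling Directory Source is not used in the examples below, so here is a minimal hedged sketch of what its stanza could look like; the directory path is a made-up placeholder:
a1.sources = r1
a1.sources.r1.type = spooldir
# directory to watch; completed files are renamed with a .COMPLETED suffix by default
a1.sources.r1.spoolDir = /usr/local/src/flume/spool
# add a header recording which file each event came from
a1.sources.r1.fileHeader = true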
2. Channel component types (buffer data)
Memory Channel
Caches Event objects in memory
Pros: fast
Cons: risk of data loss
JDBC Channel
Persists Event objects in a database; currently only Derby is supported
Pros: durable
Cons: relatively slow
File Channel
Persists Event objects in files on local disk
Pros: durable
Cons: relatively slow
Kafka Channel
Writes Event objects into a Kafka cluster
Pros: highly available, data is replicated
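The Kafka Channel is likewise not configured anywhere below; a minimal sketch, assuming a Kafka broker is reachable at 192.168.120.129:9092 (a placeholder address) and using an illustrative topic name, could be:
a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 192.168.120.129:9092
a1.channels.c1.kafka.topic = flume-channel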
3. Sink component types (write the collected data out to files, to other hosts, or to some other destination)
Logger Sink
Writes the collected data out as log output
HDFS Sink
Writes the collected data to the HDFS distributed file system; two file formats are supported: text and sequence file
Note: with the DataStream file type the collected data is written out without serialization
In the setup used here a new data file/directory is produced roughly every ten minutes
File Roll Sink
A rolling-file sink that writes the collected data to the local file system
Null Sink
Discards everything it receives
HBaseSinks
Writes the collected data into the HBase non-relational database
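The File Roll Sink also never shows up in the walkthrough below; a minimal sketch, assuming the local directory /usr/local/src/flume/out already exists (an illustrative path), could be:
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /usr/local/src/flume/out
# start a new output file every 30 seconds
a1.sinks.k1.sink.rollInterval = 30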
Installing Flume (version 1.9 is used here)
This guide is meant for Flume 1.6 or later; some of the steps need hands-on debugging, and older versions make that more troublesome.
1. Install the small tool we need for quick Flume tests before installing Flume itself:
yum install -y nc    # install netcat
Then download the Flume package we will use.
Official Flume download link
Installation package (click to download)
Upload the Flume archive to the server and unpack it; if these steps are unfamiliar, look them up online.
2. Configure the environment (add these lines to /etc/profile or ~/.bashrc, then source the file so they take effect):
export FLUME_HOME=<path to your Flume install>
export PATH=$PATH:$FLUME_HOME/bin
3. Start Hadoop. A fully distributed cluster is recommended here, since several agents will be involved.
4. How to start a Flume agent from a configuration file:
[root@master conf]# flume-ng agent -n a1 -c ./ -f /example.conf -Dflume.root.logger=INFO,console
Or add the following to flume-env.sh:
export JAVA_OPTS="-Dflume.root.logger=INFO,console"
The configuration files
For the examples that come as File 1 / File 2 pairs, keep the two files on different machines, and read the notes in each example.
1. Setup notes:
I am using two machines here, each with Hadoop 3.x and Flume 1.9:
192.168.120.129 is my primary machine
192.168.120.134 is my secondary machine
2. The configurations
All of the files below go into the conf directory under the Flume home directory.
1. Non-persistent buffering. File name: example.conf
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1
a1.channels = c1
# Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Configure the sink type as logger
a1.sinks.k1.type = logger
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Open a new session window and connect to Flume with: nc localhost 44444 (the port number)
Then type some text.
2. Persistent buffering
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1
a1.channels = c1 c2
# Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Configure the sink type as logger
a1.sinks.k1.type = logger
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /usr/local/src/flume/checkpoint
a1.channels.c2.dataDirs = /usr/local/src/flume/data
# Bind the source and the sink to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c2
Open a new session window and connect to Flume with: nc localhost 44444 (the port number)
Then type some text.
Note that only c2 (the file channel) is drained by the sink here; c1 has no sink attached, so the memory channel will eventually fill up. In practice you would either attach a second sink to c1 or leave c1 out.
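A side note on the File Channel: besides checkpointDir and dataDirs it also has capacity settings; a hedged sketch with illustrative values (not used above) would be:
a1.channels.c2.capacity = 1000000
a1.channels.c2.transactionCapacity = 10000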
3. Monitoring a single log file
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1
a1.channels = c1
# Configure the source type as exec
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F app.log
# Configure the sink type as logger
a1.sinks.k1.type = logger
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
To test, append lines to app.log from another session (the path is resolved relative to the directory the agent was started from), for example: echo hello >> app.log
There is no netcat source in this example, so the nc test from the previous examples does not apply.
4. Monitoring multiple log files
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1
a1.channels = c1
# Configure the source type as TAILDIR, tracking two file groups
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile =/usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/src/flume/conf/logs/.*log
a1.sinks.k1.type = logger
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
To test, append text to /usr/local/src/flume/conf/app.log or to any .log file under /usr/local/src/flume/conf/logs/ from another session.
Again, there is no netcat source here, so nc is not used for this example.
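As a hedged aside, the TAILDIR source can tag each file group with its own header, which is handy once events from several groups get mixed downstream; the header key and values below are illustrative only:
a1.sources.r1.headers.f1.logtype = app
a1.sources.r1.headers.f2.logtype = service
# optionally also record the absolute path of the originating file
a1.sources.r1.fileHeader = true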
5. Monitoring with multiple agents
File 1:
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Configure sink k1 as logger, and sink k2 as avro, forwarding to the second agent
a1.sinks.k1.type = logger
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555
# Configure memory channels: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
# Bind the source and the sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
File 2 (run this on 192.168.120.129, since its Avro source binds to that address):
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1 r2
a1.sinks = k1
a1.channels = c1
# Source r1 tails local files (TAILDIR); source r2 receives Avro events from the first agent
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile =/usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/src/flume/conf/logs/.*log
a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.120.129
a1.sources.r2.port = 55555
# Configure the sink type as logger
a1.sinks.k1.type = logger
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the sources and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c1
Start the File 2 agent first so that the Avro sink in File 1 has something to connect to. Then open a new session on the machine running File 1 and connect to Flume with: nc localhost 44444 (the port number)
Then type some text.
6. Interceptors:
File 1:
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Add an interceptor: host stamps each event with the sending agent's host/IP
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
# Configure sink k1 as logger, and sink k2 as avro, forwarding to the second agent
a1.sinks.k1.type = logger
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555
# Configure memory channels: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
# Bind the source and the sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
File 2 (run this on 192.168.120.129, since its Avro source binds to that address):
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1 r2
a1.sinks = k1
a1.channels = c1
# Source r1 tails local files (TAILDIR); source r2 receives Avro events from the first agent
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile =/usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/src/flume/conf/logs/.*log
a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.120.129
a1.sources.r2.port = 55555
# Configure the sink type as logger
a1.sinks.k1.type = logger
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the sources and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c1
Open a new session on the machine running File 1 and connect to Flume with: nc localhost 44444 (the port number)
Then type some text.
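As a hedged aside, the host interceptor has a few options beyond the defaults used above; none of them are needed for this walkthrough, and the header name below is only an illustration:
a1.sources.r1.interceptors.i1.type = host
# use the hostname instead of the IP address
a1.sources.r1.interceptors.i1.useIP = false
# store it under a custom header key instead of the default "host"
a1.sources.r1.interceptors.i1.hostHeader = agentHost
# do not overwrite a host header that is already present
a1.sources.r1.interceptors.i1.preserveExisting = true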
7. Using several interceptors:
Just read through this one to get familiar with the options.
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Add interceptors: i1 stamps the host, i2 stamps a timestamp
a1.sources.r1.interceptors = i1 i2 i3 i4 i5
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
# Static interceptor: add a fixed header (key datacenter, value beijing) to every event
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = beijing
# Add a UUID header to every event
a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
# Mask text: replace any run of 6 digits with ******
a1.sources.r1.interceptors.i5.type = search_replace
a1.sources.r1.interceptors.i5.searchPattern = \\d{6}
a1.sources.r1.interceptors.i5.replaceString = ******
# Configure sink k1 as logger, and sink k2 as avro
a1.sinks.k1.type = logger
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555
# Configure memory channels: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
# Bind the source and the sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Open a new session window and connect to Flume with: nc localhost 44444 (the port number)
Then type some text.
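Masking is done above with search_replace; dropping events entirely is done with the regex_filter interceptor instead. This sketch is not part of the walkthrough and the pattern is only an illustration:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
# drop every event whose body starts with DEBUG
a1.sources.r1.interceptors.i1.regex = ^DEBUG.*
a1.sources.r1.interceptors.i1.excludeEvents = true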
8. A custom interceptor
Maven is needed here.
The related files are under the code and test resources link at the end of this document.
The class used is MyHostInterceptor.java.
The type to configure is the package name plus the class name:
Mzj.Demo.MyHostInterceptor$Builder
Put the packaged jar into the flume/lib directory.
File 1 (run this on 192.168.120.129, since its Avro source binds to that address):
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1 r2
a1.sinks = k1
a1.channels = c1
# Source r1 tails local files (TAILDIR); source r2 receives Avro events from the other agent
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile =/usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/src/flume/conf/logs/.*log
a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.120.129
a1.sources.r2.port = 55555
# Add the custom interceptor to the Avro source
a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type = Mzj.Demo.MyHostInterceptor$Builder
a1.sinks.k1.type = logger
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the sources and the sink to the channel
a1.sources.r2.channels = c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
File 2:
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sinks.k1.type = logger
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
# Bind the source and the sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Open a new session on the machine running File 2 and connect to Flume with: nc localhost 44444 (the port number)
Then type some text. The custom interceptor on File 1's Avro source drops any event whose host header is 192.168.120.134 and stamps the remaining events with a state=CZ header.
9. Channel selectors
These all run on the same machine.
agent1:
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1 c2 c3 c4
# Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Configure sink k1 as Logger
a1.sinks.k1.type = logger
# Configure sinks k2, k3, k4 as Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 4040
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.120.129
a1.sinks.k3.port = 4041
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.120.129
a1.sinks.k4.port = 4042
# Configure memory channels: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100
a1.channels.c4.type = memory
a1.channels.c4.capacity = 1000
a1.channels.c4.transactionCapacity = 100
# Bind the source and the sinks to the channels
a1.sources.r1.channels = c1 c2 c3 c4
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
a1.sinks.k4.channel = c4
# Channel selector: route events by their state header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1 c2
a1.sources.r1.selector.mapping.US = c1 c3
a1.sources.r1.selector.default = c1 c4
# Interceptor: add a fixed state=US header to every event
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = US
agent2:
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.120.129
a2.sources.r1.port = 4040
a2.sinks.k1.type = logger
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
agent3:
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.120.129
a2.sources.r1.port = 4041
a2.sinks.k1.type = logger
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
agent4:
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.120.129
a2.sources.r1.port = 4042
a2.sinks.k1.type = logger
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
Open a new session window and connect to Flume with: nc localhost 44444 (the port number)
Then type some text. Because the static interceptor sets state=US, events are routed to c1 and c3, i.e. to the logger sink and to the agent listening on port 4041.
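For comparison, the default selector is replicating, which copies every event to all of a source's channels; a minimal hedged sketch (not used above) is:
a1.sources.r1.selector.type = replicating
# channels listed as optional may fail a put without failing the event
a1.sources.r1.selector.optional = c4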
10. Sink failover:
agent1:
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1
# Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Configure the sink group: failover processor, the highest-priority healthy sink gets the events
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 15
a1.sinkgroups.g1.processor.priority.k4 = 20
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Configure sink k1 as Logger
a1.sinks.k1.type = logger
# Configure sinks k2, k3, k4 as Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 4040
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.120.129
a1.sinks.k3.port = 4041
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.120.129
a1.sinks.k4.port = 4042
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
The other agent(n) files are the same as the agent(n) files in example 9 (everything except agent 1).
Open a new session window and connect to Flume with: nc localhost 44444 (the port number)
Then type some text. With the priorities above, k4 (the Avro sink on port 4042) receives the events; if it fails, the next-highest priority sink takes over.
11. Sink-processor load balancing
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1
# Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Define the sink group: load_balance processor, random selection, with backoff for failed sinks
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
# Configure sink k1 as Logger
a1.sinks.k1.type = logger
# Configure sinks k2, k3, k4 as Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 4040
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.120.129
a1.sinks.k3.port = 4041
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.120.129
a1.sinks.k4.port = 4042
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
The other agent(n) files are the same as the agent(n) files in example 9 (everything except agent 1).
Open a new session window and connect to Flume with: nc localhost 44444 (the port number)
Then type some text.
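The selector can also be round_robin instead of random; only the selector line would change:
a1.sinkgroups.g1.processor.selector = round_robin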
12. Exporting data to HDFS
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# NameNode port 9000 plus the path given below; the directories are created automatically as data is written
# Configure the sink type as hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.120.129:9000/user/flume/logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Open a new session window and connect to Flume with: nc localhost 44444 (the port number)
Then type some text.
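By default the HDFS sink rolls to a new file very frequently (small time, size, and event-count thresholds), which produces lots of tiny files. A hedged sketch of the knobs you would typically tune, with illustrative values, is:
# roll every 10 minutes or every 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# needed so the %Y-%m-%d escapes below work without a timestamp interceptor
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://192.168.120.129:9000/user/flume/logs/%Y-%m-%d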
13. Multiple agents writing to HDFS
File 1 (run this on 192.168.120.129, since its Avro source binds to that address):
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source type as Avro, listening on 192.168.120.129:4040
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.120.129
a1.sources.r1.port = 4040
# Configure sink k1 as HDFS
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.120.129:9000/user/flume/logs
a1.sinks.k1.hdfs.fileType = DataStream
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
File 2 (on the other machine):
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source as netcat, listening on localhost port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Configure sink k1 as Avro, forwarding to the agent from File 1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.120.129
a1.sinks.k1.port = 4040
# Configure a memory channel: queue capacity 1000 events, and at most 100 events per transaction when taking from the source or handing to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Open a new session on the machine running File 2 and connect to Flume with: nc localhost 44444 (the port number)
Then type some text.
hadoop fs -cat <file path>   (run on the primary machine; the text you typed ends up in the files under /user/flume/logs)
14. A custom source
Maven is needed here.
The related files are under the code and test resources link at the end of this document.
The type to configure is the package name plus the class name; the class used is s1.java, so the type is Mzj.Demo.s1.
Put the packaged jar into the flume/lib directory.
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = Mzj.Demo.s1
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Custom sinks
Maven is needed here.
The related files are under the code and test resources link at the end of this document.
The type to configure is the package name plus the class name; the class used is s2.java, so the type is Mzj.Demo.s2.
Put the packaged jar into the flume/lib directory.
# Define an agent named a1
# Name the source, sink, and channel components
a1.sources =r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = Mzj.Demo.s1
a1.sinks.k1.type = Mzj.Demo.s2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Code and test resources
Download link
Extraction code: 6666
Packaging:
Double-click the package goal in the IDE's Maven panel (the red box in the original screenshot), or run mvn package from the project root.
Put the generated jar into flume/lib.
Maven project overview:
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Mzj_baby</groupId>
<artifactId>Flume_Demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
MyHostInterceptor.java:
package Mzj.Demo;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 浩
* 2021/9/15
*/
public class MyHostInterceptor implements Interceptor{
private String name;
private static final Logger logger = LoggerFactory
.getLogger(MyHostInterceptor.class);
@Override
public void initialize() {
this.name = "";
}
@Override
public Event intercept(Event event) {
//Process a single event; an event consists of a body and headers
//If the event's host header says it came from 192.168.120.134, drop the event
if("192.168.120.134".equals(event.getHeaders().get("host"))){
logger.info("Event came from 134, dropping it");
return null;
}
Map<String,String> map = new HashMap<String,String>();
map.put("state","CZ");
event.setHeaders(map);
return event;
}
@Override
//Process a whole batch of events
public List<Event> intercept(List<Event> events) {
List<Event> eventList = new ArrayList<Event>();
for (Event event: events){
Event event1 = intercept(event);
if (event1 != null){
eventList.add(event1);
}
}
return eventList;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new MyHostInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
s1.java
package Mzj.Demo;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
/**
* 浩
* 2021/9/20
*/
public class s1 extends AbstractSource implements Configurable, PollableSource {
//Produce data
@Override
public Status process() throws EventDeliveryException {
Status status = null;
try {
//Simulate some data to send
for (int i = 0; i< 10;i++){
Event event = new SimpleEvent();
event.setBody(("data:"+i).getBytes());
getChannelProcessor().processEvent(event);
//Data is ready to be consumed
status = Status.READY;
Thread.sleep(5000);
}
} catch (Exception e) {
e.printStackTrace();
status = Status.BACKOFF;
}
return status;
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
@Override
public void configure(Context context) {
}
}
s2.java
package Mzj.Demo;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 浩
* 2021/9/21
*/
public class s2 extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory
.getLogger(s2.class);
//Consume data
@Override
public Status process() throws EventDeliveryException {
Status status = null;
//Get the channel this sink is bound to
Channel ch = getChannel();
//Get a transaction from the channel
Transaction transaction = ch.getTransaction();
try {
transaction.begin();
//Take one event from the channel
Event event = ch.take();
//This is where the event would be written to external storage; here it is just logged
if(event == null){
status = Status.BACKOFF;
}else {
logger.info(new String(event.getBody()));
status = Status.READY;
}
transaction.commit();
}catch (Exception e){
logger.error(e.getMessage());
//roll the transaction back on failure so the event is not lost
transaction.rollback();
status = Status.BACKOFF;
}finally {
transaction.close();
}
return status;
}
@Override
public void configure(Context context) {
}
}