Flume configuration methods (plus Maven)

Flume configuration overview

In short:

source: receives incoming data
sink: sends data onward
channel: buffers data between them

Below are the component types that will be used later.
1. Source component types (receive data sent in from somewhere)

Netcat Source
Accepts request data from a client; commonly used for testing and development

Exec Source
Runs a given Unix command and uses its output as the data source

Spooling Directory Source
Watches a given directory and parses events from new files as they appear

Kafka Source
Reads data from a Kafka cluster

Sequence Generator Source
A sequence generator: a counter starting at 0, incremented by 1 up to Long.MAX_VALUE

Avro Source
Accepts request data from an Avro client, similar to Netcat Source
Commonly used to build multi-agent Flume pipelines and to collect data over RPC
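The Spooling Directory Source does not appear in the configurations later in this article, so here is a minimal sketch; the directory path and suffix are placeholders, not values from the original:

```properties
# watch a directory for new files; each fully-ingested file is renamed with a suffix
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/src/flume/spool
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.channels = c1
```

Note that files dropped into the spool directory must be immutable: Flume treats a file as complete once it appears, so write files elsewhere and move them in.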

2. Channel component types (buffer data)

Memory Channel
Caches Event objects in memory
Pros: fast
Cons: risk of data loss

JDBC Channel
Persists Event objects to a database; currently only Derby is supported
Pros: durable
Cons: slower

File Channel
Persists Event objects to files on disk
Pros: durable
Cons: slower

Kafka Channel
Writes Event objects to a Kafka cluster
Pros: highly available, data is replicated
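None of the examples below use the Kafka Channel; as a minimal sketch, it could be configured like this (the broker address and topic name are assumptions — substitute your own cluster's values):

```properties
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
# placeholder broker list and topic — point these at your own Kafka cluster
a1.channels.c1.kafka.bootstrap.servers = kafka-host:9092
a1.channels.c1.kafka.topic = flume-channel
```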

3. Sink component types (write data out to files, other hosts, or other destinations)

Logger Sink
Outputs the collected data as log messages

HDFS Sink
Writes the collected data out to HDFS; two file formats are supported: text and sequence files
Note: with the DataStream file format, the collected data is not serialized
A new data directory is produced every ten minutes

File Roll Sink
A rolling-file sink that writes the collected data to the local file system

Null Sink
Discards all collected data

HBase Sink
Writes the collected data to HBase, a non-relational database
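One way to realize the "new data directory every ten minutes" behavior noted for the HDFS Sink is its time-rounding properties; a sketch, where the NameNode host and path are placeholders rather than values from this article's setup:

```properties
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://namenode:9000/user/flume/logs/%Y%m%d/%H%M
# DataStream writes raw event bodies without serialization
a1.sinks.k1.hdfs.fileType = DataStream
# round the timestamp used in %H%M down to a 10-minute boundary,
# so a new directory is produced every ten minutes
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# escape sequences such as %Y need a timestamp header on each event
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```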

Installing Flume (version 1.9 is used here)

This guide is recommended for Flume 1.6 and above; some steps need manual adjustment, and older versions are more troublesome.

1. Install the tool used for simple Flume testing:

yum install -y nc

Download the installation package:
Official Flume download link
Installation package (click to download)

Upload the Flume package to the server and unpack it; look these steps up online if you are unsure.
2. Configure the environment:

export FLUME_HOME=<your Flume path>
export PATH=$PATH:$FLUME_HOME/bin

3. Start Hadoop. A fully distributed cluster is recommended here, since multiple agents will be involved.
4. Start a Flume agent from a configuration file:

[root@master conf]# flume-ng agent -n a1 -c ./ -f ./example.conf -Dflume.root.logger=INFO,console

Or add the following to flume-env.sh:

export JAVA_OPTS="-Dflume.root.logger=INFO,console"

Configuration files

For configurations that come as a file 1 and a file 2, note that the two files go on different machines; read the notes for each carefully.

1. Setup:

Two machines are used here, each with Hadoop 3.x and Flume 1.9:
192.168.120.129 is the primary host
192.168.120.134 is the secondary host

2. Configurations

The configuration files go in Flume's conf directory.

1. Non-persistent buffering (file name: example.conf)

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source: NetCat, listening on localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#sink: logger
a1.sinks.k1.type = logger

#channel: memory; queue capacity 1000; at most 100 events per transaction taken from the source or given to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window and connect to Flume:
nc localhost 44444
Then type some text.

2. Persistent buffering

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2
#source: NetCat, listening on localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#sink: logger
a1.sinks.k1.type = logger

#channels: c1 is in-memory; c2 is file-backed (persistent)

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /usr/local/src/flume/checkpoint
a1.channels.c2.dataDirs = /usr/local/src/flume/data

#bind the source and sink to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c2

Open a new session window and connect to Flume:
nc localhost 44444
Then type some text.

3. Monitoring a single log file

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source: exec, tailing a file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F app.log

#sink: logger
a1.sinks.k1.type = logger

#channel: memory; queue capacity 1000; at most 100 events per transaction

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window and append lines to the monitored file (this source tails a file, so there is no port to connect to):
echo "some text" >> app.log
The new lines appear in the Flume console.

4. Monitoring multiple log files

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source: TAILDIR, tracking two file groups and recording read positions in a JSON file
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile = /usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/src/flume/conf/logs/.*log

a1.sinks.k1.type = logger

#channel: memory; queue capacity 1000; at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window and append lines to one of the monitored files (this source tails files, so there is no port to connect to), for example:
echo "some text" >> /usr/local/src/flume/conf/app.log
The new lines appear in the Flume console.

5. Multiple agents

File 1:

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

#source: NetCat, listening on localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#sinks: a logger, plus an avro sink forwarding to the primary host
a1.sinks.k1.type = logger
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555

#channels: memory; c1 with queue capacity 1000 and at most 100 events per transaction

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

#bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

File 2:

#define the agent name as a1
#name the three components
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

#source r1: TAILDIR, tracking two file groups
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile = /usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/src/flume/conf/logs/.*log

#source r2: avro, receiving events forwarded from file 1's avro sink
a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.120.129
a1.sources.r2.port = 55555

#sink: logger
a1.sinks.k1.type = logger

#channel: memory; queue capacity 1000; at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the sources and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c1

Open a new session window on the machine running file 1 and connect to Flume:
nc localhost 44444
Then type some text.

6. Interceptors:

File 1:

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

#source: NetCat, listening on localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#add an interceptor that stamps each event with the agent's host
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

#sinks: a logger, plus an avro sink forwarding to the primary host
a1.sinks.k1.type = logger

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555

#channels: memory; c1 with queue capacity 1000 and at most 100 events per transaction

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

#bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

File 2:

#define the agent name as a1
#name the three components
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

#source r1: TAILDIR, tracking two file groups
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile = /usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/src/flume/conf/logs/.*log

#source r2: avro, receiving events forwarded from file 1's avro sink
a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.120.129
a1.sources.r2.port = 55555

#sink: logger
a1.sinks.k1.type = logger

#channel: memory; queue capacity 1000; at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the sources and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c1

Open a new session window on the machine running file 1 and connect to Flume:
nc localhost 44444
Then type some text.

7. Using more interceptors:

Look this one over to get familiar with it.

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

#source: NetCat, listening on localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#add interceptors
a1.sources.r1.interceptors = i1 i2 i3 i4 i5
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp

#static interceptor: add a fixed header to every event
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = beijing

#add a UUID header
a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

#mask text: replace any six-digit run with ******
a1.sources.r1.interceptors.i5.type = search_replace
a1.sources.r1.interceptors.i5.searchPattern = \\d{6}
a1.sources.r1.interceptors.i5.replaceString = ******

#sinks: a logger, plus an avro sink forwarding to the primary host
a1.sinks.k1.type = logger
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555

#channels: memory; c1 with queue capacity 1000 and at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory

#bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Open a new session window and connect to Flume:
nc localhost 44444
Then type some text.

8. Custom interceptors

Maven is needed here.
The relevant files are in the code/test resources link at the end of this article.
The class used is MyHostInterceptor.java, referenced by package + class name:

Mzj.Demo.MyHostInterceptor$Builder

Put the packaged jar into the flume/lib directory.

File 1:
#define the agent name as a1
#name the three components
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

#source r1: TAILDIR, tracking two file groups
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.positionFile = /usr/local/src/flume/conf/position.json
a1.sources.r1.filegroups.f1 = /usr/local/src/flume/conf/app.log
a1.sources.r1.filegroups.f2 = /usr/local/src/flume/conf/logs/.*log

#source r2: avro, receiving events forwarded from file 2
a1.sources.r2.type = avro
a1.sources.r2.bind = 192.168.120.129
a1.sources.r2.port = 55555

#attach the custom interceptor
a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type = Mzj.Demo.MyHostInterceptor$Builder

a1.sinks.k1.type = logger

#channel: memory; queue capacity 1000; at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the sources and sink to the channel
a1.sources.r2.channels = c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

File 2:

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

a1.sinks.k1.type = logger

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 55555

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

#bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Open a new session window on the machine running file 2 and connect to Flume:
nc localhost 44444
Then type some text.

9. Channel selectors

These all run on the same machine.
agent1:

#define the agent name as a1
#name the components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1 c2 c3 c4

#source: NetCat, listening on localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#sink 1: logger
a1.sinks.k1.type = logger

#sinks 2, 3, 4: avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 4040

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.120.129
a1.sinks.k3.port = 4041

a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.120.129
a1.sinks.k4.port = 4042

#channels: memory; queue capacity 1000; at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100

a1.channels.c4.type = memory
a1.channels.c4.capacity = 1000
a1.channels.c4.transactionCapacity = 100

#bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2 c3 c4
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
a1.sinks.k4.channel = c4

#channel selector: route events by the value of their "state" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1 c2
a1.sources.r1.selector.mapping.US = c1 c3
a1.sources.r1.selector.default = c1 c4

#interceptor: stamp every event with state=US
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = US

agent2:

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.120.129
a2.sources.r1.port = 4040

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

agent3:

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.120.129
a2.sources.r1.port = 4041

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

agent4:

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.120.129
a2.sources.r1.port = 4042

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Open a new session window and connect to Flume:
nc localhost 44444
Then type some text.

10. Sink failover:

agent1:
#define the agent name as a1
#name the components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1

#source: NetCat, listening on localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#sink group: failover processor; the highest-priority live sink wins, failed sinks are penalized up to 10 s
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 15
a1.sinkgroups.g1.processor.priority.k4 = 20
a1.sinkgroups.g1.processor.maxpenalty = 10000

#sink 1: logger
a1.sinks.k1.type = logger

#sinks 2, 3, 4: avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 4040

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.120.129
a1.sinks.k3.port = 4041

a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.120.129
a1.sinks.k4.port = 4042

#channel: memory; queue capacity 1000; at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1

The other agent(n) files are configured the same as the agent(n) files in section 9 (all except agent 1).

Open a new session window and connect to Flume:
nc localhost 44444
Then type some text.

11. Sink-processor load balancing

#define the agent name as a1
#name the components
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1

#source: NetCat, listening on localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#sink group: load-balancing processor with random selection and backoff
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

#sink 1: logger
a1.sinks.k1.type = logger

#sinks 2, 3, 4: avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.120.129
a1.sinks.k2.port = 4040

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.120.129
a1.sinks.k3.port = 4041

a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.120.129
a1.sinks.k4.port = 4042

#channel: memory; queue capacity 1000; at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1

The other agent(n) files are configured the same as the agent(n) files in section 9 (all except agent 1).
Open a new session window and connect to Flume:
nc localhost 44444
Then type some text.

12. Exporting data to HDFS

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#sink: HDFS; the path uses the NameNode's port 9000, and directories under it are created automatically as data is written
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.120.129:9000/user/flume/logs
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window and connect to Flume:
nc localhost 44444
Then type some text.

13. Multiple agents uploading to HDFS

File 1:

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source: avro, listening on 192.168.120.129, port 4040
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.120.129
a1.sources.r1.port = 4040

#sink: HDFS
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.120.129:9000/user/flume/logs
a1.sinks.k1.hdfs.fileType = DataStream

#channel: memory; queue capacity 1000; at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

File 2:

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source: NetCat, listening on localhost, port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#sink: avro, forwarding to port 4040 on the primary host
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.120.129
a1.sinks.k1.port = 4040

#channel: memory; queue capacity 1000; at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Open a new session window on the machine running file 2 and connect to Flume:
nc localhost 44444
Then type some text.

hadoop fs -cat <file path> (the text you typed will show up on the primary host)

14. Custom source

Maven is needed here.
The relevant files are in the code/test resources link at the end of this article.
Referenced by package + class name.
The class used is s1.java.
Put the packaged jar into the flume/lib directory.

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#custom source class, referenced by package + class name
a1.sources.r1.type = Mzj.Demo.s1

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

15. Custom sink

Maven is needed here.
The relevant files are in the code/test resources link at the end of this article.
Referenced by package + class name.
The class used is s2.java.
Put the packaged jar into the flume/lib directory.

#define the agent name as a1
#name the three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#custom source and sink classes, referenced by package + class name
a1.sources.r1.type = Mzj.Demo.s1
a1.sinks.k1.type = Mzj.Demo.s2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Code/test resources

Download link
Extraction code: 6666

Packaging:
Run mvn package (or double-click the package goal in the IDE's Maven panel).

Copy the generated jar into flume/lib.

Maven resources overview:
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Mzj_baby</groupId>
    <artifactId>Flume_Demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

MyHostInterceptor.java:

package Mzj.Demo;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 浩
 * 2021/9/15
 */
public class MyHostInterceptor implements Interceptor{
    private  String name;


    private static final Logger logger = LoggerFactory
            .getLogger(MyHostInterceptor.class);
    @Override
    public void initialize() {
        this.name = "";
    }

    @Override
    public Event intercept(Event event) {
        // process one event: an event consists of a body and headers
        // if the host header shows the event came from 192.168.120.134, drop it
        if(event.getHeaders().get("host").equals("192.168.120.134")){
            logger.info("event came from 134, dropping it");
            return null;
        }

        // note: setHeaders replaces all existing headers with this map
        Map<String,String> map = new HashMap<String,String>();
        map.put("state","CZ");
        event.setHeaders(map);

        return event;
    }

    @Override
    // process every event in the batch, discarding the dropped ones
    public List<Event> intercept(List<Event> events) {
        List<Event> eventList = new ArrayList<Event>();
        for (Event event: events){
            Event event1 = intercept(event);
            if (event1 != null){
                eventList.add(event1);
            }

        }
        return eventList;
    }

    @Override
    public void close() {

    }



    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return  new MyHostInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

s1.java

package Mzj.Demo;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
/**
 * 浩
 * 2021/9/20
 */
public class s1 extends AbstractSource implements Configurable, PollableSource {
    // produce data
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        try {
            // simulate data to send
            for (int i = 0; i < 10; i++){
                Event event = new SimpleEvent();
                event.setBody(("data:"+i).getBytes());
                getChannelProcessor().processEvent(event);
                // data is ready to be consumed
                status = Status.READY;
                Thread.sleep(5000);
            }
        } catch (Exception e) {
            e.printStackTrace();
            status = Status.BACKOFF;
        }
        return status;
    }
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
    @Override
    public void configure(Context context) {
    }
}

s2.java

package Mzj.Demo;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 浩
 * 2021/9/21
 */
public class s2 extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory
            .getLogger(s2.class);
    // consume data
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // get the channel this sink is bound to
        Channel ch = getChannel();
        // get a transaction
        Transaction transaction = ch.getTransaction();
        try {
            transaction.begin();
            // take an event from the channel
            Event event = ch.take();
            // this is where the data could be sent to external storage
            if(event == null){
                status = Status.BACKOFF;
            }else {
                logger.info(new String(event.getBody()));
                status = Status.READY;
            }
            transaction.commit();
        }catch (Exception e){
            logger.error(e.getMessage());
            // roll back the failed transaction before closing it
            transaction.rollback();
            status = Status.BACKOFF;
        }finally {
            transaction.close();
        }
        return status;
    }
    @Override
    public void configure(Context context) {
    }
}
