Flume Getting Started
1. Installation and Configuration
Download: http://archive.apache.org/dist/flume/
Installation:
(1) Upload apache-flume-1.9.0-bin.tar.gz to the /opt/software directory on the Linux host
(2) Extract apache-flume-1.9.0-bin.tar.gz into /opt/module/
tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
(3) Rename apache-flume-1.9.0-bin to flume
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
(4) Delete guava-11.0.2.jar from the lib folder for compatibility with Hadoop 3.1.3
rm /opt/module/flume/lib/guava-11.0.2.jar
Configure environment variables:
vim /etc/profile.d/my_env.sh
Add the Flume environment variables:
export FLUME_HOME=/opt/module/flume
export PATH=$PATH:$FLUME_HOME/bin
Test:
Run flume-ng; if the usage help is printed, the installation succeeded.
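A quick check (assuming the env file above; flume-ng version prints the installed Flume version):
source /etc/profile.d/my_env.sh
flume-ng version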
2. Basic Architecture
3. Transactions
4. Agent Internals
5. Basic Examples
1. Monitor a single file in real time and upload it to HDFS
A. Requirement: monitor a single file in real time and upload it to HDFS; the hdfs sink is one of the most commonly used sinks
- Decide which types to use for the source, channel and sink components:
  source: exec, channel: memory, sink: hdfs — check the official documentation for the parameters each one needs
#name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source definition
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/jobs/tail.txt
# channel definition
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink definition
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# prefix for files uploaded to HDFS
a1.sinks.k1.hdfs.filePrefix = logs-
# whether to roll directories based on time
a1.sinks.k1.hdfs.round = true
# how many time units before creating a new directory
a1.sinks.k1.hdfs.roundValue = 1
# the time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# file type; compression is also supported
a1.sinks.k1.hdfs.fileType = DataStream
# how often (seconds) to roll to a new file
a1.sinks.k1.hdfs.rollInterval = 60
# roll the file once it reaches this size (bytes)
a1.sinks.k1.hdfs.rollSize = 134217700
# 0 = rolling is independent of the number of events
a1.sinks.k1.hdfs.rollCount = 0
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
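Start (assuming the config above is saved as $FLUME_HOME/jobs/exec-flume-hdfs.conf; use whatever filename you chose):
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/exec-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
To verify, append a line to the monitored file, e.g. echo hello >> /opt/module/flume-1.9.0/jobs/tail.txt, then check /flume/<date>/<hour> on HDFS.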
2. Use Flume to monitor an entire directory and upload new files to HDFS
B. Requirement: use Flume to monitor an entire directory and upload to HDFS (content is only uploaded when a new file appears in the directory)
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume-1.9.0/jobs/spooldir
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.fileHeader = true
#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# prefix for files uploaded to HDFS
a1.sinks.k1.hdfs.filePrefix = logs-
# whether to roll directories based on time
a1.sinks.k1.hdfs.round = true
# how many time units before creating a new directory
a1.sinks.k1.hdfs.roundValue = 1
# the time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# file type; compression is also supported
a1.sinks.k1.hdfs.fileType = DataStream
# how often (seconds) to roll to a new file
a1.sinks.k1.hdfs.rollInterval = 60
# roll the file once it reaches this size (bytes)
a1.sinks.k1.hdfs.rollSize = 134217700
# 0 = rolling is independent of the number of events
a1.sinks.k1.hdfs.rollCount = 0
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/spooling-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
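To verify (create the spool directory before starting the agent), drop a file into it, e.g. cp /etc/hosts /opt/module/flume-1.9.0/jobs/spooldir/hosts.txt; once Flume has processed it, the file is renamed with the .COMPLETED suffix and its content appears on HDFS.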
3. Use Flume to monitor real-time appends to files across an entire directory and upload them to HDFS
C. Requirement: use Flume to monitor real-time appends to files across an entire directory and upload to HDFS (the files in the directory are split into groups, and content is uploaded only when a file belonging to a group is appended to; groups are usually matched with regular expressions)
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# prefix for files uploaded to HDFS
a1.sinks.k1.hdfs.filePrefix = logs-
# whether to roll directories based on time
a1.sinks.k1.hdfs.round = true
# how many time units before creating a new directory
a1.sinks.k1.hdfs.roundValue = 1
# the time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# file type; compression is also supported
a1.sinks.k1.hdfs.fileType = DataStream
# how often (seconds) to roll to a new file
a1.sinks.k1.hdfs.rollInterval = 60
# roll the file once it reaches this size (bytes)
a1.sinks.k1.hdfs.rollSize = 134217700
# 0 = rolling is independent of the number of events
a1.sinks.k1.hdfs.rollCount = 0
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
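Start (assuming the config above is saved as $FLUME_HOME/jobs/taildir-flume-hdfs.conf; use whatever filename you chose):
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/taildir-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
To verify, append to a matching file, e.g. echo hello >> /opt/module/flume-1.9.0/jobs/taildir/file1.txt; read offsets are recorded in position.json, so appends are still picked up after the agent restarts.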
4. Replicating
D. Requirement: replicating
Flume-1 monitors file changes and passes the changed content to both Flume-2 and Flume-3. Flume-3 writes to the local file system; Flume-2 writes to HDFS.
Analysis:          source    channel   sink
          flume1   taildir   memory    avro
          flume2   avro      memory    hdfs
          flume3   avro      memory    file_roll
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#channels
a3.channels.c1.type =memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#sinks
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/jobs/fileroll
#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777
#channels
a2.channels.c1.type =memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#sinks
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume1.conf
#name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#sources
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json
#ChannelSelector
a1.sources.r1.selector.type = replicating
#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
#sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888
#bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Start 3: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start 2: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start 1: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
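To verify (make sure the output directory /opt/module/flume-1.9.0/jobs/fileroll exists before starting a3), append a line to a monitored taildir file; the same event should show up both on HDFS (via flume2) and in the fileroll directory (via flume3), since the replicating selector copies every event to both channels.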
5. Load balancing (load_balance)
Requirement: load balancing
Flume-1 monitors file changes and passes the changed content to Flume-2 and Flume-3 (round-robin or random). Flume-3 writes to the local file system; Flume-2 writes to HDFS.
Analysis:          source    channel   sink
          flume1   taildir   memory    avro
          flume2   avro      memory    hdfs
          flume3   avro      memory    file_roll
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#channels
a3.channels.c1.type =memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#sinks
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/jobs/fileroll
#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777
#channels
a2.channels.c1.type =memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#sinks
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume1.conf
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
#sources
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json
#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888
#Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
Start 3:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start 2:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start 1:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console
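To verify, append several lines to a monitored taildir file; events are spread between flume2 (HDFS) and flume3 (fileroll) by the random selector. Setting a1.sinkgroups.g1.processor.selector = round_robin instead makes the two sinks alternate.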
6. Failover
Requirement: failover
Flume-1 monitors a port and passes the data to Flume-2 (the active sink). When Flume-2 fails, the data is passed to Flume-3 instead.
          flume    source   channel   sink
Analysis: flume1   netcat   memory    avro
          flume2   avro     memory    logger
          flume3   avro     memory    logger
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#channels
a3.channels.c1.type =memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#sinks
a3.sinks.k1.type = logger
#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777
#channels
a2.channels.c1.type =memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#sinks
a2.sinks.k1.type = logger
#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume1.conf
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
#sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555
#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888
#Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
Start 3:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start 2:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start 1:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console
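To verify: with the higher priority on k1, flume2 (port 7777) is the active sink. Run nc localhost 5555 and type a few lines; they should appear in flume2's console. Kill the flume2 agent (Ctrl+C) and keep typing; the lines now show up in flume3's console instead, and flume2 takes over again once it is restarted.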
7. Aggregation
Requirement: aggregation
Flume-1 on hadoop102 monitors the files under /opt/module/flume-1.9.0/jobs/taildir,
Flume-2 on hadoop103 monitors the data stream on a port,
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.
          flume    source   channel   sink
Analysis: flume1   taildir  memory    avro
          flume2   netcat   memory    avro
          flume3   avro     memory    logger
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 8888
#channels
a3.channels.c1.type =memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#sinks
a3.sinks.k1.type = logger
#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#sources
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 7777
#channels
a2.channels.c1.type =memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#sinks
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 8888
#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume1.conf
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#sources
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json
#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 8888
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start 3:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start 2:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start 1:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume1.conf -n a1 -Dflume.root.logger=INFO,console
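To verify: start a3 on hadoop104 first, then a2 on hadoop103 and a1 on hadoop102. Append a line to a taildir file on hadoop102, and from hadoop103 run nc hadoop103 7777 and type some text; both streams should be printed by the logger sink in hadoop104's console.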
8. Multiplexing
Requirement: multiplexing
Flume-1 on hadoop101 monitors data on a netcat port (6666 in the config below) and sends events that start with a letter to hadoop104 and events that start with a digit to hadoop103.
Flume-2 and Flume-3 print to the console.
          flume    source   channel   sink
Analysis: flume1   netcat   memory    avro
          flume2   avro     memory    logger
          flume3   avro     memory    logger
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 8888
#channels
a3.channels.c1.type =memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#sinks
a3.sinks.k1.type = logger
#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#sources
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 7777
#channels
a2.channels.c1.type =memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#sinks
a2.sinks.k1.type = logger
#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume1.conf
#name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
# Interceptor: a custom interceptor that sets the "type" header to letter/number (see the Java sketch after the startup commands below)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = myInterceptor.CustomInterceptor$Builder
#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 8888
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 7777
#bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Start 3:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start 2:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start 1:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume1.conf -n a1 -Dflume.root.logger=INFO,console
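The routing above depends on a custom interceptor that inspects each event body and stamps the "type" header ("letter" or "number") that the multiplexing channel selector matches against. The class name myInterceptor.CustomInterceptor$Builder comes from the config; the Java below is only a minimal sketch of what such an interceptor might look like, packaged into a jar and dropped into $FLUME_HOME/lib.

package myInterceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

// Sketch: set the "type" header to "letter" or "number" based on the
// first byte of the event body, so the multiplexing selector can route it.
public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() { }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        if (body.length > 0 && Character.isLetter((char) body[0])) {
            event.getHeaders().put("type", "letter");
        } else {
            event.getHeaders().put("type", "number");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() { }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) { }
    }
}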
9. Listen on a port (netcat)
Requirement: listen on a port and print the received data to the console; uses the netcat source.
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#sink
a1.sinks.k1.type = logger
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/netcat-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
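To verify (assuming the nc tool is installed), open another terminal and run nc localhost 6666, then type a few lines; each line should be printed as an event by the logger sink in the agent's console.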