Flume Introduction
Overview
(1) High reliability: Flume provides an end-to-end data reliability mechanism.
(2) Easy to scale: agents form a distributed architecture and can be scaled out horizontally.
(3) Easy to recover: the channel keeps the events taken from the data source, so they can be replayed after a failure.
(4) Feature-rich: Flume ships with many built-in components covering different data sources and storage backends.
Components
(1) Source: the data source; simply put, the entry point through which the agent ingests data.
(2) Channel: the pipe through which data flows and is buffered. A source must be associated with at least one channel.
(3) Sink: takes data from the channel and delivers it to the specified destination, removing it from the channel on success.
Events
(1) The basic unit of data transfer in Flume.
(2) Consists of headers plus a byte array carrying the payload; the byte array holds the actual data.
(3) Maximum line length per event: deserializer.maxLineLength = 2048 bytes; encoding: UTF-8.
A source can be bound to multiple channels.
A sink can be bound to only one channel.
Flume Installation
Extract
tar -xzf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin /usr/local/flume
Edit /etc/profile
vim /etc/profile
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
Edit flume-env.sh
cd /usr/local/flume/conf
mv flume-env.sh.template flume-env.sh
vim flume-env.sh
export JAVA_HOME=/usr/local/jdk1.8.0_271
export PATH=$PATH:$JAVA_HOME/bin
Verify
cd /usr/local/flume/bin
flume-ng version
Create custom Flume directories
mkdir -pv /usr/local/flume/{log,job,shell}
job      # holds the agent configuration files used when starting Flume
log      # holds log files
shell    # holds startup scripts
Flume Examples
Example 1: Monitor port data (netcat source - memory channel - logger sink)
1. Edit the configuration:
vim /usr/local/flume/job/flume-netcat.conf
# Name the agent components
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = cancer01
a1.sources.r1.port = 8888
# Configure the sink; the logger type writes events to the console
a1.sinks.k1.type = logger
# Use a memory channel with a capacity of 1000 events and a transaction size of 100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Start
flume-ng agent --conf /usr/local/flume/conf --name a1 --conf-file /usr/local/flume/job/flume-netcat.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=10501 -Dflume.root.logger=INFO,console
3. Test
ss -ntl    # check local listening ports
telnet cancer01 8888
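If telnet is not available, nc can be used instead to push a test line into the source; the host and port below are the ones configured in flume-netcat.conf.
echo "hello flume" | nc cancer01 8888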
4. Write a startup script
vim /usr/local/flume/shell/start-netcat.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-netcat.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=10501 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-netcat.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-netcat.sh
5. Run the script
ss -ntl
/usr/local/flume/shell/start-netcat.sh
tail -100f /usr/local/flume/log/flume-netcat.log
6. Check the channel status
yum -y install epel-release    # install the EPEL repository
yum list jq                    # check that the jq package is available
yum -y install jq              # install jq, which makes the JSON output easier to read
curl http://cancer01:10501/metrics | jq
{
  "CHANNEL.c1": {                     # monitoring data for channel c1, the name defined in flume-netcat.conf
    "ChannelCapacity": "1000",        # channel capacity; only File Channel and Memory Channel report this
    "ChannelFillPercentage": "0.0",   # percentage of the channel that is filled
    "Type": "CHANNEL",                # metric type, here CHANNEL
    "ChannelSize": "0",               # current number of events in the channel; only File/Memory Channel report this
    "EventTakeSuccessCount": "64",    # total events the sink successfully took from the channel
    "EventTakeAttemptCount": "227",   # total take attempts; the channel may have been empty on some attempts
    "StartTime": "1563520459221",     # channel start time in milliseconds
    "EventPutAttemptCount": "64",     # total events the source attempted to put into the channel
    "EventPutSuccessCount": "64",     # total events successfully put and committed to the channel
    "StopTime": "0"                   # channel stop time in milliseconds; 0 means it is still running
  }
}
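To pull out a single metric rather than the whole document, jq can filter on the key; the key below assumes the channel is named c1 as in this example.
curl -s http://cancer01:10501/metrics | jq '."CHANNEL.c1".ChannelSize'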
7. Kill the process
ss -ntl
netstat -untalp | grep 8888
jps
kill 3816
netstat -untalp | grep 8888
ss -ntl
Example 2: Stream a local file to HDFS in real time (exec source - memory channel - hdfs sink)
Note: the Flume node must have the Hadoop cluster environment configured.
1. Edit the configuration:
vim /usr/local/flume/job/flume-hdfs.conf
# Name the agent components
a2.sources = file_source
a2.sinks = hdfs_sink
a2.channels = memory_channel
# Configure the source
a2.sources.file_source.type = exec
a2.sources.file_source.command = tail -F /var/log/messages
a2.sources.file_source.shell = /bin/bash -c
# Configure the sink
a2.sinks.hdfs_sink.type = hdfs
a2.sinks.hdfs_sink.hdfs.path = hdfs://cancer/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105-
# Whether to roll folders based on time
a2.sinks.hdfs_sink.hdfs.round = true
# How many time units before a new folder is created
a2.sinks.hdfs_sink.hdfs.roundValue = 1
# The time unit used for rolling
a2.sinks.hdfs_sink.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a2.sinks.hdfs_sink.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.hdfs_sink.hdfs.fileType = DataStream
# How often to roll a new file (seconds)
a2.sinks.hdfs_sink.hdfs.rollInterval = 600
# Roll size for each file (bytes)
a2.sinks.hdfs_sink.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a2.sinks.hdfs_sink.hdfs.rollCount = 0
# Minimum number of replicas
a2.sinks.hdfs_sink.hdfs.minBlockReplicas = 1
# Use a memory channel with a capacity of 1000 events and a transaction size of 1000
a2.channels.memory_channel.type = memory
a2.channels.memory_channel.capacity = 1000
a2.channels.memory_channel.transactionCapacity = 1000
# Bind the source and sink to the channel
a2.sources.file_source.channels = memory_channel
a2.sinks.hdfs_sink.channel = memory_channel
Note (production-style configuration):
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
Channel parameter notes:
capacity: the maximum number of events the channel can hold
transactionCapacity: the maximum number of events taken from a source or given to a sink in a single transaction
keep-alive: the time allowed for adding an event to, or removing it from, the channel
2. Write a startup script
vim /usr/local/flume/shell/start-hdfs.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-hdfs.conf --name a2 -Dflume.monitoring.type=http -Dflume.monitoring.port=10502 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-hdfs.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-hdfs.sh
3. Run the script
ss -ntl
/usr/local/flume/shell/start-hdfs.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-hdfs.log
4. Check whether the corresponding log data has appeared in HDFS
hdfs dfs -ls /flume/20190719/17
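To peek at the contents of one of the rolled files, something like the following can be used, where the date/hour directory is whatever the previous command listed.
hdfs dfs -cat /flume/20190719/17/* | head -20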
5. Check the channel status
curl http://cancer01:10502/metrics | jq
{
  "SOURCE.file_source": {
    "AppendBatchAcceptedCount": "0",   # total batches successfully committed to the channel
    "GenericProcessingFail": "0",      # number of generic processing failures
    "EventAcceptedCount": "9",         # total events successfully written to the channel
    "AppendReceivedCount": "0",        # total events received in single-event appends (one append RPC call per event)
    "StartTime": "1563528439426",      # source start time in milliseconds
    "AppendBatchReceivedCount": "0",   # total event batches received
    "ChannelWriteFail": "0",           # number of failed writes to the channel
    "EventReceivedCount": "9",         # total events the source has received so far
    "EventReadFail": "0",              # number of event read failures
    "Type": "SOURCE",                  # metric type, here SOURCE
    "AppendAcceptedCount": "0",        # events appended one at a time and successfully written to the channel
    "OpenConnectionCount": "0",        # current number of open connections to clients or sinks; only the avro source reports this
    "StopTime": "0"                    # source stop time in milliseconds; 0 means it is still running
  },
  "CHANNEL.memory_channel": {
    "ChannelCapacity": "1000",         # channel capacity; only File Channel and Memory Channel report this
    "ChannelFillPercentage": "0.0",    # percentage of the channel that is filled
    "Type": "CHANNEL",                 # metric type, here CHANNEL
    "ChannelSize": "0",                # current number of events in the channel; only File/Memory Channel report this
    "EventTakeSuccessCount": "9",      # total events the sink successfully took from the channel
    "EventTakeAttemptCount": "36",     # total take attempts; the channel may have been empty on some attempts
    "StartTime": "1563528438997",      # channel start time in milliseconds
    "EventPutAttemptCount": "9",       # total events the source attempted to put into the channel
    "EventPutSuccessCount": "9",       # total events successfully put and committed
    "StopTime": "0"                    # channel stop time in milliseconds; 0 means it is still running
  },
  "SINK.hdfs_sink": {
    "ConnectionCreatedCount": "1",     # connections created to the next stage or storage system (e.g. an HDFS file being opened)
    "BatchCompleteCount": "0",         # number of batches whose size equalled the configured batch size
    "EventWriteFail": "0",             # number of event write failures
    "BatchEmptyCount": "26",           # number of empty batches; a large value means the source is much slower than the sink
    "EventDrainAttemptCount": "9",     # total events the sink attempted to write to storage
    "StartTime": "1563528439448",      # sink start time in milliseconds
    "BatchUnderflowCount": "1",        # batches smaller than the configured batch size; a high value also means the sink is faster than the source
    "ChannelReadFail": "0",            # number of failed reads from the channel
    "ConnectionFailedCount": "0",      # number of failed connections
    "ConnectionClosedCount": "0",      # number of closed connections
    "Type": "SINK",                    # metric type, here SINK
    "EventDrainSuccessCount": "9",     # total events the sink successfully wrote to storage
    "StopTime": "0"                    # sink stop time in milliseconds; 0 means it is still running
  }
}
6. Kill the process
ss -ntl
netstat -untalp | grep ****
jps
kill ****
netstat -untalp | grep ****
ss -ntl
Example 3: Stream files from a watched directory to HDFS in real time (spooldir source - memory channel - hdfs sink)
Note: the Flume node must have the Hadoop cluster environment configured.
1. Edit the configuration:
vim /usr/local/flume/job/flume-dir.conf
# Name the agent components
a3.sources = spooldir_source
a3.sinks = hdfs_sink
a3.channels = memory_channel
# Configure the source
a3.sources.spooldir_source.type = spooldir
a3.sources.spooldir_source.spoolDir = /home/flume/upload
a3.sources.spooldir_source.fileSuffix = .COMPLETED
a3.sources.spooldir_source.fileHeader = true
# Ignore (do not upload) any file ending in .tmp
a3.sources.spooldir_source.ignorePattern = ([^ ]*\.tmp)
# Capture the source file name so the sink below can use the fileName variable
a3.sources.spooldir_source.basenameHeader = true
a3.sources.spooldir_source.basenameHeaderKey = fileName
# Configure the sink
a3.sinks.hdfs_sink.type = hdfs
a3.sinks.hdfs_sink.hdfs.path = hdfs://cancer/flume
# Prefix for uploaded files
a3.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105-upload-
# Whether to roll folders based on time
a3.sinks.hdfs_sink.hdfs.round = true
# How many time units before a new folder is created
a3.sinks.hdfs_sink.hdfs.roundValue = 1
# The time unit used for rolling
a3.sinks.hdfs_sink.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.hdfs_sink.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.hdfs_sink.hdfs.fileType = DataStream
# How often to roll a new file (seconds)
a3.sinks.hdfs_sink.hdfs.rollInterval = 60
# Roll size for each file, roughly 128 MB
a3.sinks.hdfs_sink.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.hdfs_sink.hdfs.rollCount = 0
# Minimum number of replicas
a3.sinks.hdfs_sink.hdfs.minBlockReplicas = 1
# Combined with basenameHeader and basenameHeaderKey on the source, this keeps the original file name on upload (overrides the prefix set above)
a3.sinks.hdfs_sink.hdfs.filePrefix = %{fileName}
# Use a memory channel with a capacity of 1000 events and a transaction size of 1000
a3.channels.memory_channel.type = memory
a3.channels.memory_channel.capacity = 1000
a3.channels.memory_channel.transactionCapacity = 1000
# Bind the source and sink to the channel
a3.sources.spooldir_source.channels = memory_channel
a3.sinks.hdfs_sink.channel = memory_channel
2. Write a startup script
vim /usr/local/flume/shell/start-dir.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-dir.conf --name a3 -Dflume.monitoring.type=http -Dflume.monitoring.port=10503 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-dir.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-dir.sh
3. Create test data
mkdir -pv /home/flume/upload
echo http://www.cnblogs.com/yinzhengjie > /home/flume/upload/yinzhengjie.blog
echo http://www.cnblogs.com/yinzhengjie > /home/flume/upload/yinzhengjie2.tmp
echo http://www.cnblogs.com/yinzhengjie > /home/flume/upload/yinzhengjie3.txt
4. Run the script
ss -ntl
/usr/local/flume/shell/start-dir.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-dir.log
5. Check the HDFS directory
ll /home/flume/upload
hdfs dfs -ls /flume/
6. Check the channel status
curl http://cancer01:10503/metrics | jq
7. Kill the process
ss -ntl
netstat -untalp | grep ****
jps
kill ****
netstat -untalp | grep ****
ss -ntl
Example 4: Flume-to-Flume data transfer, aggregating data from multiple Flume agents into one
Note: the Flume node must have the Hadoop cluster environment configured.
Scenario: flume-1 tails a local file (/home/flume/blog.txt in the configuration below), flume-2 listens on a network port; both send their data to flume-3, and flume-3 writes the combined data to HDFS.
1. Edit the aggregation configuration:
vim /usr/local/flume/job/flume-aggregation.conf
# Name the agent components
aggregation.sources = avro_source
aggregation.sinks = hdfs_sink
aggregation.channels = memory_channel
# Configure the source
aggregation.sources.avro_source.type = avro
aggregation.sources.avro_source.bind = cancer01
aggregation.sources.avro_source.port = 6666
# Configure the sink
aggregation.sinks.hdfs_sink.type = hdfs
aggregation.sinks.hdfs_sink.hdfs.path = hdfs://cancer/flume/%Y%m%d/%H
# Prefix for uploaded files
aggregation.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105-
# Whether to roll folders based on time
aggregation.sinks.hdfs_sink.hdfs.round = true
# How many time units before a new folder is created
aggregation.sinks.hdfs_sink.hdfs.roundValue = 1
# The time unit used for rolling
aggregation.sinks.hdfs_sink.hdfs.roundUnit = hour
# Whether to use the local timestamp
aggregation.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
aggregation.sinks.hdfs_sink.hdfs.batchSize = 100
# File type; compression is supported
aggregation.sinks.hdfs_sink.hdfs.fileType = DataStream
# How often to roll a new file (seconds)
aggregation.sinks.hdfs_sink.hdfs.rollInterval = 600
# Roll size for each file, roughly 128 MB
aggregation.sinks.hdfs_sink.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
aggregation.sinks.hdfs_sink.hdfs.rollCount = 0
# Minimum number of replicas
aggregation.sinks.hdfs_sink.hdfs.minBlockReplicas = 1
# Use a memory channel with a capacity of 1000 events and a transaction size of 100
aggregation.channels.memory_channel.type = memory
aggregation.channels.memory_channel.capacity = 1000
aggregation.channels.memory_channel.transactionCapacity = 100
# Bind the source and sink to the channel
aggregation.sources.avro_source.channels = memory_channel
aggregation.sinks.hdfs_sink.channel = memory_channel
2. Write the aggregation startup script
vim /usr/local/flume/shell/start-aggregation.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-aggregation.conf --name aggregation -Dflume.monitoring.type=http -Dflume.monitoring.port=10511 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-aggregation.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-aggregation.sh
3. Run the aggregation script
ss -ntl
/usr/local/flume/shell/start-aggregation.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-aggregation.log
4. Edit the netcat configuration:
vim /usr/local/flume/job/flume-my_netcat.conf
# Name the agent components
my_netcat.sources = netcat_source
my_netcat.sinks = avro_sink
my_netcat.channels = memory_channel
# Configure the source
my_netcat.sources.netcat_source.type = netcat
my_netcat.sources.netcat_source.bind = cancer01
my_netcat.sources.netcat_source.port = 8888
# Configure the sink
my_netcat.sinks.avro_sink.type = avro
my_netcat.sinks.avro_sink.hostname = cancer01
my_netcat.sinks.avro_sink.port = 6666
# Use a memory channel with a capacity of 1000 events and a transaction size of 100
my_netcat.channels.memory_channel.type = memory
my_netcat.channels.memory_channel.capacity = 1000
my_netcat.channels.memory_channel.transactionCapacity = 100
# Bind the source and sink to the channel
my_netcat.sources.netcat_source.channels = memory_channel
my_netcat.sinks.avro_sink.channel = memory_channel
5. Write the netcat startup script
vim /usr/local/flume/shell/start-my_netcat.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-my_netcat.conf --name my_netcat -Dflume.monitoring.type=http -Dflume.monitoring.port=10512 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-my_netcat.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-my_netcat.sh
6. Run the netcat script
ss -ntl
/usr/local/flume/shell/start-my_netcat.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-my_netcat.log
7. Edit the exec configuration:
vim /usr/local/flume/job/flume-my_exec.conf
# Name the agent components
my_exec.sources = exec_source
my_exec.sinks = avro_sink
my_exec.channels = memory_channel
# Configure the source
my_exec.sources.exec_source.type = exec
my_exec.sources.exec_source.command = tail -F /home/flume/blog.txt
my_exec.sources.exec_source.shell = /bin/bash -c
# Configure the sink
my_exec.sinks.avro_sink.type = avro
my_exec.sinks.avro_sink.hostname = cancer01
my_exec.sinks.avro_sink.port = 6666
# Use a memory channel with a capacity of 1000 events and a transaction size of 100
my_exec.channels.memory_channel.type = memory
my_exec.channels.memory_channel.capacity = 1000
my_exec.channels.memory_channel.transactionCapacity = 100
# Bind the source and sink to the channel
my_exec.sources.exec_source.channels = memory_channel
my_exec.sinks.avro_sink.channel = memory_channel
8. Write the exec startup script
vim /usr/local/flume/shell/start-my_exec.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-my_exec.conf --name my_exec -Dflume.monitoring.type=http -Dflume.monitoring.port=10513 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-my_exec.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-my_exec.sh
9. Run the exec script
ss -ntl
/usr/local/flume/shell/start-my_exec.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-my_exec.log
10. Test
telnet cancer01 8888
echo "www.baidu.com" >> /home/flume/blog.txt
11. Check the HDFS directory
hdfs dfs -ls /flume/
hdfs dfs -cat /flume/*******
12. Check the channel status
curl http://cancer01:10511/metrics | jq
curl http://cancer01:10512/metrics | jq
curl http://cancer01:10513/metrics | jq
13. Kill the processes
ss -ntl
netstat -untalp | grep ****
jps
kill ****
netstat -untalp | grep ****
ss -ntl
Example 5: Channel selectors
channel selector: decides which channel(s) a given event is sent to. There are two kinds:
(1) Replicating Channel Selector: the default; every event is replicated to every channel, so as many copies are sent as there are channels.
(2) Multiplexing Channel Selector: routes each event to a specific channel based on its content (a header value).
1. Edit the avro configuration:
vim /usr/local/flume/job/flume-my_avro.conf
# Name the agent components
my_avro.sources = avro_source
my_avro.sinks = hdfs_sink
my_avro.channels = memory_channel
# Configure the source
my_avro.sources.avro_source.type = avro
my_avro.sources.avro_source.bind = cancer01
my_avro.sources.avro_source.port = 8888
# Configure the sink
my_avro.sinks.hdfs_sink.type = hdfs
my_avro.sinks.hdfs_sink.hdfs.path = hdfs://cancer01/flume/%Y%m%d/%H
# Prefix for uploaded files
my_avro.sinks.hdfs_sink.hdfs.filePrefix = 172.30.1.105-
# Whether to roll folders based on time
my_avro.sinks.hdfs_sink.hdfs.round = true
# How many time units before a new folder is created
my_avro.sinks.hdfs_sink.hdfs.roundValue = 1
# The time unit used for rolling
my_avro.sinks.hdfs_sink.hdfs.roundUnit = hour
# Whether to use the local timestamp
my_avro.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
my_avro.sinks.hdfs_sink.hdfs.batchSize = 100
# File type; compression is supported
my_avro.sinks.hdfs_sink.hdfs.fileType = DataStream
# How often to roll a new file (seconds)
my_avro.sinks.hdfs_sink.hdfs.rollInterval = 60
# Roll size for each file, roughly 128 MB
my_avro.sinks.hdfs_sink.hdfs.rollSize = 134210000
# Rolling is independent of the number of events
my_avro.sinks.hdfs_sink.hdfs.rollCount = 0
# Minimum number of replicas
my_avro.sinks.hdfs_sink.hdfs.minBlockReplicas = 1
# Use a memory channel with a capacity of 1000 events and a transaction size of 100
my_avro.channels.memory_channel.type = memory
my_avro.channels.memory_channel.capacity = 1000
my_avro.channels.memory_channel.transactionCapacity = 100
# Bind the source and sink to the channel
my_avro.sources.avro_source.channels = memory_channel
my_avro.sinks.hdfs_sink.channel = memory_channel
2. Write the avro startup script
vim /usr/local/flume/shell/start-my_avro.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-my_avro.conf --name my_avro -Dflume.monitoring.type=http -Dflume.monitoring.port=10514 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-my_avro.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-my_avro.sh
3. Run the avro script
ss -ntl
/usr/local/flume/shell/start-my_avro.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-my_avro.log
4. Edit the file_roll configuration:
vim /usr/local/flume/job/flume-my_file_roll.conf
# Name the agent components
my_file_roll.sources = avro_source
my_file_roll.sinks = file_roll_sink
my_file_roll.channels = memory_channel
# Configure the source
my_file_roll.sources.avro_source.type = avro
my_file_roll.sources.avro_source.bind = cancer01
my_file_roll.sources.avro_source.port = 9999
# Configure the sink
my_file_roll.sinks.file_roll_sink.type = file_roll
# The local output directory must already exist; the sink will not create it
my_file_roll.sinks.file_roll_sink.sink.directory = /home/flume/output
# Use a memory channel with a capacity of 1000 events and a transaction size of 100
my_file_roll.channels.memory_channel.type = memory
my_file_roll.channels.memory_channel.capacity = 1000
my_file_roll.channels.memory_channel.transactionCapacity = 100
# Bind the source and sink to the channel
my_file_roll.sources.avro_source.channels = memory_channel
my_file_roll.sinks.file_roll_sink.channel = memory_channel
5. Write the file_roll startup script
vim /usr/local/flume/shell/start-my_file_roll.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-my_file_roll.conf --name my_file_roll -Dflume.monitoring.type=http -Dflume.monitoring.port=10515 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-my_file_roll.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-my_file_roll.sh
6. Run the file_roll script
ss -ntl
/usr/local/flume/shell/start-my_file_roll.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-my_file_roll.log
7. Edit the replica configuration:
vim /usr/local/flume/job/flume-my_replica.conf
# Name the agent components
replica.sources = exec_source
replica.sinks = hdfs_sink file_roll_sink
replica.channels = hdfs_channel file_roll_channel
# Replicate the data flow to multiple channels
replica.sources.exec_source.selector.type = replicating
# Configure the source
replica.sources.exec_source.type = exec
replica.sources.exec_source.command = tail -F /home/flume/blog.txt
replica.sources.exec_source.shell = /bin/bash -c
# Sink feeding the HDFS-bound agent; note the port number
replica.sinks.hdfs_sink.type = avro
replica.sinks.hdfs_sink.hostname = cancer01
replica.sinks.hdfs_sink.port = 8888
# Sink feeding the local-filesystem-bound agent
replica.sinks.file_roll_sink.type = avro
replica.sinks.file_roll_sink.hostname = cancer01
replica.sinks.file_roll_sink.port = 9999
# Two memory channels, each with a capacity of 1000 events and a transaction size of 100
replica.channels.hdfs_channel.type = memory
replica.channels.hdfs_channel.capacity = 1000
replica.channels.hdfs_channel.transactionCapacity = 100
replica.channels.file_roll_channel.type = memory
replica.channels.file_roll_channel.capacity = 1000
replica.channels.file_roll_channel.transactionCapacity = 100
# Bind the source and sinks to the channels
replica.sources.exec_source.channels = hdfs_channel file_roll_channel
replica.sinks.hdfs_sink.channel = hdfs_channel
replica.sinks.file_roll_sink.channel = file_roll_channel
8. Write the replica startup script
vim /usr/local/flume/shell/start-my_replica.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-my_replica.conf --name replica -Dflume.monitoring.type=http -Dflume.monitoring.port=10516 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-my_replica.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-my_replica.sh
9. Run the replica script
ss -ntl
/usr/local/flume/shell/start-my_replica.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-my_replica.log
10. Check the HDFS directory and the local output directory
hdfs dfs -ls /flume/
hdfs dfs -cat /flume/*******
ll /home/flume/output
11. Edit the multiplexing_selector configuration:
vim /usr/local/flume/job/flume-my_multiplexing_selector.conf
# Name the agent components
my_multiplexing_selector.sources = avro_source
my_multiplexing_selector.sinks = Java_sink Go_sink Python_sink
my_multiplexing_selector.channels = Java_channel Go_channel Python_channel
# Configure the source
my_multiplexing_selector.sources.avro_source.type = avro
my_multiplexing_selector.sources.avro_source.bind = cancer01
my_multiplexing_selector.sources.avro_source.port = 8888
# Use the multiplexing channel selector
my_multiplexing_selector.sources.avro_source.selector.type = multiplexing
# The event header key to route on
my_multiplexing_selector.sources.avro_source.selector.header = language
# Map header values to channels; the header file used in the test contains, for example, "language java"
my_multiplexing_selector.sources.avro_source.selector.mapping.JAVA = Java_channel
my_multiplexing_selector.sources.avro_source.selector.mapping.GOLANG = Go_channel
my_multiplexing_selector.sources.avro_source.selector.default = Python_channel
# Configure the sinks
my_multiplexing_selector.sinks.Java_sink.type = file_roll
my_multiplexing_selector.sinks.Java_sink.sink.directory = /home/language/java
my_multiplexing_selector.sinks.Java_sink.sink.rollInterval = 0
my_multiplexing_selector.sinks.Go_sink.type = file_roll
my_multiplexing_selector.sinks.Go_sink.sink.directory = /home/language/golang
my_multiplexing_selector.sinks.Go_sink.sink.rollInterval = 0
my_multiplexing_selector.sinks.Python_sink.type = file_roll
my_multiplexing_selector.sinks.Python_sink.sink.directory = /home/language/python
my_multiplexing_selector.sinks.Python_sink.sink.rollInterval = 0
# Memory channels, each with a capacity of 100000 events and a transaction size of 10000
my_multiplexing_selector.channels.Java_channel.type = memory
my_multiplexing_selector.channels.Java_channel.capacity = 100000
my_multiplexing_selector.channels.Java_channel.transactionCapacity = 10000
my_multiplexing_selector.channels.Go_channel.type = memory
my_multiplexing_selector.channels.Go_channel.capacity = 100000
my_multiplexing_selector.channels.Go_channel.transactionCapacity = 10000
my_multiplexing_selector.channels.Python_channel.type = memory
my_multiplexing_selector.channels.Python_channel.capacity = 100000
my_multiplexing_selector.channels.Python_channel.transactionCapacity = 10000
# Bind the source and sinks to the channels
my_multiplexing_selector.sources.avro_source.channels = Java_channel Go_channel Python_channel
my_multiplexing_selector.sinks.Java_sink.channel = Java_channel
my_multiplexing_selector.sinks.Go_sink.channel = Go_channel
my_multiplexing_selector.sinks.Python_sink.channel = Python_channel
12. Write the multiplexing_selector startup script
vim /usr/local/flume/shell/start-my_multiplexing_selector.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-my_multiplexing_selector.conf --name my_multiplexing_selector -Dflume.monitoring.type=http -Dflume.monitoring.port=10522 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-my_multiplexing_selector.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-my_multiplexing_selector.sh
13. Run the multiplexing_selector script
ss -ntl
/usr/local/flume/shell/start-my_multiplexing_selector.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-my_multiplexing_selector.log
14. Test
cat a.txt
language java
language php
language shell
language golang
language python
language scala
language js
language vbs
language c++
language linux
cat header.txt
language java
flume-ng avro-client -H cancer01 -p 8888 -F a.txt -R header.txt
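After sending a.txt, the routing can be checked by reading the three file_roll output directories defined in the multiplexing_selector configuration; note that file_roll does not create them, so they must exist before the agent starts (mkdir -p /home/language/{java,golang,python}).
cat /home/language/java/*
cat /home/language/golang/*
cat /home/language/python/*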
Example 6: Host and timestamp interceptors
Interceptor: a component on the source side that can modify or drop events while they are being processed. Common interceptors include:
(1) host interceptor: adds a header carrying the host name (or IP) to each event
(2) timestamp interceptor: adds a timestamp header to each event
1. Edit the host_interceptor configuration:
vim /usr/local/flume/job/flume-host_interceptor.conf
# Name the agent components
my_host_interceptor.sources = netcat_source
my_host_interceptor.sinks = logger_sink
my_host_interceptor.channels = memory_channel
# Configure the source
my_host_interceptor.sources.netcat_source.type = netcat
my_host_interceptor.sources.netcat_source.bind = cancer01
my_host_interceptor.sources.netcat_source.port = 8888
# Add the interceptor
my_host_interceptor.sources.netcat_source.interceptors = i1
my_host_interceptor.sources.netcat_source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
my_host_interceptor.sources.netcat_source.interceptors.i1.preserveExisting = false
# The header key to use
my_host_interceptor.sources.netcat_source.interceptors.i1.hostHeader = hostname
# Use the host IP as the header value
my_host_interceptor.sources.netcat_source.interceptors.i1.useIP = true
# Configure the sink
my_host_interceptor.sinks.logger_sink.type = logger
# Use a memory channel with a capacity of 100000 events and a transaction size of 10000
my_host_interceptor.channels.memory_channel.type = memory
my_host_interceptor.channels.memory_channel.capacity = 100000
my_host_interceptor.channels.memory_channel.transactionCapacity = 10000
# Bind the source and sink to the channel
my_host_interceptor.sources.netcat_source.channels = memory_channel
my_host_interceptor.sinks.logger_sink.channel = memory_channel
2. Write the host_interceptor startup script
vim /usr/local/flume/shell/start-host_interceptor.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-host_interceptor.conf --name my_host_interceptor -Dflume.monitoring.type=http -Dflume.monitoring.port=10520 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-host_interceptor.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-host_interceptor.sh
3. Run the host_interceptor script
ss -ntl
/usr/local/flume/shell/start-host_interceptor.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-host_interceptor.log
4. Test
telnet cancer01 8888
tail -100f /usr/local/flume/log/flume-host_interceptor.log
1. Edit the timestamp_interceptor configuration:
vim /usr/local/flume/job/flume-timestamp_interceptor.conf
# Name the agent components
my_timestamp_interceptor.sources = netcat_source
my_timestamp_interceptor.sinks = logger_sink
my_timestamp_interceptor.channels = memory_channel
# Configure the source
my_timestamp_interceptor.sources.netcat_source.type = netcat
my_timestamp_interceptor.sources.netcat_source.bind = cancer01
my_timestamp_interceptor.sources.netcat_source.port = 8888
# Add the interceptor
my_timestamp_interceptor.sources.netcat_source.interceptors = i1
my_timestamp_interceptor.sources.netcat_source.interceptors.i1.type = timestamp
# Configure the sink
my_timestamp_interceptor.sinks.logger_sink.type = logger
# Use a memory channel with a capacity of 100000 events and a transaction size of 10000
my_timestamp_interceptor.channels.memory_channel.type = memory
my_timestamp_interceptor.channels.memory_channel.capacity = 100000
my_timestamp_interceptor.channels.memory_channel.transactionCapacity = 10000
# Bind the source and sink to the channel
my_timestamp_interceptor.sources.netcat_source.channels = memory_channel
my_timestamp_interceptor.sinks.logger_sink.channel = memory_channel
2. Write the timestamp_interceptor startup script
vim /usr/local/flume/shell/start-timestamp_interceptor.sh
#!/bin/bash
nohup flume-ng agent -c /usr/local/flume/conf --conf-file=/usr/local/flume/job/flume-timestamp_interceptor.conf --name my_timestamp_interceptor -Dflume.monitoring.type=http -Dflume.monitoring.port=10521 -Dflume.root.logger=INFO,console >> /usr/local/flume/log/flume-timestamp_interceptor.log 2>&1 &
Make it executable:
chmod +x /usr/local/flume/shell/start-timestamp_interceptor.sh
3. Run the timestamp_interceptor script
ss -ntl
/usr/local/flume/shell/start-timestamp_interceptor.sh
ss -ntl
tail -100f /usr/local/flume/log/flume-timestamp_interceptor.log
4. Test
telnet cancer01 8888
tail -100f /usr/local/flume/log/flume-timestamp_interceptor.log
Example 7: Merging logs from multiple servers
1. Scenario: servers A and B each produce several kinds of logs (access.log, nginx.log, web.log) that must be collected into HDFS, separated by log type.
2. Analysis: on A and B, an agent tails each log with an exec source and tags the events with a static "type" header; the events are sent over avro to an agent on server C, whose hdfs sink writes them to paths partitioned by %{type} and date.
3. On servers A and B, create the configuration file exec_source_avro_sink.conf
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# Describe/configure the sources
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/data/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
## The static interceptor inserts a user-defined key-value pair into the header of each collected event
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /root/data/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /root/data/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = cancer01
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Bind the sources and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
4. On server C, create the configuration file avro_source_hdfs_sink.conf
# Name the agent, source, channel and sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Define the source
a1.sources.r1.type = avro
a1.sources.r1.bind = cancer01
a1.sources.r1.port = 41414
# Add a timestamp interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cancer/flume/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Do not roll files by event count
a1.sinks.k1.hdfs.rollCount = 0
# Roll files by time (seconds)
a1.sinks.k1.hdfs.rollInterval = 30
# Roll files by size (bytes)
a1.sinks.k1.hdfs.rollSize = 10485760
# Number of events written to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 10000
# Number of threads Flume uses for HDFS operations (creating files, writing, etc.)
a1.sinks.k1.hdfs.threadsPoolSize = 10
# Timeout for HDFS operations (ms)
a1.sinks.k1.hdfs.callTimeout = 30000
# Wire the source, channel and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5. Start
Start Flume on server C first:
flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
Then start Flume on servers A and B:
flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
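To verify the pipeline end to end, a test line can be appended to one of the tailed logs on server A or B and the type-partitioned HDFS path listed afterwards; the paths below come from the configurations above, and the date directory depends on the current day.
echo "test access line" >> /root/data/access.log
hdfs dfs -ls /flume/logs/access/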
Example 8: Sink group
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Sink group settings: load-balance across the sinks in round-robin order
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
Example 9: MySQL source
cd /var/lib/flume
touch sql-source.status    # the status file records the latest id value read
chmod -R 777 /usr/local/flume/
cp flume-ng-sql-source-1.3.7.jar /usr/local/flume/lib/
cp mysql-connector-java-5.1.17.jar /usr/local/flume/lib/mysql-connector-java.jar
agent.sources = sql-source
agent.channels = ch1
agent.sinks = HDFS
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test
agent.sources.sql-source.user = root
agent.sources.sql-source.password = 123456
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.columns.to.select = *
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.run.query.delay = 5000
agent.sources.sql-source.status.file.path = /usr/local/flume
agent.sources.sql-source.status.file.name = mysql-source.status
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://cancer/flume/mysql
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.rollSize = 268435456
agent.sinks.HDFS.hdfs.rollInterval = 0
agent.sinks.HDFS.hdfs.rollCount = 0
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 100000
agent.channels.ch1.transactionCapacity = 1000
agent.sources.sql-source.channels = ch1
agent.sinks.HDFS.channel = ch1
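No start command is given here either; assuming the properties above are saved as /usr/local/flume/job/flume-mysql.conf (a hypothetical name), the agent, which is named agent in this configuration, could be started with:
flume-ng agent -c /usr/local/flume/conf --conf-file /usr/local/flume/job/flume-mysql.conf --name agent -Dflume.root.logger=INFO,console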
Notes:
The biggest advantage of using Flume to collect data from a relational database table is that the configuration is simple and no programming is required. Compared with the complexity of tungsten-replicator, Flume only needs the source, channel and sink properties set in the configuration file, which is hardly difficult. Compared with the currently popular canal it is less flexible, but not a single line of code has to be written. In addition, because the approach uses plain SQL polling, it is generic and works with any relational data source. Its drawbacks are as pronounced as its advantages, mainly the following:
It runs queries against the source database, so it is intrusive.
Increments are captured by polling, so it is only near real time, and the shorter the polling interval the heavier the load on the source database.
It only detects inserted rows; deletes and updates are not captured.
The source table must have a column that can represent the increment.
Despite these limitations, extracting relational data with Flume still has value, especially in scenarios that call for quick deployment and minimal programming while still meeting the requirements; it is also a useful complement to the traditional Sqoop approach.
Custom MySQL source:
https://blog.csdn.net/lianghecai52171314/article/details/104919999
https://www.cnblogs.com/jhxxb/p/11589851.html
Example 10: MySQL source, Kafka sink
agent.sources = sql-source
agent.sinks = k1
agent.channels = ch
# Put flume-ng-sql-source-1.x.x.jar into FLUME_HOME/lib;
# for a CM-managed CDH Flume, put it under /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib instead.
# Put mysql-connector-java-X-bin.jar into FLUME_HOME/lib;
# for a CM-managed CDH Flume, put it under /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib instead.
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://hostname:3306/yinqing?useUnicode=true&characterEncoding=utf-8&useSSL=false
agent.sources.sql-source.hibernate.connection.user = root
agent.sources.sql-source.hibernate.connection.password = password
agent.sources.sql-source.hibernate.connection.autocommit = true
agent.sources.sql-source.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQLDialect
agent.sources.sql-source.hibernate.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sql-source.run.query.delay = 10000
# Incremental settings
#agent.sources.sql-source.table = table_name
#agent.sources.sql-source.columns.to.select = *
#agent.sources.sql-source.incremental.column.name = id
#agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.custom.query = select * from lt_api_getallstops order by id desc
# Initial value of the incremental column
agent.sources.sql-source.start.from = 0
agent.sources.sql-source.status.file.path = /var/lib/flume-ng
agent.sources.sql-source.status.file.name = sql-source.status
# Kafka sink settings (cluster)
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = topic_test
agent.sinks.k1.brokerList = cancer01:9092,cancer02:9092,cancer03:9092
agent.sinks.k1.batchsize = 200
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent.sinks.k1.zookeeperConnect = cancer01:2181,cancer02:2181,cancer03:2181
agent.channels.ch.type = memory
agent.channels.ch.capacity = 100000
agent.channels.ch.transactionCapacity = 1000
agent.channels.ch.keep-alive = 20
agent.sources.sql-source.channels = ch
agent.sinks.k1.channel = ch
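One way to confirm that rows are reaching Kafka is a console consumer on the target topic; the broker and topic below are the ones configured above, and depending on the Kafka version the older --zookeeper cancer01:2181 form may be needed instead of --bootstrap-server.
kafka-console-consumer.sh --bootstrap-server cancer01:9092 --topic topic_test --from-beginning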
Example 11: Kafka source, HDFS sink
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect = cancer01:2181,cancer02:2181,cancer03:2181
agent.sources.kafkaSource.topic = flume-data
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://cancer/kafka
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 100
agent.sources.kafkaSource.channels = memoryChannel
agent.sinks.hdfsSink.channel = memoryChannel
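For a quick test, a console producer can push a few lines into the flume-data topic and the HDFS output path can then be listed; script names and options depend on the installed Kafka version.
kafka-console-producer.sh --broker-list cancer01:9092 --topic flume-data
hdfs dfs -ls /kafka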
Other notes
If a Flume sink connects to Hadoop, copy the Hadoop configuration and libraries:
Copy the Hadoop configuration: add hadoop's core-site.xml and hdfs-site.xml to Flume's conf directory.
Copy the Hadoop libraries: add the required Hadoop jars to Flume's lib directory.
If a Flume sink connects to Hive, copy the Hive configuration and libraries:
Copy the Hive libraries: copy the required Hive jars into Flume's lib directory.
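A rough sketch of these copy steps, assuming HADOOP_HOME and HIVE_HOME point at the local installations; the exact jar list depends on which sinks are used.
cp $HADOOP_HOME/etc/hadoop/core-site.xml $HADOOP_HOME/etc/hadoop/hdfs-site.xml /usr/local/flume/conf/
cp $HADOOP_HOME/share/hadoop/common/hadoop-common-*.jar /usr/local/flume/lib/
cp $HIVE_HOME/hcatalog/share/hcatalog/hive-hcatalog-*.jar /usr/local/flume/lib/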
Edit flume.conf
vim /usr/local/flume/conf/flume.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Flume source (example 1)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/log
# source (example 2)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# source (example 3)
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# source (example 4)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/httpd/access_log
# source (example 5)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/logs
a1.sources.r1.recursiveDirectorySearch = true
# source (example 6)
a1.sources.r1.type = netcat
a1.sources.r1.bind = masterhbase
a1.sources.r1.port = 44444
# source (example 7)
a1.sources.r1.type = exec
a1.sources.r1.command = cat /data/emp.txt
# Flume sink
a1.sinks.k1.type = logger
# sink (example 4)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cancer/flume/webdata/roll/%y%m%d/%H
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
# sink (example 5)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cancer/flume/webdata/spooldir/%y%m%d/%H
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Flume channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# channel (example 5)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir
a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs
# sink (example 6)
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://cancer01:9083
a1.sinks.k1.hive.database = flume_test
a1.sinks.k1.hive.table = flume_user
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age
a1.sinks.k1.serializer.serdeSeparator = '\t'
# sink (example 7)
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://masterhbase:9083
a1.sinks.k1.hive.database = flume_test
a1.sinks.k1.hive.table = emp
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno
a1.sinks.k1.serializer.serdeSeparator = '\t'
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start
cd /usr/local/flume/bin
flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/flume.conf -n a1 -Dflume.root.logger=INFO,console
Notes:
--conf or -c: the configuration directory, which contains flume-env.sh and the log4j configuration file, e.g. --conf conf
--conf-file or -f: the path of the agent configuration file, e.g. --conf-file conf/flume.conf
--name or -n: the agent name, e.g. --name a1
-z: the ZooKeeper connection string, e.g. -z zkhost:2181,zkhost1:2181
-p: the base path in ZooKeeper under which agent configurations are stored, e.g. -p /flume
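A hedged example combining these flags, for the case where the agent configuration is stored in ZooKeeper rather than in a local file (the ZooKeeper hosts are placeholders):
flume-ng agent -n a1 -c conf -z zkhost:2181,zkhost1:2181 -p /flume -Dflume.root.logger=INFO,console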
Send a file with avro-client
flume-ng avro-client -H localhost -p 4141 -F ./log.00
flume-ng avro-client --conf conf --host host --port 8080
Notes on the example 5 configuration:
1. source: spooldir (for complete, already-finalized data files)
(1) recursiveDirectorySearch: whether to search subdirectories for new files to read
(2) includePattern: a regular expression specifying which files to include (e.g. only .csv data files)
(3) ignorePattern: a regular expression specifying which files to ignore (e.g. skip .csv data files)
(4) Drawback: files in the directory must not be modified; files that are still being appended to are not allowed (they may be missed or cause errors)
2. To monitor a directory while allowing file modification and recording file state:
(1) source: taildir (roughly a combination of exec and spooldir); a minimal sketch is shown after this list
(2) filegroups: defines the source's file groups; several can be set, e.g. filegroups = f1
(3) filegroups.<groupName>: the absolute path (a regular expression is allowed) of the files in that group
(4) positionFile: where to store the position file, which records, in JSON, the last read position of each tailed file
3. The memory channel is not durable: it keeps all events in memory, so the data cannot be recovered if the process stops unexpectedly, and it is limited by the amount of memory.
4. The file channel is a durable channel: the data is safe, and as long as there is enough disk space it can keep the data on disk.
5. checkpointDir: the checkpoint directory, used to verify data integrity and to track which data has already been taken and which has not.
6. dataDirs: the directories where data is stored; several comma-separated directories may be given, and putting them on separate disks improves file channel performance.
7. By default the data written to HDFS is a binary file type; view it with: bin/hdfs dfs -text /
8. Setting hdfs.fileType = DataStream (data stream) together with hdfs.writeFormat = Text switches the output to plain text.
9. With DataStream the files are not compressed and hdfs.codeC does not need to be set; with CompressedStream a valid hdfs.codeC value must be set (e.g. snappy).
10. batchSize (default 100): the number of events flushed to HDFS per batch.
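A minimal taildir source sketch for item 2, using hypothetical paths and the agent/channel names a1 and c1 from the examples above:
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/logs/.*\.log
a1.sources.r1.positionFile = /usr/local/flume/log/taildir_position.json
a1.sources.r1.channels = c1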
Notes on the example 6 configuration:
1. serializer: parses the fields in each event and maps them to the columns of the Hive table
(1) DELIMITED: plain delimited text
(2) json: JSON files (no extra configuration needed; JSON object names map directly to Hive columns with the same names, using org.apache.hive.hcatalog.data.JsonSerDe internally)
2. DELIMITED options:
serializer.delimiter: the field delimiter of the incoming data, wrapped in double quotes, e.g. "\t"
serializer.fieldnames: the mapping from input fields to Hive columns, given as a comma-separated list of the Hive column names
serializer.serdeSeparator: the output field separator, wrapped in single quotes, e.g. '\t'
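The Hive sink writes through Hive streaming, which requires the target table to be bucketed, stored as ORC and transactional; below is a sketch of a matching DDL for the flume_user table from example 6 (the column types are assumptions).
hive -e "CREATE TABLE flume_test.flume_user (user_id INT, user_name STRING, user_age INT)
CLUSTERED BY (user_id) INTO 2 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');"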
Hive must be configured accordingly (hive-site.xml):
<!-- The following settings support writing from the Flume hive sink -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://cancer01:9083</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.in.test</name>
<value>true</value>
</property>
Configuration notes:
(1) Error: Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns
hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager -> enables transaction support
hive.compactor.initiator.on=true -> runs the initiator and cleaner threads; part of the full set of parameters required for transactions
hive.compactor.worker.threads=1 -> increasing the number of worker threads reduces the time compaction takes
hive.support.concurrency=true -> whether concurrency is supported (default false)
hive.enforce.bucketing=true -> whether bucketing is enforced; data written to the table is bucketed
hive.exec.dynamic.partition.mode=nonstrict -> non-strict dynamic partition mode
(2) Error when starting the metastore:
Table 'metastore.COMPACTION_QUEUE' doesn't exist
Set the hive.in.test property shown in the configuration above: it is what creates the COMPACTION_QUEUE table.
(3) Another error when starting the metastore:
Error rolling back: Can't call rollback when autocommit=true
Remove that property again once the COMPACTION_QUEUE table has been created.
Run:
Connect with nc and type in data, with fields separated by tabs (install nc first if it is missing: yum install -y nc).
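For example, with the netcat source from example 6 (masterhbase:44444) and the flume_user table columns user_id, user_name, user_age, a tab-separated row could be sent like this (the values are made up):
printf '1001\tjack\t25\n' | nc masterhbase 44444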
The data can then be seen in Hive.