1、source为http模式,sink为logger模式,将数据在控制台打印出来。
conf配置文件如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = http #该设置表示接收通过http方式发送过来的数据
a1.sources.r1.bind = hadoop-master #运行flume的主机或IP地址都可以
a1.sources.r1.port = 9000#端口
#a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger#该设置表示将数据在控制台打印出来
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume命令为:
bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console。
显示如下的信息表示启动flume成功。
895 (lifecycleSupervisor-1-3) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
打开另外一个终端,通过http post的方式发送数据:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:9000。
hadoop-master就是flume配置文件绑定的主机名,9000就是绑定的端口。
然后在运行flume的窗口就是看到如下的内容:
2018-06-12 08:24:04,472 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestampe=1234567, host=master} body: 62 61 64 6F 75 20 66 6C 75 6D 65 badou flume }
2、source为netcat(udp、tcp模式),sink为logger模式,将数据打印在控制台
conf配置文件如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master#绑定的主机名或IP地址
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transcationCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume
bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console。
然后在另外一个终端,使用telnet发送数据:
命令为:telnet hadoop-maser 44444
[root@hadoop-master ~]# telnet hadoop-master 44444
Trying 192.168.194.6...
Connected to hadoop-master.
Escape character is '^]'.
显示上面的信息表示连接flume成功,然后输入:
12213213213
OK
12321313
OK
在flume就会收到相应的信息:
2018-06-12 08:38:51,129 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213. }
2018-06-12 08:38:51,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 33 32 31 33 31 33 0D 12321313. }
3、source为netcat/http模式,sink为hdfs模式,将数据存储在hdfs中。
conf配置文件如下,文件名为hdfs.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents =true
# Describe the sink
#a1.sinks.k1.type = logger
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs:/flume/events #文件在hdfs文件系统中存放的位置
a1.sinks.k1.hdfs.filePrefix = events- #文件的前缀
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream #制定文件的存放格式,这个设置是以text的格式存放从flume传输过来的数据。
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在hdfs文件系统中创建文件存放的路径:
hadoop fs -mkdir /flume/event1。
启动flume:
bin/flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console
通过telnet模式向flume中发送文件:
telnet hadoop-master 44444
然后输入:
aaaaaaaa
bbbbbbb
ccccccccc
dddddddddd
通过如下的命令hadoop fs -ls /flume/events/查看hdfs中的文件,可以看到hdfs中有/flume/events有如下文件:
-rw-r--r-- 3 root supergroup 16 2018-06-05 06:02 /flume/events/events-.1528203709070
-rw-r--r-- 3 root supergroup 5 2018-06-05 06:02 /flume/events/events-.1528203755556
-rw-r--r-- 3 root supergroup 11 2018-06-05 06:03 /flume/events/events-.1528203755557
-rw-r--r-- 3 root supergroup 26 2018-06-13 07:28 /flume/events/events-.1528900112215
-rw-r--r-- 3 root supergroup 209 2018-06-13 07:29 /flume/events/events-.1528900112216
-rw-r--r-- 3 root supergroup 72 2018-06-13 07:29 /flume/events/events-.1528900112217
通过hadoop fs -cat /flume/events/events-.1528900112216查看文件events-.1528900112216的内容:
aaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbb
ccccccccccccccccccc
dddddddddddddddd
eeeeeeeeeeeeeeeeeee
fffffffffffffffffffffff
gggggggggggggggggg
hhhhhhhhhhhhhhhhhhhhhhh
iiiiiiiiiiiiiiiiiii
jjjjjjjjjjjjjjjjjjj
http模式就是把hdfs.conf文件中的netcat改为http,然后传输文件从telnet改为:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:44444。
在hadoop文件中就会看到上面命令传输的内容:badou flume。
4、source为netcat/http模式,sink为hive模式,将数据存储在hive中,并分区存储。
conf配置如下,文件名为hive.conf:
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
# Describe the sink
#a1.sinks.k1.type = logger
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore=thrift://hadoop-master:9083
a1.sinks.k1.hive.database=default#hive数据库名
a1.sinks.k1.hive.table=flume_user1
a1.sinks.k1.serializer=DELIMITED
a1.sinks.k1.hive.partition=3#如果以netcat模式,只能静态设置分区的值,因为netcat模式传输数据,无法传输某个字段的值,只能按照顺序来。这里设置age的分区值为3。
#a1.sinks.k1.hive.partition=%{age}#如果以http或json等模式,只能动态设置分区的值,因为http模式可以动态传输age的值。
a1.sinks.k1.serializer.delimiter=" "
a1.sinks.k1.serializer.serderSeparator=' '
a1.sinks.k1.serializer.fieldnames=user_id,user_name
a1.sinks.k1.hive.txnsPerBatchAsk = 10
a1.sinks.k1.hive.batchSize = 1500
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在hive中创建表:
create table flume_user(
user_id int
,user_name string
)
partitioned by(age int)
clustered by (user_id) into 2 buckets
stored as orc
在hive-site.xml中添加如下内容:
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
将hive根目录下的/hcatalog/share/hcatalog文件夹中的如下三个文件夹添加到flume的lib目录下。
运行flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console。
重新打开一个窗口,
启动metastroe服务:
hive --service metastore &
重新打开一个客户端,通过telnet连接到flume
telnet hadoop-master 44444
然后输入:
1 1
3 3
就会在hive中看到如下两行数据:
flume_user1.user_id flume_user1.user_name flume_user1.age
1 1 3
3 3 3
age是在hive.conf中设置的值3。
现在将flume的source换成http模式,然后hive分区通过参数模式动态的传输分区值。
将hive.conf中的
a1.sources.r1.type = netcat改成a1.sources.r1.type = http
a1.sinks.k1.hive.partition=3改成a1.sinks.k1.hive.partition=%{age}。
然后启动flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console。
在重新打开的窗口中通过http的模式传输数据到flume
curl -X POST -d '[{"headers":{"age":"109"},"body":"11 ligongong"}]' hadoop-master:44444。
在hive中可以看到如下的数据:
flume_user1.user_id flume_user1.user_name flume_user1.age
11 ligongong 109
由此可以看出通过http模式传输数据到hive中时,分区字段的信息是在header中传输,而其他字段的信息是放在bady中传输,并且不同列之间以hive.conf文件定义好的分隔符分隔。
5、使用avro模式,将数据在控制台打印出来。
不同的agent之间传输数据只能通过avro模式。
这里我们需要两台服务器来演示avro的使用,两台服务器分别是hadoop-master和hadoop-slave2
hadoop-master中运行agent2,然后指定agent2的sink为avro,并且将数据发送的主机名设置为hadoop-slave2。hadoop-master中flume的conf文件设置如下,名字为push.conf:
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1
#Describe/configure the source
a2.sources.r1.type= netcat
a2.sources.r1.bind= hadoop-master
a2.sources.r1.port = 44444
a2.sources.r1.channels= c1
#Use a channel which buffers events in memory
a2.channels.c1.type= memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000
#Describe/configure the source
a2.sinks.k1.type= avro#制定sink为avro
a2.sinks.k1.channel= c1
a2.sinks.k1.hostname= hadoop-slave2#指定sink要发送数据到的目的服务器名
a2.sinks.k1.port= 44444#目的服务器的端口
hadoop-slave2中运行的是agent1,agent1的source为avro。flume配置内容如下,文件名为pull.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels= c1
a1.sources.r1.bind= hadoop-slave2
a1.sources.r1.port= 44444
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000。
现在hadoop-slave2中启动flume,然后在hadoop-master中启动flume,顺序一定要对,否则会报如下的错误:org.apache.flume.FlumeException: java.net.SocketException: Unresolved address
在hadoop-slave2中启动flume:
bin/flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
在hadoop-master中启动flume:
bin/flume-ng agent -c conf -f conf/push.conf -n a2 -Dflume.root.logger=INFO,console
重新打开一个窗口,通过telnet连接到hadoop-master
telnet hadoop-master 44444
然后发送11111aaaa
在hadoop-slave2的控制台中就会显示之前发送的,11111aaaa,如下所示:
2018-06-14 06:43:00,686 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 31 31 31 61 61 61 61 0D 11111aaaa. }
6、通过flume将数据通传输到kafka,然后通过kafka将数据存储在hdfs和hive中。
首先要配置kafka。配置kafka请参考:https://blog.csdn.net/zxy987872674/article/details/72466504
在分别在hadoop-master、hadoop-slave1、hadoop-slave2上启动zookeeper。
命令为:
然后启动kafka,进入kafka的安装目录,执行命令:
./bin/kafka-server-start.sh config/server.properties &
在kafka中创建topic:
bin/kafka-topics.sh --create --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --replication-factor 1 --partitions 2 --topic flume_kafka
查看kafka中的topic:
bin/kafka-topics.sh --list --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181
启动kafka的消费者:
./kafka-console-consumer.sh --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --topic flume_kafka
配置flume中conf文件,设置source类型为exec,sink为org.apache.flume.sink.kafka.KafkaSink,设置kafka的topic为上面创建的flume_kafka,具体配置如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#设置sources的类型为exec,就是执行命令的意思
a1.sources.r1.type = exec
#设置sources要执行的命令
a1.sources.r1.command = tail -f /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt
# 设置kafka接收器
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 设置kafka的broker地址和端口号
a1.sinks.k1.brokerList=hadoop-master:9092
# 设置Kafka的topic
a1.sinks.k1.topic=flume_kafka
# 设置序列化的方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume:
只要/home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt中有数据时flume就会加载kafka中,然后被上面启动的kafka消费者消费掉。
我们查看发现/home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt文件中有如下的数据:
131,dry pasta
131,dry pasta
132,beauty
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
134,specialty wines champagnes
134,specialty wines champagnes
134,specialty wines champagnes