一. Test: append data to a file, stream it to Flume, and on to Kafka
1. Configure Flume
# Please paste flume.conf here. Example:
# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources = source1 fileSource
tier1.channels = channel1 fileChannel
tier1.sinks = sink1 fileSink
# For each source, channel, and sink, set
# standard properties.
tier1.sources.source1.type = netcat
tier1.sources.source1.bind = 127.0.0.1
tier1.sources.source1.port = 9999
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type = memory
tier1.sinks.sink1.type = logger
tier1.sinks.sink1.channel = channel1
# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
tier1.channels.channel1.capacity = 100
# Test: append data to a file, stream it to Flume, and on to Kafka
#tier1.sources =fileSource
#tier1.channels = fileChannel
#tier1.sinks = fileSink
# Watch for data appended to /home/flumeTest/fileFlumeKafka.txt
tier1.sources.fileSource.type = exec
tier1.sources.fileSource.command = tail -F /home/flumeTest/fileFlumeKafka.txt
#tier1.sources.fileSource.fileHeader = false
tier1.sources.fileSource.channels = fileChannel
# Configure a host interceptor for the source; each event will carry a hostname header,
# e.g. Event: { headers:{hostname=master01} body: 7A 68 6F 75 6C 73 0D    zhouls. }
tier1.sources.fileSource.interceptors = i1
tier1.sources.fileSource.interceptors.i1.type = host
tier1.sources.fileSource.interceptors.i1.useIP = false
tier1.sources.fileSource.interceptors.i1.hostHeader = hostname
tier1.channels.fileChannel.type = memory
tier1.channels.fileChannel.capacity = 10000
tier1.channels.fileChannel.transactionCapacity = 1000
#set sink1
tier1.sinks.fileSink.channel = fileChannel
tier1.sinks.fileSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.fileSink.topic = fileFlumeTest
tier1.sinks.fileSink.brokerList = master01:9092
# requiredAcks takes an integer: 0 (no wait), 1 (leader only), -1 (all in-sync replicas); "all" is not a valid value here
tier1.sinks.fileSink.requiredAcks = -1
tier1.sinks.fileSink.batchSize = 100
Refresh the Flume configuration (with Cloudera Manager, save the change and restart the Flume agent so it takes effect).
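If you run the agent outside Cloudera Manager, a minimal sketch of starting it by hand (the config-directory and file paths are assumptions for a typical install):
flume-ng agent --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/flume.conf --name tier1 -Dflume.root.logger=INFO,console
Note that --name must match the agent name used as the property prefix (tier1 here).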
2. Create the Kafka topic
/opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35/lib/kafka/bin/kafka-topics.sh --zookeeper slave02:2181 --topic fileFlumeTest --replication-factor 2 --partitions 1 --create
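You can then verify the topic's partition and replica assignment with --describe (same script and ZooKeeper address as above):
/opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35/lib/kafka/bin/kafka-topics.sh --zookeeper slave02:2181 --topic fileFlumeTest --describe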
3. Create the file and append data to it
[root@master01 flumeTest]# pwd
/home/flumeTest
[root@master01 flumeTest]# vi fileFlumeKafka.txt
1,zhangsan,23
2,lisi,24
3,wangwu,25
4,lier,12
5,sadm,54
[root@master01 flumeTest]# echo "6,kevin,54">>fileFlumeKafka.txt
[root@master01 flumeTest]# echo "7,tutengfei,54">>fileFlumeKafka.txt
[root@master01 flumeTest]# echo "8,tutengfei,54">>fileFlumeKafka.txt
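Optionally, confirm the agent picked up the appended lines by tailing its log before moving on (the log directory is an assumption; /var/log/flume-ng/ is typical on a Cloudera deployment):
tail -f /var/log/flume-ng/*.log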
4. Start a Kafka consumer and check whether the data arrived
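A minimal sketch using the console consumer shipped with the same parcel (the broker address comes from the sink's brokerList; --from-beginning replays records produced before the consumer started):
/opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35/lib/kafka/bin/kafka-console-consumer.sh --bootstrap-server master01:9092 --topic fileFlumeTest --from-beginning
Each line appended to fileFlumeKafka.txt, e.g. 6,kevin,54, should appear in the consumer output shortly after the corresponding echo.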