Flume + Kafka: monitoring a log file

(1) Download: wget http://mirror.bit.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz

tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /home/hadoop
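The tarball unpacks to apache-flume-1.8.0-bin, while the paths below use /home/hadoop/flume-180, so rename the directory and create the log file the exec source will tail (a minimal sketch; both paths are taken from the config below):

mv /home/hadoop/apache-flume-1.8.0-bin /home/hadoop/flume-180
mkdir -p /home/hadoop/flume-180/logdemo
touch /home/hadoop/flume-180/logdemo/log.log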


(2) Edit the agent configuration file with vi /home/hadoop/flume-180/conf/kafka_sink.conf; the content is as follows:
###############################################################################
# kafka_sink.conf: a single-node Flume configuration with a Kafka sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#a1.sources.r1.type = netcat
#a1.sources.r1.bind = localhost
#a1.sources.r1.port = 44444

# Monitor a log file with an exec source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/flume-180/logdemo/log.log

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic_test1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.91.112:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

###############################################################################
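Note that an exec source running tail -F loses data if the agent restarts, because it keeps no record of how far it has read. Flume 1.7+ ships a TAILDIR source that tracks offsets in a position file; a minimal sketch of the replacement source block (the position-file path is an assumption):

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/flume-180/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/flume-180/logdemo/log.log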


Create the topic (skip this step if it already exists):
bin/kafka-topics.sh --create --bootstrap-server 192.168.91.112:9092 --replication-factor 2 --partitions 2 --topic topic_test1
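To double-check the partition and replica assignment after creation:

bin/kafka-topics.sh --describe --bootstrap-server 192.168.91.112:9092 --topic topic_test1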

List the topics:

bin/kafka-topics.sh --list --bootstrap-server 192.168.91.112:9092

Start a console producer:

bin/kafka-console-producer.sh --broker-list 192.168.91.112:9092 --topic topic_test1

Start a console consumer (192.168.91.113 is another broker in the cluster; any broker works as the bootstrap server):
bin/kafka-console-consumer.sh --bootstrap-server 192.168.91.113:9092 --topic topic_test1 --from-beginning
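To also print each message's timestamp and key, the console consumer's formatter properties can be turned on (same topic and broker as above):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.91.113:9092 --topic topic_test1 --from-beginning --property print.timestamp=true --property print.key=true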

Start Flume:

./bin/flume-ng agent --conf /home/hadoop/flume-180/conf --conf-file /home/hadoop/flume-180/conf/kafka_sink.conf --name a1 -Dflume.root.logger=INFO,console
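For a long-running agent, start it in the background instead (a sketch, assuming the log should go to flume.log in the install directory):

nohup ./bin/flume-ng agent --conf /home/hadoop/flume-180/conf --conf-file /home/hadoop/flume-180/conf/kafka_sink.conf --name a1 > /home/hadoop/flume-180/flume.log 2>&1 &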


Send messages via telnet (this only works if you enable the commented-out netcat source above instead of the exec source, since the exec source does not listen on a port):
telnet 127.0.0.1 44444

{"a": 1, "b": [1, 2, 3]}

Append log messages to the monitored log file:

# while true ; do echo 'new message {"a": 1, "b": [1, 2, 3]} from app ....' >> /home/hadoop/flume-180/logdemo/log.log ; sleep 1 ; done

## Remember to stop the loop above (Ctrl-C) when you are done.
On the Kafka consumer side you should see the messages sent via telnet or appended to the log file:

new message {"a": 1, "b": [1, 2, 3]} from app ....
