kafka安装使用配置1.2

进入cd /usr/local/flume/conf/

vi kafka.conf

配置

agent.sources=s1

agent.channels=c1

agent.sinks=k1

agent.sources.s1.type=exec

agent.sources.s1.command=tail -F /tmp/logs/kafka.log

agent.sources.s1.channels=c1

agent.channels.c1.type=memory

agent.channels.c1.capacity=10000

agent.channels.c1.transactionCapacity=100

#设置Kafka接收器

agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

#设置Kafka的broker地址和端口号

agent.sinks.k1.brokerList=192.168.16.100:9092

#设置Kafka的Topic

agent.sinks.k1.topic=kafkatest

#设置序列化方式

agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

agent.sinks.k1.channel=c1

vi kafka.sh

for ((i=0;i<=1000;i++));

do

echo "kafka_test-"+$i >> /tmp/logs/kafka.log;

done

克隆一个窗口

cd /tmp/

mkdir logs

cd logs/ ls

cat kafka.log

另一个窗口touch /tmp/logs/kafka.log

sh kafka.sh

直接完事也太快了需要加延迟sleep 10

rm -rf * 干点所有

sh kafka.sh

cat logs里面的文件

老命令

kafka-server-start.sh -daemon config/server.properties &

新命令(一般用新)

kafka-server-start.sh /usr/local/kafka/config/server.properties &

克隆一个窗口

kafka-topics.sh --create --zookeeper 192.168.16.100:2181 --replication-factor 1 --partitions 1 --topic kafkatest

d.打开新终端,在kafka安装目录下执行如下命令,生成对topickafkatest的消费

kafka-console-consumer.sh --bootstrap-server 192.168.16.100:9092 --topic kafkatest --from-beginning

e.启动flum

flume-ng agent --conf . --conf-file /usr/local/flume/conf/kafka.conf --name agent -D flume.root.logger=DEBUG,console

flume-ng agent --name agent --conf . --conf-file kafka.conf -Dflume.root.logger=DEBUG,console

kafka安装使用配置1.2

上一篇:第1节 kafka消息队列:10、flume与kafka的整合使用


下一篇:非kerberos环境下,flume采集日志到kafka