简介
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等
主要应用场景是:日志收集系统和消息系统。
快速搭建测试
这里使用的docker进行快速构建服务测试
# 拉取基础镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
# 启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
# 启动kafka
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.38.94:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.38.94:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
# 队列先消费 后生产
# cd /opt/kafka
# 先消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group tt1 --from-beginning
# 生产
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
程序推送测试
from kafka import KafkaProducer
import json
import datetime
topic='test'
producer = KafkaProducer(bootstrap_servers='192.168.38.94:9092',value_serializer=lambda m:json.dumps(m).encode("utf-8")) # 连接kafka
for i in range(10, 20):
data={"num":i,"ts":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
producer.send(topic,data)
producer.close()