confluent-kafka demo

Install confluent-kafka

pip install confluent-kafka (I installed it directly from within PyCharm.)

Start ZooKeeper, then start the Kafka server.

List the existing topics

./kafka-topics.sh --zookeeper localhost:2181 --list
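
The same listing can probably be done from Python with the client's AdminClient. The following is a minimal sketch, assuming the broker listens on 127.0.0.1:9092 (the address used later in this post); note that it talks to the broker rather than to ZooKeeper. The helper names summarize_topics and print_topics are made up for this example.

```python
def summarize_topics(topics):
    """Return sorted (topic name, partition count) pairs.

    `topics` is expected to look like ClusterMetadata.topics: a dict that
    maps each topic name to an object with a `partitions` dict.
    """
    return sorted((name, len(meta.partitions)) for name, meta in topics.items())


def print_topics(broker="127.0.0.1:9092"):
    # Imported here so summarize_topics can be used without the client installed.
    from confluent_kafka.admin import AdminClient

    admin = AdminClient({"bootstrap.servers": broker})
    # list_topics() requests cluster metadata; the timeout is in seconds.
    metadata = admin.list_topics(timeout=10)
    for name, partition_count in summarize_topics(metadata.topics):
        print("{} ({} partitions)".format(name, partition_count))
```

With the broker running, calling print_topics() should print one line per topic.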

Create the topic test

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
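
For reference, a topic with the same settings could also be created from Python. This is only a sketch under the assumption that the broker is reachable at 127.0.0.1:9092; topic_spec and create_topic are hypothetical helper names, while AdminClient, NewTopic and create_topics come from confluent_kafka.admin.

```python
def topic_spec(name="test", partitions=3, replication=1):
    """Mirror the CLI flags above as keyword arguments for NewTopic."""
    return {
        "topic": name,
        "num_partitions": partitions,
        "replication_factor": replication,
    }


def create_topic(name="test", partitions=3, replication=1, broker="127.0.0.1:9092"):
    # Imported here so topic_spec can be used without the client installed.
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({"bootstrap.servers": broker})
    spec = topic_spec(name, partitions, replication)
    # create_topics() is asynchronous and returns one future per topic name.
    futures = admin.create_topics([NewTopic(**spec)])
    for topic, future in futures.items():
        try:
            future.result()  # raises if the broker rejected the request
            print("created topic {}".format(topic))
        except Exception as exc:
            print("failed to create topic {}: {}".format(topic, exc))
```

Calling create_topic() a second time should report a failure, because the topic already exists.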

Send messages to the topic from the console

sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
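
Messages can also be sent from Python instead of the console. The sketch below assumes the same broker address and topic as the command above; encode_lines, on_delivery and send_lines are illustrative names, and skipping blank lines is a choice made for this example, not something the console producer does.

```python
def encode_lines(lines):
    """Strip line endings, skip blank lines, and encode the rest as UTF-8 bytes."""
    stripped = (line.rstrip("\r\n") for line in lines)
    return [line.encode("utf-8") for line in stripped if line.strip()]


def on_delivery(err, msg):
    # Invoked from poll() or flush(), once per message, with the final result.
    if err is not None:
        print("delivery failed: {}".format(err))
    else:
        print("delivered to {} [{}] at offset {}".format(
            msg.topic(), msg.partition(), msg.offset()))


def send_lines(lines, topic="test", broker="127.0.0.1:9092"):
    # Imported here so encode_lines can be used without the client installed.
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": broker})
    for payload in encode_lines(lines):
        producer.produce(topic, value=payload, callback=on_delivery)
        producer.poll(0)  # serve delivery callbacks for earlier messages
    producer.flush()  # block until every queued message is delivered or has failed
```

For example, send_lines(["hello", "world"]) should queue two messages on the topic test.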

Consumer with automatic offset commits

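from time import sleep

from confluent_kafka import Consumer, KafkaError

mybroker = "127.0.0.1:9092"

c = Consumer({
    "bootstrap.servers": mybroker,
    "group.id": "mygroup",
    "client.id": "gxf",
    # Offsets are committed automatically in the background.
    "enable.auto.commit": True,
    "default.topic.config": {
        # Start from the earliest offset when the group has no committed offset.
        "auto.offset.reset": "earliest"
    }
})

c.subscribe(["test"])

while True:
    # Wait up to one second for the next message.
    msg = c.poll(1.0)
    # print("msg:", msg)
    if msg is None:
        continue
    if msg.error():
        print("msg error")
        # Reaching the end of a partition is not a failure; keep polling.
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print("Received message: {}".format(msg.value().decode("utf-8")))
    sleep(1)

# Leave the consumer group cleanly.
c.close()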
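
The consumer above relies on enable.auto.commit. For comparison, here is a rough sketch of a variant that commits each offset by hand after the message has been processed. It assumes the same broker, group and topic; consumer_config, consume_with_manual_commit and the max_messages limit are made up for this example, and auto.offset.reset is set at the top level of the config here.

```python
def consumer_config(broker="127.0.0.1:9092", group="mygroup", auto_commit=False):
    """Build the consumer settings; auto commit is off by default in this sketch."""
    return {
        "bootstrap.servers": broker,
        "group.id": group,
        "enable.auto.commit": auto_commit,
        "auto.offset.reset": "earliest",
    }


def consume_with_manual_commit(topic="test", max_messages=10):
    # Imported here so consumer_config can be used without the client installed.
    from confluent_kafka import Consumer

    consumer = Consumer(consumer_config(auto_commit=False))
    consumer.subscribe([topic])
    handled = 0
    try:
        while handled < max_messages:
            msg = consumer.poll(1.0)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                print(msg.error())
                break
            print("Received message: {}".format(msg.value().decode("utf-8")))
            # Commit synchronously, and only after the message was handled.
            consumer.commit(message=msg, asynchronous=False)
            handled += 1
    finally:
        consumer.close()
    return handled
```

Committing after processing means a crash may cause a message to be read again after a restart, but a message should not be marked as consumed before it was handled. On an idle topic the loop keeps polling until max_messages have arrived.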

 
