先说下问题:
正常使用kafka消费者,接收消息时,会出现消息循环无法结束问题,增加参数 consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka。(例子如下)
consumer.py文件:
from kafka import KafkaProducer, KafkaConsumer
import time
class KafkaClient(object):
topic = "topic" # 使用的kafka的topic
client = "0.0.0.0:19823" # kafka所在的服务地址
group_id = "test_consumer_group" # kafka组信息
@staticmethod
def log(log_str):
t = time.strftime(r"%Y-%m-%d_%H:%M:%S", time.localtime())
print("[%s]%s" % (t, log_str))
def info_send(self, key, info_str):
"""key: 发送信息的key;info_str:要发送的信息内容"""
producer = KafkaProducer(bootstrap_servers=[self.client])
producer.send(self.topic, key=key.encode("utf-8"), value=info_str.encode("utf-8"))
# 批量提交可以使用 producer.flush()
producer.close()
def message_consumer():
# consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka
consumer = KafkaConsumer(self.topic, group_id=self.group_id, bootstrap_servers=[self.client], consumer_timeout_ms=3000)
for msg in consumer:
# partition:消息所在的分区,offset:消息所在分区的位置,key:消息的key,value:消息的内容
print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)