Python RabbitMQ 消息队列监听

# coding: utf-8 # 测试消息消费 import datetime import logging as log import os from pathlib import Path from typing import List import pika # 设置日志格式 Path("./logs").mkdir(parents=True, exist_ok=True) os.chdir("./logs/") log_file_name = datetime.date.today().strftime("%Y-%m-%d") log_format = ( "%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s" ) log.basicConfig( level=log.INFO, filename="python-check-" + log_file_name + ".log", datefmt="%Y-%m-%d %H:%M:%S", format=log_format, encoding="utf-8", ) class RabbitMQConsumer: def __init__(self, host="localhost", queue_name="test-queue", batch_size=5): self.host = host self.queue_name = queue_name self.batch_size = batch_size self.connection = None self.channel = None self.message_count = 0 self.messages: List[pika.spec.Basic.Deliver] = [] self.delivery_tags: List[int] = [] def conn(self): log.info("测试消费者 连接RabbitMQ开始!") self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host) ) self.channel = self.connection.channel() # 声明队列 self.channel.queue_declare(queue=self.queue_name, durable=True) # 设置QoS,限制未确认的消息数量 self.channel.basic_qos(prefetch_count=self.batch_size) log.info("测试消费者 连接RabbitMQ成功!") def close(self): log.info("测试消费者 关闭RabbitMQ连接!") if self.connection and not self.connection.is_closed: self.connection.close() def start_consuming(self): log.info("测试消费者 监听开始!") try: # 确保已连接 if not self.connection or self.connection.is_closed: self.conn() log.info(f"测试消费者 监听队列:{self.queue_name}") # 设置消费回调 self.channel.basic_consume( queue=self.queue_name, on_message_callback=self.customer, ) # 开始消费 self.channel.start_consuming() except KeyboardInterrupt: log.error("测试消费者 停止消费!") self.close() except Exception as e: log.error(f"测试消费者 发生错误 停止消费:{str(e)}") self.close() def customer(self, ch, method, properties, body): log.error(f"测试消费者 接受到消息:{body.decode()}") try: # 打印消息内容 print(f"测试消费者 接受到消息:{body.decode()}") # 设置计数器和列表 消息达到batch_size才消费 self.messages.append(body) self.delivery_tags.append(method.delivery_tag) self.message_count += 1 # 当达到批处理大小时,进行批量确认 if self.message_count >= self.batch_size: print(f"\n批量处理完成 {self.batch_size} 条消息") print("消息内容:", [msg.decode() for msg in self.messages]) # 确认所有消息 ch.basic_ack(delivery_tag=self.delivery_tags[-1], multiple=True) # 重置计数器和列表 self.message_count = 0 self.messages = [] self.delivery_tags = [] except Exception as e: log.error(f"测试消费者 处理消息异常:{str(e)}") # 发生错误时,拒绝消息并重新入队 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) if __name__ == "__main__": # 创建消费者实例并开始消费 consumer = RabbitMQConsumer(host="localhost", queue_name="test-queue", batch_size=5) consumer.start_consuming()
上一篇:巡飞单机多旋翼无人机技术详解


下一篇:模拟退火算法(Simulated Annealing)详细解读