缘由
使用 kombu 读取队列数据时,报如下错误:
amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-max-priority' for queue 'douyin.pg.logo.ready' in vhost 'douyin_pggolden': received none but current is the value '5' of type 'signedint'
错误原因
从队列中读取消息的代码如下
from kombu import Connection

# Broker credentials and the queue to read from.
rabitmq_user = "admin"
rabitmq_pwd = "admin"
rabitmq_ip = "8.8.8.8"
queue_name = "example.queue"
virtual_host = "example"

# AMQP URL of the form amqp://user:password@host:5672/vhost.
src_rabitmq_host = "amqp://{}:{}@{}:5672/{}".format(
    rabitmq_user, rabitmq_pwd, rabitmq_ip, virtual_host
)
conn = Connection(src_rabitmq_host)

# NOTE: no queue arguments are passed here; this is the call that triggers the
# PRECONDITION_FAILED error when the existing queue has x-max-priority set.
rec = conn.SimpleQueue(queue_name)

# Fetch a single message from the queue, waiting at most one second for it.
msg = rec.get(block=True, timeout=1)
msg.ack()
print(msg.payload)
原因是 RabbitMQ 中该队列在创建时设置了 x-max-priority(当前值为 5),而 SimpleQueue 声明队列时没有携带这个参数,两边的队列参数不一致,因此 RabbitMQ 拒绝了这次声明。
解决办法
我们需要将 SimpleQueue 改成 Queue,并在声明队列时带上与已有队列一致的优先级参数(max_priority=5)即可。
- 使用kombu
from kombu.log import get_logger
from kombu.mixins import ConsumerMixin
from kombu.utils.functional import reprcall
from kombu import Exchange, Queue
logger = get_logger(__name__)

task_exchange = Exchange('tasks', type='direct')

# The arguments declared here must match those of the queue that already exists
# on the broker; otherwise RabbitMQ rejects the declaration with
# 406 PRECONDITION_FAILED.  The priority limit is specified in one place only:
# ``max_priority`` is what kombu sends as ``x-max-priority``, and 5 is the value
# the broker reported for the existing queue.
task_queues = [
    Queue(
        'hipri',
        task_exchange,
        routing_key='hipri',
        queue_arguments={'x-queue-mode': 'lazy'},
        max_priority=5,
    ),
]
class Worker(ConsumerMixin):
    """Consumer that runs the callable carried in each task message."""

    def __init__(self, connection):
        # Stored for ConsumerMixin, which expects a ``connection`` attribute.
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return the consumers to run: a single one bound to ``task_queues``."""
        task_consumer = Consumer(
            queues=task_queues,
            # NOTE(review): accepting pickle means message bodies are
            # unpickled; only safe when every producer is trusted.
            accept=['pickle', 'json'],
            callbacks=[self.process_task],
        )
        return [task_consumer]

    def process_task(self, body, message):
        """Run the task described by ``body`` and acknowledge ``message``.

        ``body`` is indexed with the keys ``'fun'``, ``'args'`` and
        ``'kwargs'``.  An exception raised by the task is logged, and the
        message is acknowledged whether or not the task succeeded.
        """
        fun, args, kwargs = body['fun'], body['args'], body['kwargs']
        logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
        try:
            fun(*args, **kwargs)
        except Exception as exc:
            logger.error('task raised exception: %r', exc)
        message.ack()
if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging

    # Configure the root logger at INFO level.
    setup_logging(loglevel='INFO', loggers=[''])

    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        try:
            Worker(conn).run()
        except KeyboardInterrupt:
            # Ctrl-C ends the worker.
            print('bye bye')
- 使用celery和kombu
from celery import Celery
from kombu import Exchange, Queue, binding
def route_task(exchange_name, exchange_type):
    """Return a router function that sends every task to one exchange.

    The returned callable ignores everything except the task ``name``, which
    it uses as the routing key alongside the fixed exchange name and type.
    """
    destination = {
        'exchange': exchange_name,
        'exchange_type': exchange_type,
    }

    def wrapper(name, args, kwargs, options, task=None, **kw):
        # A fresh dict per call, so callers may mutate the result freely.
        return dict(destination, routing_key=name)

    return wrapper
def create_task_queues(binding_list, exchange):
    """Build one kombu ``Queue`` per queue name found in ``binding_list``.

    Args:
        binding_list: Iterable of ``(routing_key, queue_name)`` pairs.
        exchange: Exchange that every binding is attached to.

    Returns:
        A list of ``Queue`` objects, each bound to ``exchange`` once for every
        routing key listed for that queue.
    """
    # Group routing keys by queue so a queue with several keys is built once.
    binding_map = {}
    for routing_key, queue_name in binding_list:
        binding_map.setdefault(queue_name, []).append(routing_key)

    # The priority limit is specified in one place only: ``max_priority`` is
    # what kombu sends as ``x-max-priority``, and it must equal the value the
    # existing queue was declared with, or RabbitMQ answers
    # 406 PRECONDITION_FAILED.
    return [
        Queue(
            queue_name,
            [binding(exchange, routing_key=key) for key in routing_keys],
            queue_arguments={'x-queue-mode': 'lazy'},
            max_priority=5,
        )
        for queue_name, routing_keys in binding_map.items()
    ]
class ExampleConfig(object):
    """Celery settings: broker URL, default exchange, queues and routing."""

    broker_url = 'amqp://guest:guest@localhost:5672//'

    task_default_exchange = 'example'
    task_default_exchange_type = 'topic'

    # (routing_key, queue_name) pairs handed to create_task_queues.
    bindings = [('example.queue', 'example.queue')]

    task_queues = create_task_queues(
        bindings,
        Exchange(task_default_exchange, type=task_default_exchange_type),
    )
    # Single router: sends each task to the default exchange, using the task
    # name as the routing key.
    task_routes = (
        route_task(task_default_exchange, task_default_exchange_type),
    )
app_example = Celery("example")
app_example.config_from_object(ExampleConfig)


@app_example.task(name="example.queue")
def pg_logo_recon(**kwargs):
    """Placeholder task registered as ``example.queue``; it does nothing."""