我正在用Celery替换一些自己开发的代码,但很难复制当前的行为.我希望的行为如下:
>创建新用户时,应使用user.created路由键将消息发布到任务交换.
>此消息应触发两个Celery任务,即send_user_activate_email和check_spam.
我尝试通过使用ignore_result = True参数定义user_created任务,以及send_user_activate_email和check_spam的任务来实现此功能.
在我的配置中,我添加了以下路由和队列定义.当消息传递给user_created队列时,它不会传递给其他两个队列.
理想情况下,消息仅传递给send_user_activate_email和check_spam队列.使用vanilla RabbitMQ时,消息将发布到交换机,队列可以绑定到该交换机,但Celery似乎直接向队列发送消息.
我如何在Celery中实现上述行为?
CELERY_QUEUES = {
'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}
CELERY_ROUTES = {
'user_created': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'send_user_activate_email': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'check_spam': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
}
解决方法:
听起来你期望一条消息被两个队列触发/消耗,但这不是Celery的工作方式. Exchange会将任务发布到符合条件的队列,但一旦被使用,其他队列将忽略该消息.每个要触发的任务需要一条消息.
新的Celery用户经常会感到困惑,因为在这个系统中有两种“队列”用途; Queue()和文档引用的Kombu队列,以及AMQP队列,它们直接保存消息并由工作人员使用.当我们发布到队列时,我们会想到AMQP,这是不正确的. (感谢下面的回答).
回到你的问题,如果我理解正确,当使用user_created时,你希望它产生另外两个任务; send_user_activate_email和check_spam.此外,这些不应相互依赖;它们可以在不同的机器上并行运行,而不需要知道彼此的状态.
在这种情况下,您希望user_created为“apply_async”这两个新任务并返回.这可以直接完成,或者您可以使用包含check_spam和send_user_activate_email的Celery“Group”来实现此目的.这个小组提供了一些不错的速记,并为你的任务提供了一些结构,所以我个人推动你的方向.
#pseudocode
group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... active email kwargs ...)).apply_async()
此设置将创建四个消息;一个用于您要执行的每个任务,另外一个用于Group(),它本身将具有结果.
在您的情况下,我不确定Exchange或ignore_result是否必要,但我需要查看任务代码并更多地了解系统以做出判断.
http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups
http://celery.readthedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing-keys
Why do CELERY_ROUTES have both a “queue” and a “routing_key”?
(如果我离开了,我会删除/删除答案……)