我是新来的芹菜,有一个问题.我有一个简单的任务:
@app.task(name='test_install_queue')
def test_install_queue():
return subprocess.call("exit 0",shell=True)
我稍后在一个测试用例中调用此任务
result = tasks.test_default_queue.apply_async(queue="install")
该任务在队列安装中成功运行(因为我在celery日志中看到了它,并且可以正常完成.但是我想以编程方式从存储在结果中的对象中查找任务test_install_queue在哪个队列中运行的方式.
谢谢!
编辑:
我将任务更改为:
@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
return self.request.__dict__
然后我按如下方式使用apply_async的结果:
result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]
解决方法是该工作程序(主机名)与该工作程序中初始化的唯一队列的名称相同.
解决方法:
您可以尝试以下方法:
delivery_info = app.current_task.request.delivery_info
# by default celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.itervalues():
if queue.exchange.name == delivery_info['exchange']
and queue.routing_key == delivery_info['routing_key']:
original_queue = queue.name
break
该方法基于以下假设:您使用默认的芹菜设置,并且您的交换是直接的.如果您需要用于扇出和主题交换的通用解决方案,则必须检查app.amqp.queues中每个已声明队列的路由键.