一般使用celery来做Django的异步消息队列
先安装必要的包
pip3 install celery
我的项目目录结构:
celeryApp.py
1 import celery 2 import os 3 4 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "videoApp.settings") 5 6 import django 7 django.setup() 8 9 app = celery.Celery(main="celery_app", broker='redis://localhost:6379/2') 10 11 app.autodiscover_tasks(['apps.video_manager'])
app.autodiscover_tasks()
接收参数为tasks.py文件所在的包
看一下 autodiscover_tasks的源码:
看注释,说的很清楚,如果你的tasks.py文件在 foo.bar.tasks.py, 那么参数传递 foo.bar,更改方法接受列表,会将列表里的所有tasks.py的任务都注册
任务函数:
apps/video_manager/tasks.py
1 from videoApp.celeryApp import app 2 import time 3 4 @app.task(name="fir_task") 5 def my_task(): 6 with open('yeah.txt', 'w') as f: 7 f.write(str(time.time())) 8 return "success"
现在在项目根目录执行命令,开启celery
celery -A videoApp.celeryApp worker -l info
控制台会显示info信息,注意看:
如果tasks里的函数出现在了这里,说明任务注册成功了,否则注册失败,如果注册失败,在进行调用的时候,会报错,任务未注册
任务里的return,return出去的数据仅仅是作为日志显示用的,目前未发现有别的用处。
需要补充的是,任务一旦注册,代码就会写到内存里,这时候即便更新了任务代码再调用,也是执行的旧代码,需要重启celery才会再次更新新代码。
任务调用方式:
1 from apps.video_manager.tasks import my_task 2 3 my_task.delay(*args, **kwargs)
delay的参数就是任务接受的参数,如果注册的任务接受参数,那么调用时将参数传递给delay即可。