Celery 使用(一)
架构
- Producer:任务发布者;
- Celery Beat:任务调度器,Beat进程会读取配置文件中的内容,周期性的将配置中到期需要执行的任务发送给任务队列;
- Broker:消息代理,接受生产者的任务消息,存进队列后发送给消费者;
- Celery Worker:执行任务的消费者;
- Result Backend:保存消费者执行任务完后的结果;
如下图:
整体的流程,任务发布者或者是任务调度,将任务发送到消息代理中,接着消息代理将任务发送给消费者来执行,其执行完后将结果保存到backend中。
使用
先创建Celery实例和相关的任务,这里配置Celery可以通过创建一个配置文件,然后通过app.config_from_object('proj.celeryconfig')
加载即可。
task.py如下
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost:5672//')
@app.task
def add(x, y):
return x + y
接下来,创建相关的任务发布者,run_task.py如下
from task2 import add
result = add.delay(1,1)
import time
while not result.ready():
print('not yet ready')
time.sleep(1)
print(result.get())
然后在项目目录中,通过命令执行Celery,命令如下:
celery -A task2.app worker --loglevel=info
如果看到命令行里显示该实例的所有任务说明执行成功,最后再开一个进程执行任务,接着就会看到命令行中的提示信息,说明消费者执行成功,并且将结果保存到backend中。
一个简单的用例就结束了。
疑惑
之前有一个疑惑的点在于,既然RabbitMQ可以实现任务队列,要Celery有何用呢?
从我搜索到的知识来看,有以下几点:
- Celery可以支持多种消息代理,不仅仅是RabbitMQ,还有Redis、MongoDB等等;
- RabbitMQ是消息队列,其可以应用到各种场景中,包括任务分发系统,也就是需要自己实现一个Celery,而Celery本身就是一个任务分发系统,显然简化了代码,不需要重复造*;