一 前言
开发自动化管理平台的过程中,有执行时间较长的任务比如安装基础软件,备份恢复;有定时执行的任务比如定期收集元数据,检查慢日志数量等等,我们可以自己开发一套任务系统,当然也可以依赖Celery 实现上述功能。
二 Celery 是什么?
2.1 概念
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度,支持异步执行任务。更令人欣喜的是常见的Python的web框架都能和Celery 耦合,给广大开发者带来极大的便利。Celery 是开源的,使用 BSD 许可证 授权。
2.2 原理
Celery 实现异步调用的原理核心其实是将任务执行单元 worker 和 任务派发单元 分开,从而达到异步的效果;
Celery将需要执行的任务发送到消息队列中,然后再由任务执行单元根据具体的配置(绑定到具体哪个队列,默认为defaults)从消息队列中获取任务执行,这样就实现了异步的效果。
2.3 架构
Celery 使用简洁的模块架构提供了完整的功能,上手容易,部署简单。主要包含消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store).我们使用一张图展示Celery运行机制。
task producer - 任务生产者
顾名思义就是发起调度任务的,然后交给任务队列去处理。简单的Python代码、耦合在Django/Flask Web 服务里请求任务比如调用备份或者调用初始化安装机器的任务,在程序里面调用Celery任务装饰的函数,产生任务并分发到任务队列处理的,我们都可以称之为任务生产者。
celery beat - 任务调度器
Celery beat 是 Celery 系统自带的任务生产者,它以独立进程的形式存在,该进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。需要注意的是在一个Celery系统中,只能存在一个 Celery beat 调度器。
broker - 任务代理
其实broker就是一个队列存储,是负责接收task producer发送的任务消息,存储到队列之后再进行调度,分发给任务消费方 (celery worker)。常见的broker有RabbitMQ、Redis 等。
celery worker - 任务消费方
Celery worker 就是任务的执行者,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。Celery worker 对应的就是操作系统中的一个进程。Celery 支持分布式部署和横向扩展,我们可以在多个节点增加 Celery worker 的数量来增加系统的高可用性。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。
Result Stores/backend
存储Celery worker 执行任务之后的结果和状态信息,以供应用系统查询任务的状态信息。Celery 内置支持Django ORM,Redis,RabbitMQ 等方式来保存任务处理后的状态信息。
三 快速入门
3.1 安装
本例子使用redis作为存储任务消息的介质,需要提前安装redis 并启动相关服务。
用 pip 安装:
pip install -U Celery
用 easy_install 安装:
easy_install -U Celery
3.2 部署
我们先使用一个单一程序文件包含所有celery相关的配置作为例子。要使用celery,就需要先初始化一个celery实例,配置好broker和backend为redis。编写程序文件。目录结构
python/
tasks.py
client.py
tasks.py
import time
from celery import Celery
app = Celery('tasks',
broker='redis://localhost:6379/1',
backend='redis://localhost:6379/1')
@app.task
def add(x, y):
time.sleep(2)
return x + y
@app.task
def mul(x, y):
time.sleep(5)
return x * y
在和tasks 同一层级执行
celery -A tasks worker -l info
这里需要说明的是
命令行执行celery worker -A app -l info时, app 必须可导入,app 可以为py模块或包,本例为tasks 。不管是包还是模块都必须正确指定Celery入口文件(如果为包则默认的入口文件名为 celery.py )的绝对导入名称(proj.celery),但是从工程上我们推荐在包的__init__.py 文件进行celey的初始化。
另起一个命令行 本例子使用ipython
In [5]: from tasks import add,mul
In [6]: add.delay(3,3)
Out[6]:
In [7]: add.delay(3,7)
Out[7]:
In [8]: r=add.delay(6,10)
In [10]: r.status
Out[10]: u'SUCCESS'
In [11]: r.result
Out[11]: 16
In [14]: r.ready()
Out[14]: True
In [15]: r.get()
Out[15]: 16
从例子可以看到,直接调用add 函数并未直接返回 6 ,10 这样的算术结果而是返回 AsyncResult 对象。程序中可以使用这个对象检查任务状态,等待任务执行完成,获取任务结果,如果任务失败,它会返回异常信息或者调用栈。
将AsyncResult赋值给r, 可以通过调用 r.get() 或者 r.result 取结任务的结果,r.status,r.ready()获取任务的执行状态。
我们在优化一下上面的调用方式,编写一段小程序 client.py ,做同步调用 当然我们也可以做异步调用。
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import time
from tasks import add,mul
add_ret = add.delay(4, 4)
mul_ret = mul.delay(5, 5)
while not add_ret.ready():
time.sleep(1)
print "waiting for task to be done "
print 'add task done: {0}'.format(add_ret.get())
while not mul_ret.ready():
time.sleep(2)
print "waiting for task to be done "
print 'mul task done: {0}'.format(mul_ret.get())
执行 python client.py
celery 输出的log如下
因为tasks任务中add 和mul函数都设置了等待 sleep,可以看出调用 add_ret.ready() 的时候并未直接返回结果,而是等待了具体的时间之后才返回。真实项目中我们需要改写 client.py ,利用Celery的异步执行特性。
四 小结
本文浅显的介绍了celery的架构和如何使用。Celery并不是一个队列,而是一套任务管理平台,通过队列实现任务的异步功能。有计划开发自己独立运维平台的还没有使用过celery朋友可以尝试用起来。