在你的应用中使用Celery
我们的项目
proj/__init__.py
/celery.py
/tasks.py
1 # celery.py 2 from celery import Celery 3 ? 4 app = Celery(‘proj‘, 5 broker=‘amqp://‘, # 消息中介(我更喜欢叫消息枢纽) 6 backend=‘rpc://‘, # 后端,跟踪任务状态和结果 7 include=[‘proj.tasks‘]) # 引入指定的任务,即tasks.py 8 ? 9 # Optional configuration, see the application user guide. 10 app.conf.update( 11 result_expires=3600, # 设置结果超时为1小时 12 ) 13 ? 14 if __name__ == ‘__main__‘: 15 app.start() 16 # tasks.py 17 from .celery import app 18 ? 19 ? 20 @app.task 21 def add(x, y): 22 return x + y 23 ? 24 ? 25 @app.task 26 def mul(x, y): 27 return x * y 28 ? 29 ? 30 @app.task 31 def xsum(numbers): 32 return sum(numbers)
启动worker
$ celery -A proj worker -l INFO ? # 启动成功就会看到一下展示 --------------- celery@halcyon.local v4.0 (latentcall) --- ***** ----- -- ******* ---- [Configuration] - *** --- * --- . broker: amqp://guest@localhost:5672// - ** ---------- . app: __main__:0x1012d8590 - ** ---------- . concurrency: 8 (processes) # 指的是当前可使用的进程有8个,默认是CPU的核数(进程池) - ** ---------- . events: OFF (enable -E to monitor this worker) - ** ---------- - *** --- * --- [Queues] -- ******* ---- . celery: exchange:celery(direct) binding:celery --- ***** ----- ? [2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
Celery支持进程池、Eventlet、Gevent以及运行一个简单的线程
在入门的时候说了delay()是调用任务的方法,但它是apply_async()的快捷方式
1 add.apply_async((2,2),queue=‘lori‘,countdown=10) 2 # 第一个参数是个元组,是用来向add这个方法传值的 3 # queue:任务将会发送到名为‘lori’的队列中 4 # countdown:发送消息10秒后执行任务 5 # delay()只能传add方法的参数,而apply_async()不仅为add传参,还可以对消息做相关处理
delay()和apply_async()都会返回一个AsyncResult对象,用于任务执行的状态,但是必须另结果后台可用,从而将数据保存到某处(结果默认是不保存的)
1 res = add.delay(2,2) 2 res.get(timeout=1) # 获取任务执行结果,超时为1秒 3 res.id # 任务的ID 4 res.get(propagate=False) # 屏蔽掉具体的异常展示 5 res.failed() # 任务执行失败返回True,成功返回False 6 res.successful() # 任务执行成功返回True,失败返回False 7 res.state # 返回任务的当前状态 8 # 启动状态是一种特殊的状态,只有当task_track_started设置是启用的,或者为任务设置了@task(track_started=True)选项时,才会记录该状态。 9 # 实际上PENDING状态不会被记录,所以 10 from proj.celery import app 11 res = app.AyncResult(‘this-id-does-not-exist‘) # 这样在任务ID不存在的情况下,显示默认的状态 12 res.state 13 # ‘PENDING‘
Canvas:设计工作流
Celery提供了签名(signature),是用来封装任务的参数以及执行选项
与delay()和apply_async不同,签名不会运行
1 s1 = add.signature((2,2),countdown=10) # 或者简写s1 = add.s(2,2);s()是signature()的快捷方式,像delay(),凡是快捷方式都没有操作选项的参数 2 res = s1.delay() # 运行最后还是要用过delay()和apply_async() 3 4 # 以下两句是等价的 5 add.apply_async(args, kwargs, **options) 6 add.signature(args, kwargs, **options).apply_async() 7 8 # 签名还可以进行克隆 9 s = add.s(2) 10 >>> proj.tasks.add(2) 11 12 s.clone(args=(4,), kwargs={‘debug‘: True}) 13 >>> proj.tasks.add(4, 2, debug=True)
部分参数拼接args
1 s1 = add.s(2) # 2 res = s1.deplay(8) # 实际执行的是s1.deplay(8,2)
参数覆盖kwargs
s2 = add.s(2,2,debug=True)
s2.delay(debuy=False)
原语
-
Groups:组,用于并行调用任务,返回一个结果实例
1 from celery import group 2 from proj.tasks import add 3 4 group(add.s(i, i) for i in range(10))().get() 5 >>>[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 6 # 部分组,与部分签名一样 7 g = group(add.s(i) for i in range(10)) 8 g(10).get() 9 >>>[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
-
Chains:链,用于串行调用任务,待一个任务返回结果后再调用下一个任务
1 from celery import chain 2 from proj.tasks import add, mul 3 4 # (4 + 4) * 8 5 chain(add.s(4, 4) | mul.s(8))().get() # 使用| 可以将上一个任务的结果作为参数,传到下一个任务签名中 6 >>> 64 7 # (? + 4) * 8 8 g = chain(add.s(4) | mul.s(8)) 9 g(4).get() 10 >>> 64 11 (add.s(4, 4) | mul.s(8))().get() # 简写可以去掉chain 12 >>> 64
-
Chords:弦,是一个带有回调的组,即串行和并行结合
1 from celery import chord 2 from proj.tasks import add, xsum 3 4 chord((add.s(i, i) for i in xrange(10)), xsum.s())().get() # 将并行的结果实例传到串行的方法中执行 5 >>> 90 6 (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get() # 链接到其他任务的组将自动转换为和弦 7 >>> 90 8 upload_document.s(file) | group(apply_filter.s() for filter in filters)
路由
Celery支持RabbitMQ的所有路由,可将消息发送到指定的任务队列
1 # rabbitMQ路由的配置 2 app.conf.update( 3 task_routes = { 4 ‘proj.tasks.add‘: {‘queue‘: ‘hipri‘}, 5 }, 6 ) 7 ------------------------------ 8 # celery发送消息到指定队列 9 from proj.tasks import add 10 add.apply_async((2, 2), queue=‘hipri‘)
可以使用celery -Q指定队列
$ celery -A proj worker -Q hipri # 可以在运行worker的时候指定 $ celery -A proj worker -Q hipri,celery # 用逗号分割指定多个
远程控制
-
检查
# celery -A proj inspect --help 检查 celery -A proj inspect active celery -A proj inspect active --destination=celery@example.com # 指定worker celery -A proj status # 显示所有worker的状态列表
-
控制
# celery -A proj control --help 控制 celery -A proj control enable_events # 远程启用事件 celery -A proj events --dump # 启动事件后,可以启动事件转储程序,并行查看woker执行状况 celery -A proj control disable_events # 远程禁用事件