Celery进阶

Celery进阶

在你的应用中使用Celery

我们的项目

proj/__init__.py
  /celery.py
  /tasks.py
 1 # celery.py
 2 from celery import Celery
 3 ?
 4 app = Celery(proj,
 5              broker=amqp://,  # 消息中介(我更喜欢叫消息枢纽)
 6              backend=rpc://,  # 后端,跟踪任务状态和结果
 7              include=[proj.tasks])    # 引入指定的任务,即tasks.py
 8 ?
 9 # Optional configuration, see the application user guide.
10 app.conf.update(
11     result_expires=3600,    # 设置结果超时为1小时
12 )
13 ?
14 if __name__ == __main__:
15     app.start()
16 # tasks.py
17 from .celery import app
18 ?
19 ?
20 @app.task
21 def add(x, y):
22     return x + y
23 ?
24 ?
25 @app.task
26 def mul(x, y):
27     return x * y
28 ?
29 ?
30 @app.task
31 def xsum(numbers):
32     return sum(numbers)

启动worker

$ celery -A proj worker -l INFO
?
# 启动成功就会看到一下展示
--------------- celery@halcyon.local v4.0 (latentcall)
--- ***** -----
-- ******* ---- [Configuration]
- *** --- * --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)            # 指的是当前可使用的进程有8个,默认是CPU的核数(进程池)
- ** ---------- . events:      OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** -----
?
[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.

Celery支持进程池、Eventlet、Gevent以及运行一个简单的线程

调用任务

在入门的时候说了delay()是调用任务的方法,但它是apply_async()的快捷方式

1 add.apply_async((2,2),queue=‘lori‘,countdown=10)
2 # 第一个参数是个元组,是用来向add这个方法传值的
3 # queue:任务将会发送到名为‘lori’的队列中
4 # countdown:发送消息10秒后执行任务
5 # delay()只能传add方法的参数,而apply_async()不仅为add传参,还可以对消息做相关处理

delay()和apply_async()都会返回一个AsyncResult对象,用于任务执行的状态,但是必须另结果后台可用,从而将数据保存到某处(结果默认是不保存的)

 1 res = add.delay(2,2)
 2 res.get(timeout=1)  # 获取任务执行结果,超时为1秒
 3 res.id  # 任务的ID
 4 res.get(propagate=False)    # 屏蔽掉具体的异常展示
 5 res.failed()    # 任务执行失败返回True,成功返回False
 6 res.successful()    # 任务执行成功返回True,失败返回False
 7 res.state   # 返回任务的当前状态
 8 # 启动状态是一种特殊的状态,只有当task_track_started设置是启用的,或者为任务设置了@task(track_started=True)选项时,才会记录该状态。
 9 # 实际上PENDING状态不会被记录,所以
10 from proj.celery import app
11 res = app.AyncResult(this-id-does-not-exist)  # 这样在任务ID不存在的情况下,显示默认的状态
12 res.state
13 # ‘PENDING‘

Canvas:设计工作流

Celery提供了签名(signature),是用来封装任务的参数以及执行选项

与delay()和apply_async不同,签名不会运行

 1 s1 = add.signature((2,2),countdown=10)  # 或者简写s1 = add.s(2,2);s()是signature()的快捷方式,像delay(),凡是快捷方式都没有操作选项的参数
 2 res = s1.delay() # 运行最后还是要用过delay()和apply_async()
 3 
 4 # 以下两句是等价的
 5 add.apply_async(args, kwargs, **options)
 6 add.signature(args, kwargs, **options).apply_async()
 7 
 8 # 签名还可以进行克隆
 9 s = add.s(2)
10 >>> proj.tasks.add(2)
11  
12 s.clone(args=(4,), kwargs={debug: True})
13 >>> proj.tasks.add(4, 2, debug=True)

部分参数拼接args

1 s1 = add.s(2)  # 
2 res = s1.deplay(8)  # 实际执行的是s1.deplay(8,2)

参数覆盖kwargs

s2 = add.s(2,2,debug=True)
s2.delay(debuy=False)

原语

  1. Groups:组,用于并行调用任务,返回一个结果实例

    1 from celery import group
    2 from proj.tasks import add
    3 
    4 group(add.s(i, i) for i in range(10))().get()
    5 >>>[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    6 # 部分组,与部分签名一样
    7 g = group(add.s(i) for i in range(10))
    8 g(10).get()
    9 >>>[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
  2. Chains:链,用于串行调用任务,待一个任务返回结果后再调用下一个任务

     1 from celery import chain
     2 from proj.tasks import add, mul
     3 
     4 # (4 + 4) * 8
     5 chain(add.s(4, 4) | mul.s(8))().get() # 使用| 可以将上一个任务的结果作为参数,传到下一个任务签名中
     6 >>> 64
     7 # (? + 4) * 8
     8 g = chain(add.s(4) | mul.s(8))
     9 g(4).get()
    10 >>> 64
    11 (add.s(4, 4) | mul.s(8))().get()    # 简写可以去掉chain
    12 >>> 64
  3. Chords:弦,是一个带有回调的组,即串行和并行结合

    1 from celery import chord
    2 from proj.tasks import add, xsum
    3 
    4 chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()    # 将并行的结果实例传到串行的方法中执行
    5 >>> 90
    6 (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()    # 链接到其他任务的组将自动转换为和弦
    7 >>> 90
    8 upload_document.s(file) | group(apply_filter.s() for filter in filters)

路由

Celery支持RabbitMQ的所有路由,可将消息发送到指定的任务队列

 1 # rabbitMQ路由的配置
 2 app.conf.update(
 3     task_routes = {
 4         proj.tasks.add: {queue: hipri},
 5     },
 6 )
 7 ------------------------------
 8 # celery发送消息到指定队列
 9 from proj.tasks import add
10 add.apply_async((2, 2), queue=hipri)

可以使用celery -Q指定队列

$ celery -A proj worker -Q hipri        # 可以在运行worker的时候指定
$ celery -A proj worker -Q hipri,celery    # 用逗号分割指定多个

 

远程控制

  1. 检查

    # celery -A proj inspect --help 检查
    celery -A proj inspect active
    celery -A proj inspect active --destination=celery@example.com    # 指定worker
    
    celery -A proj status # 显示所有worker的状态列表
  2. 控制

    # celery -A proj control --help 控制
    celery -A proj control enable_events    # 远程启用事件
    celery -A proj events --dump    # 启动事件后,可以启动事件转储程序,并行查看woker执行状况
    celery -A proj control disable_events    # 远程禁用事件

     

 

Celery进阶

上一篇:Docker容器操作


下一篇:EasyNTS 交叉编译海思系统下的可执行程序实现及测试过程