参考资料:https://blog.csdn.net/weixin_40475396/article/details/80439781
投入到指定的队列用:add.delay(1, 3, queue='queue_add1')
my_task.apply_async((2, 2), queue='my_queue', countdown=10) 任务my_task将会被发送到my_queue队列中,并且在发送10秒之后执行
Celery 库在使用之前必须初始化,一个celery实例被称为一个应用(或者缩写 app)。
Celery 应用是线程安全的
配置项可以通过 app.conf设置和获取,如:app.conf.timezone pp.conf.enable_utc
app.task() 装饰器在任务定义时不会创建任务,而是延迟任务的创建到任务使用时,或者应用被终止时。
所有使用 task() 装饰器创建的任务都会继承应用的基础 Task 类。你可以使用装饰器的 base 参数给任务声明一个不同的基类
@app.task(base=OtherTask):
def add(x, y):
return x + y
创建一个自定义的任务类,你应该继承这个中性类:celery.Task
from celery import Task
class DebugTask(Task):
def __call__(self, *args, **kwargs):
print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
return super(DebugTask, self).__call__(*args, **kwargs)
如果你覆盖了任务的 __call__ 方法,那么非常重要的一点是你还需要调用父类的方法使得在任务被直接调用时基类call方法能设置好默认请求。
如果你的任务函数是幂等的,你可以设置 acks_late 选项让工作单元在任务执行返回之后再确认任务消息
当使用多个装饰器装饰任务函数时,确保 task 装饰器最后应用(在python中,这意味它必须在第一个位置)
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
可以通过任务函数的 .name 属性获取任务的名称
显示设置任务名称的例子:
>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
... return x + y
>>> add.name
'sum-of-two-numbers'
你不应该使用老式的相对导入:
from module import foo # BAD!
from proj.module import foo # GOOD!
新式的相对导入能够被正常使用:
from .module import foo # GOOD!
可以通过覆盖 app.gen_task_name() 方法修改自动名称生成行为
from celery import Celery
class MyCelery(Celery):
def gen_task_name(self, name, module):
if module.endswith('.tasks'):
module = module[:-6]
return super(MyCelery, self).gen_task_name(name, module)
app.Task.retry() 函数可以用来重新执行任务,例如在可恢复错误的事件中
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc) #exc 参数是用来传递在日志中使用或者在后端结果中存储的异常信息
# raise self.retry(exc=exc, countdown=60) # 60秒后再重试
Task.rate_limit 限制给定时间内可以运行的任务数量,当设置了速率限制后,已经开始的任务仍然会继续完成,但是任务开始前将等待一些时间。
T.apply_async(countdown=60, expires=120)从现在开始1分钟后执行任务,任务过期时间为2分钟
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
链接任务:add.apply_async((2, 2), link=add.s(16))#add.s 被称为一个签名, 这里第一个任务 (4) 将会发送到另一个任务将 16 与前面结果相加,形成表达式 (2 + 2) + 16 = 20
如果任务抛出异常(errback),你也可以让回调函数执行,但是与常规的回调不同的是它将会传递父任务的ID而不是结果值
countdown 是设置 ETA 的快捷方式。countdown 是一个整数, 但是eta 必须是一个 datetime 对象,用来声明一个精确的日期和时间(包含毫秒精度,以及时区信息
expires 参数定义了一个可选的过期时间,可以是任务发布后的秒数,或者使用 datetime 声明一个日期和时间。
当工作单元接收到一个过期任务,它会将任务标记为 REVOKED(TaskRevokeError)
客户端和工作单元之间的数据传输需要序列化,所以每个celery 的消息都有一个 content_type 请求头用来描述编码使用的序列化方法。内建的序列化器有 JSON, pickle, YAML 以及 msgpack
路由任务到不同的队列:add.apply_async(queue='priority.high')不建议在代码中硬编码队列名称,最佳到实践是配置路由器(task_routes)
signature:任务签名包含了一次任务调用的参数、关键字参数以及执行选项信息,它可以传递给其他函数,甚至序列化后通过网络传输。
回调函数可以通过使用 apply_async 函数的 link 参数添加到任何任务:
add.apply_async((2, 2), link=other_task.s())
group可以用来执行并行任务。
组结果包含如下操作:
successful()
如果所有的子任务都成功执行,那么返回 True(例如,没有抛出异常)。
failed()
如果任意子任务失败,那么返回 True。
waiting()
如果任意子任务没有执行完成,那么返回 True。
ready()
如果所有的任务都执行完成,那么返回 True。
completed_count()
返回完成的子任务数。
revoke()
取消所有子任务。
join()
收集所有子任务的返回值,并按照他们调用的顺序返回(作为一个列表)
可以在同一台机器上启动多个工作单元,只要确保给每个独立的工作单元使用 --hostname 参数声明一个节点名称。
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h