在使用celery之前必须先实例化celery库,celery实例称为application或app。
因为application时线程安全的,所以不同配置、不同组件、不同任务的celery应用可以在同一个进程空间共存。
例如:
>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>
最后一行现实的是应用的符号表示(textual representation):包含类的名称(Celery),当前main模块的名称(__main__
)以及当前对象的内存地址(0x100469fd0)。
Main Name
上面三个符号中,main模块最重要。
当用户在celery中发送一条任务信息时,信息不会包含任何源码,仅含有希望执行的任务名称。这个网络域名的工作原理类似: 每个worker都维护一份映射表,映射表中记录了任务名称对应的实际函数,这个映射表称为注册表。
在定义任务的同时,任务会被注册到被的注册表中:
>>> @app.task
... def add(x, y):
... return x + y
>>> add
<@task: __main__.add>
>>> add.name
__main__.add
>>> app.tasks['__main__.add']
<@task: __main__.add>
这里又一次出现了__main__
。当celery不能判断当前函数所属的模块时,就会使用main块的名称作为任务名的起始部分。
在以下使用场景中会导致问题:
- 定义任务的模块被作为程序运行
- application在python shell中创建
比如下面的tasks模块同时使用app.worker_main()启动worker:
tasks.py
from celery import Celery
app = Celery()
@app.task
def add(x, y): return x + y
if __name__ == '__main__':
app.worker_main()```
当执行上面的模块时,任务会被以__main__
作为起始部分命名。但是当这个模块被其他应用引用时并调用task时,任务会以tasks(模块的真正名称)作为起始部分命名。
>>> from tasks import add
>>> add.name
tasks.add
用户可以指定注模块的名称
>>> app = Celery('tasks')
>>> app.main
'tasks'
>>> @app.task
... def add(x, y):
... return x + y
>>> add.name
tasks.add
Configuration
用户通过设置一些选项控制celery的运行。这些选项可以直接通过app实例设置也可以使用专门的配置模块。
可以通过app.conf
访问或修改配置:
>>> app.conf.timezone
'Europe/London'
可以直接修改单项设置:
>>> app.conf.enable_utc = True
也可以通过update方法修改多项配置:
>>> app.conf.update(
... enable_utc=True,
... timezone='Europe/London',
...)
配置对象包含多个配置字典,会按以下顺序查询:
- 运行时的修改
- 配置模块
- 默认配置
celery.app.defaults
用户可以通过使用app.add_defaults()
方法添加默认配置。
config_from_object
config_from_object方法从configuration对象中加载配置。配置对象可以是configuration模块或包含confuration属性的对象。
调用config_from_object方法时会重置旧的配置。
例1,使用模块名称:
config_from_object可以接受完整的python模块名,或python参数名称,比如:“celeryconfig”, "myproj.config.celery"或 “myproj.config:CeleryConfig”:
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')
celeryconfig模块如下
celeryconfig.py:
enable_utc = True
timezone = 'Europe/London'
只要可以import celeryconfig,就可以使用。
例2 Passing an actual module object
也可以传递一个已经引入的模块对象,但不推荐这种做法。
import celeryconfig
from celery import Celery
app = Celery()
app.config_from_object(celeryconfig)
例3 使用配置类或配置对象
from celery import Celery
app = Celery()
class Config:
enable_utc = True
timezone = 'Europe/London'
app.config_from_object(Config)
# or using the fully qualified name of the object:
# app.config_from_object('module:Config')
config_from_envvar
app.config_from_envvar()
方法从环境变量中接受配置模块名。比如加载名为CELERY_CONFIG_MODULE的环境变量:
import os
from celery import Celery
#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')
之后可以通过环境指定配置模块:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO
检查配置
在做配置debug或类似的操作时可以需要打印配置信息,这时用户会希望隐藏密码等敏感信息。celery提供了相应的工具,比如humanize():
>>> app.conf.humanize(with_defaults=False, censored=True)
这个方法返回列表字符串。默认情况下,这将只包含对配置的更改,但您可以通过启用with_defaults参数来包含内置的默认键和值。
如果用户希望使用字典类型的配置,可以使用table()方法:
>>> app.conf.table(with_defaults=False, censored=True)
因为celery使用正则表达式匹配敏感字段,所以不能自动隐藏所有的敏感信息。如果自定义的配置信息中有敏感字段,要注意使用celery可以识别的敏感命名。比如:API, TOKEN, SECRET, PASS, SIGNATURE, DATABASE。
懒加载
application实例是惰性的,这意味着直到实际使用时才会做合法性检查。
创建celery实例时会发生以下行为:
- 创建本地定时器实例,用于事件;
- 创建任务注册表;
- 把实例设置为当前应用(除非设置set_as_current禁用);
- 调用app.on_init()回调函数(此函数默认无行为);
app.task()装饰器不会在任务定义时创建任务,而是在任务被使用或application定型(Finalization)时创建任务。
下例展示了任务直到被使用和访问其属性时才会创建:
>>> @app.task
>>> def add(x, y):
... return x + y
>>> type(add)
<class 'celery.local.PromiseProxy'>
>>> add.__evaluated__()
False
>>> add # <-- causes repr(add) to happen
<@task: __main__.add>
>>> add.__evaluated__()
True
通过调用app.finalize()或访问app.tasks属性触发Finalization发生。
定型一个app会触发如下行为:
- 复制应用间分享的任务(任务默认在应用间分享,除非在装饰器中设置禁用shared属性);
- 评估所有等待状态(pending)的任务装饰器
- 确保所有的任务都和当前app绑定(任务和app绑定,以读取配置信息)
打断调用链
尽管可以使用当前app的设置,单最好的方法依然是把app实例传递给需要使用它的地方。
者被称为app链条,因为这样会根据传递的app实例创造一连串的实例。
下例是一种不推荐的使用方法:
from celery import current_app
class Scheduler(object):
def run(self):
app = current_app
可以改成下面这样:
class Scheduler(object):
def __init__(self, app):
self.app = app
在celery内部会使用celery.app.app_or_default()方法,以确保一切都可以在基于模块的兼容性API中工作:
from celery.app import app_or_default
class Scheduler(object):
def __init__(self, app=None):
self.app = app_or_default(app)
在开发中可以通过CELERY_TRACE_APP环境变量以在app调用链断裂时抛出异常:
CELERY_TRACE_APP=1 celery worker -l INFO
抽象任务
所有使用task()装饰器创建的任务都继承自application的基础Task类。
通过设置base参数可以指定别的基类:
@app.task(base=OtherTask):
def add(x, y):
return x + y
自定义任务类需要继承基类:celery.Task
from celery import Task
class DebugTask(Task):
def __call__(self, *args, **kwargs):
print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
return self.run(*args, **kwargs)
注意,如果重写例任务的__call__
方法,那么一定要调用self.run去执行任务体。不能使用super().__call__
。因而celery.Task的__call__
方法仅仅用于继承。celery把call部署在celery.app.trace.build_tracer.trace_task中,改类会在没有定义__call__
方法时指定运行自定义的任务类。
基类celery.Task没有绑定任何app,当任务绑定到一个app时,会从配置中读取默认值。
实现基类需要使用app.task()装饰器创建任务:
@app.task(base=DebugTask)
def add(x, y):
return x + y
也可以通过修改app.Task()属性修改application的默认基类:
>>> from celery import Celery, Task
>>> app = Celery()
>>> class MyBaseTask(Task):
... queue = 'hipri'
>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>
>>> @app.task
... def add(x, y):
... return x + y
>>> add
<@task: __main__.add>
>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
<unbound MyBaseTask>,
<unbound Task>,
<type 'object'>]