Celery-UserGuide-Application

在使用celery之前必须先实例化celery库,celery实例称为application或app。
因为application时线程安全的,所以不同配置、不同组件、不同任务的celery应用可以在同一个进程空间共存。
例如:

>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

最后一行现实的是应用的符号表示(textual representation):包含类的名称(Celery),当前main模块的名称(__main__)以及当前对象的内存地址(0x100469fd0)。

Main Name

上面三个符号中,main模块最重要。
当用户在celery中发送一条任务信息时,信息不会包含任何源码,仅含有希望执行的任务名称。这个网络域名的工作原理类似: 每个worker都维护一份映射表,映射表中记录了任务名称对应的实际函数,这个映射表称为注册表。
在定义任务的同时,任务会被注册到被的注册表中:

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

这里又一次出现了__main__。当celery不能判断当前函数所属的模块时,就会使用main块的名称作为任务名的起始部分。
在以下使用场景中会导致问题:

  • 定义任务的模块被作为程序运行
  • application在python shell中创建
    比如下面的tasks模块同时使用app.worker_main()启动worker:
    tasks.py
from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()```

当执行上面的模块时,任务会被以__main__作为起始部分命名。但是当这个模块被其他应用引用时并调用task时,任务会以tasks(模块的真正名称)作为起始部分命名。

>>> from tasks import add
>>> add.name
tasks.add

用户可以指定注模块的名称

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

Configuration

用户通过设置一些选项控制celery的运行。这些选项可以直接通过app实例设置也可以使用专门的配置模块。
可以通过app.conf访问或修改配置:

>>> app.conf.timezone
'Europe/London'

可以直接修改单项设置:

>>> app.conf.enable_utc = True

也可以通过update方法修改多项配置:

>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

配置对象包含多个配置字典,会按以下顺序查询:

  1. 运行时的修改
  2. 配置模块
  3. 默认配置celery.app.defaults
    用户可以通过使用app.add_defaults()方法添加默认配置。

config_from_object

config_from_object方法从configuration对象中加载配置。配置对象可以是configuration模块或包含confuration属性的对象。
调用config_from_object方法时会重置旧的配置。
例1,使用模块名称:
config_from_object可以接受完整的python模块名,或python参数名称,比如:“celeryconfig”, "myproj.config.celery"或 “myproj.config:CeleryConfig”:

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

celeryconfig模块如下
celeryconfig.py:

enable_utc = True
timezone = 'Europe/London'

只要可以import celeryconfig,就可以使用。

例2 Passing an actual module object
也可以传递一个已经引入的模块对象,但不推荐这种做法。

import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

例3 使用配置类或配置对象

from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)
# or using the fully qualified name of the object:
#   app.config_from_object('module:Config')

config_from_envvar

app.config_from_envvar()方法从环境变量中接受配置模块名。比如加载名为CELERY_CONFIG_MODULE的环境变量:

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

之后可以通过环境指定配置模块:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO

检查配置

在做配置debug或类似的操作时可以需要打印配置信息,这时用户会希望隐藏密码等敏感信息。celery提供了相应的工具,比如humanize():

>>> app.conf.humanize(with_defaults=False, censored=True)

这个方法返回列表字符串。默认情况下,这将只包含对配置的更改,但您可以通过启用with_defaults参数来包含内置的默认键和值。
如果用户希望使用字典类型的配置,可以使用table()方法:

>>> app.conf.table(with_defaults=False, censored=True)

因为celery使用正则表达式匹配敏感字段,所以不能自动隐藏所有的敏感信息。如果自定义的配置信息中有敏感字段,要注意使用celery可以识别的敏感命名。比如:API, TOKEN, SECRET, PASS, SIGNATURE, DATABASE。

懒加载

application实例是惰性的,这意味着直到实际使用时才会做合法性检查。
创建celery实例时会发生以下行为:

  1. 创建本地定时器实例,用于事件;
  2. 创建任务注册表;
  3. 把实例设置为当前应用(除非设置set_as_current禁用);
  4. 调用app.on_init()回调函数(此函数默认无行为);
    app.task()装饰器不会在任务定义时创建任务,而是在任务被使用或application定型(Finalization)时创建任务。
    下例展示了任务直到被使用和访问其属性时才会创建:
>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

通过调用app.finalize()或访问app.tasks属性触发Finalization发生。
定型一个app会触发如下行为:

  1. 复制应用间分享的任务(任务默认在应用间分享,除非在装饰器中设置禁用shared属性);
  2. 评估所有等待状态(pending)的任务装饰器
  3. 确保所有的任务都和当前app绑定(任务和app绑定,以读取配置信息)

打断调用链

尽管可以使用当前app的设置,单最好的方法依然是把app实例传递给需要使用它的地方。
者被称为app链条,因为这样会根据传递的app实例创造一连串的实例。
下例是一种不推荐的使用方法:

from celery import current_app

class Scheduler(object):

    def run(self):
        app = current_app

可以改成下面这样:

class Scheduler(object):

    def __init__(self, app):
        self.app = app

在celery内部会使用celery.app.app_or_default()方法,以确保一切都可以在基于模块的兼容性API中工作:

from celery.app import app_or_default

class Scheduler(object):
    def __init__(self, app=None):
        self.app = app_or_default(app)

在开发中可以通过CELERY_TRACE_APP环境变量以在app调用链断裂时抛出异常:

CELERY_TRACE_APP=1 celery worker -l INFO

抽象任务

所有使用task()装饰器创建的任务都继承自application的基础Task类。
通过设置base参数可以指定别的基类:

@app.task(base=OtherTask):
def add(x, y):
    return x + y

自定义任务类需要继承基类:celery.Task

from celery import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return self.run(*args, **kwargs)

注意,如果重写例任务的__call__方法,那么一定要调用self.run去执行任务体。不能使用super().__call__。因而celery.Task的__call__方法仅仅用于继承。celery把call部署在celery.app.trace.build_tracer.trace_task中,改类会在没有定义__call__方法时指定运行自定义的任务类。

基类celery.Task没有绑定任何app,当任务绑定到一个app时,会从配置中读取默认值。

实现基类需要使用app.task()装饰器创建任务:

@app.task(base=DebugTask)
def add(x, y):
    return x + y

也可以通过修改app.Task()属性修改application的默认基类:

>>> from celery import Celery, Task

>>> app = Celery()

>>> class MyBaseTask(Task):
...    queue = 'hipri'

>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
 <unbound MyBaseTask>,
 <unbound Task>,
 <type 'object'>]
上一篇:web项目022-----celery的基本使用


下一篇:celery