celery(一) application

Application

application

  1. celery在使用之前,必须首先实例化。e.g. app = Celery()
  2. app 是线程安全的,即:不同配置、组件和任务的多个app可以共存在同一个进程空间。

任务注册表(task-registry)

在Celery中发送一个task 消息,这个消息并不包含任何源代码(函数体)。而是只有你所期望执行的task的名字。每个worker有一个任务注册表(task-registry),它是task 名称与 task 源代码(函数)的映射。每当你定义一个task,这个task就会被注册到本地的注册表中

懒加载

Celery创建app实例是延迟的,只有在调用它的时候才会创建。也就是说,Celery()命令并没有立即创建app实例。

Celery实例化的过程中,做了如下操作:

  • 创建了一个逻辑时钟,用于events
  • 创建了一个任务注册表 task-registry
  • 设置它自己为当前的Celery实例(如果set_as_current 参数被置为disabled,那么就不会进行该操作)
  • 调用app.on_init() (默认情况下,啥都没做)

@app.task 装饰器也是延迟创建task的。task被定义的时候(在一个函数头上加装饰器的时候),task并没有立即创建。在task被使用的时候,或者是app finalized 的时候,task才会创建

finalized 做了如下操作:

  • task是在多app之间共享的,拷贝task。(shard参数可以取消共享,使task独属于其绑定的app)
  • 创建所有的task
  • 确保所有的task都绑定到当前的app(只有绑定app,task才能读取默认的配置)

Main name

task 默认的名称组成是 model.fun e.g. tasks.add

当model名获取不到的时候就会以 __main__ 作为model名。因此会出现 某task以 __main__.add 为名注册到 任务注册表之后,当某个task被引入到其他模块时,会以源模块.task作为任务名,这个时候,就会出现不一致的情况,所以,在Celery实例化的时候一定要指定app的名字。e.g. app=Celery(‘tasks’)

配置Celery

Celery的配置有如下几种:

  • 直接配置app属性
  • 使用配置文件

直接配置app属性

  • 单个配置

app.conf.enable_utc = True

  • 多个配置
app.conf.update(enable_utc=True, timezone=‘Asia/Shanghai‘)

使用配置文件

从配置文件加载配置,使用 app.config_from_object()app.config_from_envvar()方法。

config_from_object

app = Celery()
app.config_from_object(‘celeryconfig.py‘)
# 项目中要有 celeryconfig.py 文件
from xxxx import configmodel

app = Celery()
app.config_from_object(configmodel)
class Config:
    enable_utc = True
app = Celery()
app.config_from_object(Config)

config_from_envva

从环境变量加载指定模块

app.config_from_envvar(‘CELERY_CONFIG_MODEL‘)

Task

所有使用 @task() 装饰器定义的task,都继承自基类 Task 。你也可以指定自己的基类 Task

  1. 在装饰器中指定其他类
@app.task(base=OtherTask)
def function():
    ...
  1. 在配置中指定其他类
app = Celery()
app.Task = OtherTask

自定义 Task

所有的自定义 Task 都必须

继承自 Task

class MyTask(Task):
    ...

最佳实践是 app 作为参数传给需要它的地方

class SomeClass:
    def __init__(self, app):
        self.app = app
        ...

celery(一) application

上一篇:UPenn - Robotics 5:Robotics: Estimation and Learning - week 3:Mapping


下一篇:IDEA中Mybatis传统配置映射文件的时候遇到org.apache.ibatis.binding.BindingException: Invalid bound statement (not found): mapper.UserMapper.getUserById错误时