celery使用

1.Celery是什么

Celery是一个简单、灵活且可靠,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

celery使用

2.Celery的组成

既然Celery是一个实时的任务处理系统,那么他必然至少存在三部分,一任务发布者,二任务存储者,三任务处理者,四任务结果存储者,五任务结果消费者。

  1. 这个任务处理系统是独立出来帮我们完成任务的,那他一定需要一个发布任务的接口对吧。
  2. 任务发布上去了不是说处理就处理的,肯定需要时间等待,所以这个系统必须先将其存储起来,等到要处理的时候再去拉出来。在这里我们使用redis来存储任务
  3. 这个处理任务肯定也需要一个处理者来完成这个任务吧
  4. 处理完了任务是不是还需要返回结果?不然咱咋知道这个任务是否执行完了。在这里我们使用redis来保存任务结果。
  5. 任务消费者是celery提供给我们拉取任务执行结果的接口

上面可以看出,celery完全属于自己的部分只有两个,任务发布接口,任务处理器,而任务存储器和任务结果存储器都是借助的外部东西
既然他是实时的,那么这五部分应该可以独立运行。确实如此。

下面将对这几部分以简单的代码进行描述

一个最常见的使用案例,用户注册账号需要给用户邮箱确认链接,发送需要时间,用户点击也需要时间,如果采用同步执行的方式,用户在此期间什么也做不了,最常见的方式就是采用异步发送消息,后端服务器发送确认链接,用户可以进行其他操作。

2.1 celery系统定义

celery_task01.py

import celery
import time

backend = 'redis://192.168.1.101:6379/1'  #异步结果
broker = 'redis://192.168.1.101:6379/2'  #消息中间件

# 定义celery对象,task是celery对象的名字,backend是结果存储系统,broker是任务队列存储系统
cel = celery.Celery('task', backend=backend, broker=broker)


# 将执行函数封装成一个task执行单元
@cel.task
def send_email(name):
    print('向%s发送邮件。。。' % name)
    time.sleep(5)
    print('向%s发送邮件完成' % name)
    return 'ok'

# celery可以封装多个执行单元
@cel.task
def send_msg(name):
    print('向%s发送短信。。。' % name)
    time.sleep(5)
    print('向%s发送短信完成' % name)
    return 'ok'

在这里我们首先定义了一个Celery对象,这个对象就是系统。celery系统封装了系统与任务队列存储器和任务结果存储的交互,我们只需要指定这两个存储器,它内部自会运作。然后用celery系统的装饰器封装了两个任务执行流程。至此还需要我们发布任务和拉取任务结果。

2.2 任务发布

celery_produce.py

from celery_task01 import send_email,send_msg

result = send_email.delay("yaowy")
print(result.id)
result2 = send_email.delay("bai")
print(result2.id)

在这里我们必须引入我们设置好的celery系统,工作流程经过装饰以后多了delay方法,可以将参数传入函数中并且封装成一个任务对象,并且返回一个任务id,celery系统内部自会将其放入任务队列存储器。然后开启worker拉取任务并执行,然后将执行结果放入结果存储器。想要获取任务执行结果,还需要一个任务id

2.3 任务结果拉取

from celery_task01 import cel
from celery.result import AsyncResult

async_result=AsyncResult(id='e726337e-f529-407f-8988-ff373018e77f',app=cel)

if async_result.successful():
    result=async_result.get()
    print(result)

首先定义了一个异步拉取结果的对象,这里需要两个东西,一个是任务id,在这里也可以看出,需要先发布任务,并且获取任务id,才能获取结果,第二个是celery系统对象,你既然要拉取数据,肯定需要跟系统打交道。

2.4 测试

  1. 第一步肯定要运行系统,运行系统不能直接运行python文件,而是要用命令。我这里用的是celery5.2.3

命令:celery --app=celery_task01 worker -l info -P eventlet -E
注意:现在的博客多是用的celery3,命令为celery -A tasks worker --loglevel=info。这个命令在我这里已经不适用了。
解释:--app指定定义系统的python文件,worker表示开启worker任务处理进程 -l 指定日志打印级别为info,-P指定任务处理模式,这里使用了协程,celery4.x及以上在Windows上运行时必须指定这个,不然会报错。-E指定打开events。

  1. 第二步,执行任务发布python文件,即celery_produce.py,同时在控制台复制任务id粘贴到celery_consumer.py中。
  2. 第三步,执行任务结果拉取python文件,即celery_consumer.py。控制台打印了ok。说明成功,一个最简单的celery系统打通了。
上一篇:PC 全局loading的二次封装


下一篇:Node: 将时间戳转换成日期并分组