Celery异步任务

情景: 用户发起request,并等待response返回。在本些views中,可能需要执行一段耗时的程序,那么用户就会等待很长时间,造成不好的用户体验,比如发送邮件、手机验证码等。

使用celery后,情况就不一样了。

解决: 将耗时的程序放到celery中执行。

Celery 是一个包含一系列的消息任务队列。您可以不用了解内部的原理直接使用,它的使用时非常简单的。

  • 选择并且安装一个消息中间件(Broker)
  • 安装 Celery 并且创建第一个任务
  • 运行职程(Worker)以及调用任务
  • 跟踪任务的情况以及返回值

celery官方网站: http://www.celeryproject.org/

celery中文文档: https://www.celerycn.io/

Celery名词:

  • 任务task:就是一个Python函数。
  • 队列queue:将需要执行的任务加入到队列中。
  • 工人worker:在一个新进程中,负责执行队列中的任务。
  • 代理人broker:负责调度,在布置环境中使用redis。

选择中间人(Broker)

Celery 需要一个中间件来进行接收和发送消息,通常以独立的服务形式出现,成为 消息中间人(Broker)

以下有几种选择:

  • RabbitMQ:RabbitMQ 的功能比较齐全、稳定、便于安装。在生产环境来说是首选的。
  • Redis: Redis 功能比较全,但是如果突然停止运行或断电会造成数据丢失。

安装 Celery

可以用标准的 Python 工具,诸如 pipeasy_install 来安装:

$ pip install celery

Celery单应用

创建Celery实例对象及task任务

创建第一个 Celery 实例程序,我们把创建 Celery 程序成为 Celery 应用或直接简称 为 app,创建的第一个实例程序可能需要包含 Celery 中执行操作的所有入口点,例如创建任务、管理职程(Worker)等,所以必须要导入 Celery 模块。

在本教程中将所有的内容,保存为一个 app 文件中。针对大型的项目,可能需要创建 独立的模块。

首先创建 utils/tasks.py

import os
import sys
import django
from celery import Celery
from django.conf import settings
from django.core.mail import send_mail
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))  # 确定项目路径
sys.path.append(project_path)  # 将项目路径添加到系统搜索路径中
os.environ['DJANGO_SETTINGS_MODULE'] = 'django_test.settings'  # 设置项目的配置文件
django.setup()  # Django初始化

app = Celery('tasks', broker='amqp://guest@localhost//')


@app.task
def task_send_email(user_id, username, email):
    # 加密用户信息,生成token
    info = {'confirm': user_id}
    serializer = Serializer(settings.SECRET_KEY, 3600)
    token = serializer.dumps(info).decode()

    # 发送邮件
    '''发送重置密码邮件'''
    reset_url = "{}/user/reset/password/?token={}".format(settings.HOST_URL, token)
    subject = "重置密码"
    message = '邮件正文'
    sender = settings.DEFAULT_FROM_EMAIL
    receiver = [email]
    html_message = '<h1>{}, 您好!</h1>请在1小时内点击下面链接重置密码:<br/><a href="{}">{}</a>'.format(username, reset_url, reset_url)
    send_mail(subject, message, sender, receiver, html_message=html_message)

  • 第一个参数为当前模块的名称,只有在 __main__ 模块中定义任务时才会生产名称。

  • 第二个参数为中间人(Broker)的链接 URL ,实例中使用的 RabbitMQ(Celery默认使用的也是RabbitMQ)。

    更多相关的 Celery 中间人(Broker)的选择方案,可查阅上面的中间人(Broker)。

    例如,对于 RabbitMQ 可以写为 amqp://localhost ,使用 Redis 可以写为 redis://localhost

  • 创建了一个名称为 task_send_email 的任务,实现邮件的发送。

注意: 在 单文件中,想要导入 Django项目的settings.py配置文件

  • 先确定Django项目所在的绝对路径
  • 追加Django所在的路径到系统搜索路径
  • 配置系统的Django环境变量

运行 Celery 子进程(Worker)服务

现在可以使用 worker 参数进行执行我们刚刚创建职程(Worker):

注意: 此时运行celery,是在Django项目下根目录中,*此时启动文件路径必须和 调用 异步任务的导包路径一致,否则,不识别*-A celery实例名,即为当前文件的名

$ celery -A utils.task worker --loglevel=info

Celery集成django使用

注意: 以前版本的Celery需要一个单独的库才能与Django一起使用,但是从3.1开始,情况不再如此。现在就已支持Django。

注意: Celery 4.0支持Django 1.8和更高版本。对于Django 1.8之前的版本,请使用Celery 3.1。

创建Celery实例对象

要将Celery与Django项目一起使用,必须首先定义Celery库的实例(称为 “app”)

如果您拥有当前主流的Django项目结构,例如:

- django_test/
  - manage.py
  - django_test/
    - __init__.py
    - settings.py
    - urls.py

那么建议的方法是创建一个新的django_test/django_test/celery.py模块,该模块定义Celery实例

file: django_test/django_test/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_test.settings')


app = Celery('django_test')


# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')


# Load task modules from all registered Django app configs.
app.autodiscover_tasks()




@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Django加载Celery

然后,需要将此程序导入到django_test/django_test/__ init__.py模块中。这样可以确保在Django启动时加载应用程序,以便@shared_task装饰器将使用该应用程序:

file:django_test/django_test/__init__.py:

from __future__ import absolute_import, unicode_literals


# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app


__all__ = ('celery_app',)

请注意,此示例项目结构适用于较大的项目,对于简单项目,可以使用一个包含的模块来定义应用程序和任务。

步骤解析

让我们分解一下第一个模块中发生的情况,首先我们从__freture__导入absolute_import导入unicode_literals,以使我们的celery.py模块不会与库冲突:

from __future__ import absolute_import

然后,为celery命令行程序设置默认的DJANGO_SETTINGS_MODULE环境变量:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_test.settings')

不需要此行,但可以避免始终将设置模块传递到celery程序中。它必须始终在创建应用程序实例之前出现,我们接下来要做的是:

app = Celery('django_test')

这是我们的库实例,可以有很多实例,但是使用Django时可能没有理由。 我们还将Django设置模块添加为Celery的配置源。这意味着不必使用多个配置文件,而直接从Django设置中配置Celery;但也可以根据需要将它们分开。

app.config_from_object('django.conf:settings', namespace='CELERY')

大写名称空间意味着所有Celery配置选项必须以大写而不是小写指定,并以CELERY_开头,因此例如task_always_eager设置变为CELERY_TASK_ALWAYS_EAGERbroker_url设置变为CELERY_BROKER_URL

可以直接传递设置对象,但是使用字符串更好,因为这样一来,工作人员就不必序列化该对象。 CELERY_名称空间也是可选的,但建议使用(以防止与其他Django设置重叠)。

接下来,可重用应用程序的常见做法是在单独的task.py模块中定义所有任务,而Celery可以自动发现这些模块:

app.autodiscover_tasks()

在上述行中,Celery将按照task.py约定自动发现所有已安装应用程序中的任务:

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

这样,不必手动将各个模块添加到CELERY_IMPORTS设置中。 最后,debug_task示例是一个转储其自己的请求信息的任务。这是使用Celery 3.1中引入的新的bind = True任务选项来轻松引用当前任务实例。

使用@shared_task装饰器

编写的任务可能会存在于可重用应用程序中,并且可重用应用程序不能依赖于项目本身,因此不能直接导入应用程序实例。 @shared_task装饰器可让您创建任务而无需任何具体的应用程序实例:

file: user/tasks.py:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from django.conf import settings
from django.core.mail import send_mail
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer


@shared_task
def task_send_email(user_id, username, email):
    # 加密用户信息,生成token
    info = {'confirm': user_id}
    serializer = Serializer(settings.SECRET_KEY, 3600)
    token = serializer.dumps(info).decode()

    # 发送邮件
    '''发送重置密码邮件'''
    reset_url = "{}/user/reset/password/?token={}".format(settings.HOST_URL, token)
    subject = "重置密码"
    message = '邮件正文'
    sender = settings.DEFAULT_FROM_EMAIL
    receiver = [email]
    html_message = '<h1>{}, 您好!</h1>请在1小时内点击下面链接重置密码:<br/><a href="{}">{}</a>'.format(username, reset_url,reset_url)
    send_mail(subject, message, sender, receiver, html_message=html_message)

运行 Celery 子进程(Worker)服务

现在可以使用 worker 参数进行执行我们刚刚创建职程(Worker):

注意: 此时运行celery,是在Django项目下-A celery实例名,即为Django项目的同名目录

$ celery -A django_test worker --loglevel=info

调用任务

需要调用我们创建的实例任务,可以通过 delay() 进行调用。

delay()apply_async() 的快捷方法,可以更好的控制任务的执行(详情:调用任务:Calling Tasks):

from django.contrib.auth import authenticate
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from user.tools.auth import JSONWebTokenAuthentication # 自定义token认证类
# from .task import task_send_email # 导入Django集成的task任务
from utils.task import task_send_email  # 导入 celery单应用的task任务

class ResetPassword(APIView):
    authentication_classes = [JSONWebTokenAuthentication]
    permission_classes = [IsAuthenticated]

    def post(self, request):
        user = request.user

        task_send_email.delay(user_id=user.id, username=user.username, email=user.email)
        return Response('发送成功')

该任务已经有职程(Worker)开始处理,可以通过控制台输出的日志进行查看执行情况。

上一篇:Android Wifi直接多重连接ad-hoc


下一篇:Django(40)解决celery报错 No module named 'click._bashcomplete'