celery的分布式任务

介绍

celery分布式任务可以处理页面上那种数据读写或高密度计算的任务。这种任务如果频繁的执行的话,会造成网页卡死。
并且定时任务也是一种特殊的分布式任务。本例子讲的是从视图函数中直接写入数据,一旦运行到指定页面,后台自动更新数据库。类似于表单上传更改写入数据上传到某种页面,而该页面就是用分布式任务函数所处理。

分布式任务所需的模块

5个包

在settings.py中配置分布式任务和定时任务

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'index',
    # 添加分布式任务功能
    'django_celery_results',
    # 添加定时任务功能
    'django_celery_beat'
]

# 设置存储Celery任务队列的Redis数据库
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# 设置存储Celery任务结果的数据库
CELERY_RESULT_BACKEND = 'django-db'

# 设置定时任务相关配置
CELERY_ENABLE_UTC = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

定义数据模型来演示分布式任务

from django.db import models

class PersonInfo(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=20)
    age = models.IntegerField()
    hireDate = models.DateField()
    def __str__(self):
        return self.name
    class Meta:
        verbose_name = '人员信息'

分布式任务具体流程

在主项目文件夹下新建个celery.py文件

import os
from celery import Celery
# 获取settings.py的配置信息
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyDjango.settings')
# 定义Celery对象,并将项目配置信息加载到对象中。
# Celery的参数一般以项目名称命名
app = Celery('MyDjango')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

然后编写init.py文件实现该实例化app(celery.py)对象与django的绑定

# celery官方文档
# 参考文档http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
from .celery import app as celery_app
__all__ = ['celery_app'] # celery框架的实例化对象app与Mydjango对象进行绑定
# 目的是使得Django运行的时候能自动加载Celery框架的实例化对象app

接着在具体应用中编写分布式任务函数,也要新建文件夹task.py

from celery import shared_task
from .models import *
import time

 # 分布式任务
@shared_task
def updateData(id, info): # updateData分布式任务
    try: # 分别代表主键id和模型字段的修改内容。一般没有要求函数设定返回值,若要有返回值,就作为分布式任务的执行结果
        # 否则将执行结果设为None
        # 所有的结果都会被记录在数据表django_celery_results_taskresult的result字段中
        PersonInfo.objects.filter(id=id).update(**info) # 更新数据表中的字段
        return 'Done'
    except: # 分布式任务的触发条件被设置在了urls.py中,当用户访问特定的路由时,视图函数将调用编写的分布式函数
        return 'Fail'

# 定时任务
@shared_task
def timing(): # 定时任务,须在后台设置,不知怎么他把celery许多的数据表单把注册到后台了,并没有使用django的admin
    now = time.strftime("%H:%M:%S")
    with open("d:\\output.txt", "a") as f:
        f.write("The time is " + now)
        f.write("\n") # 不过页真耗内存
        f.close()

index应用下的路由信息urls.py


from django.urls import path
from .views import *
urlpatterns = [
    path('', index, name='index'),
]

编写视图处理函数views.py


from django.http import HttpResponse
from .tasks import updateData


def index(request):
    # 传递参数并执行异步任务
    id = request.GET.get('id', 1) # 这里是手动写入数据,在views.py中,并没有从页面(表单)或者路由(get请求)中写入
    info = dict(name='May', age=20, hireDate='2019-10-10')
    # 调用分布式函数updateData,在调用delay方法创建任务队列,并将分布式函数设有的参数传入
    updateData.delay(id, info) # delay应该时celery框架自带的方法
    return HttpResponse("Hello Celery")

启动celery框架和定时任务的方法

 celery -A  MyDjango worker -1 info -P eventlet
 这个命令报错
 好吧,1和l真的很像,命令行输入时很容易弄错
 celery -A MyDjango worker -l info -P eventlet 分布式任务
# 启动定时任务就必须保证django和celery框架是运行的
celery -A MyDjango worker -l info -S django 定时任务 
上一篇:优秀 python 开源项目


下一篇:高性能异步爬虫(二)