介绍
celery分布式任务可以处理页面上那种数据读写或高密度计算的任务。这种任务如果频繁的执行的话,会造成网页卡死。
并且定时任务也是一种特殊的分布式任务。本例子讲的是从视图函数中直接写入数据,一旦运行到指定页面,后台自动更新数据库。类似于表单上传更改写入数据上传到某种页面,而该页面就是用分布式任务函数所处理。
分布式任务所需的模块
5个包
在settings.py中配置分布式任务和定时任务
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'index',
# 添加分布式任务功能
'django_celery_results',
# 添加定时任务功能
'django_celery_beat'
]
# 设置存储Celery任务队列的Redis数据库
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# 设置存储Celery任务结果的数据库
CELERY_RESULT_BACKEND = 'django-db'
# 设置定时任务相关配置
CELERY_ENABLE_UTC = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
定义数据模型来演示分布式任务
from django.db import models
class PersonInfo(models.Model):
id = models.AutoField(primary_key=True)
name = models.CharField(max_length=20)
age = models.IntegerField()
hireDate = models.DateField()
def __str__(self):
return self.name
class Meta:
verbose_name = '人员信息'
分布式任务具体流程
在主项目文件夹下新建个celery.py文件
import os
from celery import Celery
# 获取settings.py的配置信息
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyDjango.settings')
# 定义Celery对象,并将项目配置信息加载到对象中。
# Celery的参数一般以项目名称命名
app = Celery('MyDjango')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
然后编写init.py文件实现该实例化app(celery.py)对象与django的绑定
# celery官方文档
# 参考文档http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
from .celery import app as celery_app
__all__ = ['celery_app'] # celery框架的实例化对象app与Mydjango对象进行绑定
# 目的是使得Django运行的时候能自动加载Celery框架的实例化对象app
接着在具体应用中编写分布式任务函数,也要新建文件夹task.py
from celery import shared_task
from .models import *
import time
# 分布式任务
@shared_task
def updateData(id, info): # updateData分布式任务
try: # 分别代表主键id和模型字段的修改内容。一般没有要求函数设定返回值,若要有返回值,就作为分布式任务的执行结果
# 否则将执行结果设为None
# 所有的结果都会被记录在数据表django_celery_results_taskresult的result字段中
PersonInfo.objects.filter(id=id).update(**info) # 更新数据表中的字段
return 'Done'
except: # 分布式任务的触发条件被设置在了urls.py中,当用户访问特定的路由时,视图函数将调用编写的分布式函数
return 'Fail'
# 定时任务
@shared_task
def timing(): # 定时任务,须在后台设置,不知怎么他把celery许多的数据表单把注册到后台了,并没有使用django的admin
now = time.strftime("%H:%M:%S")
with open("d:\\output.txt", "a") as f:
f.write("The time is " + now)
f.write("\n") # 不过页真耗内存
f.close()
index应用下的路由信息urls.py
from django.urls import path
from .views import *
urlpatterns = [
path('', index, name='index'),
]
编写视图处理函数views.py
from django.http import HttpResponse
from .tasks import updateData
def index(request):
# 传递参数并执行异步任务
id = request.GET.get('id', 1) # 这里是手动写入数据,在views.py中,并没有从页面(表单)或者路由(get请求)中写入
info = dict(name='May', age=20, hireDate='2019-10-10')
# 调用分布式函数updateData,在调用delay方法创建任务队列,并将分布式函数设有的参数传入
updateData.delay(id, info) # delay应该时celery框架自带的方法
return HttpResponse("Hello Celery")
启动celery框架和定时任务的方法
celery -A MyDjango worker -1 info -P eventlet
这个命令报错
好吧,1和l真的很像,命令行输入时很容易弄错
celery -A MyDjango worker -l info -P eventlet 分布式任务
# 启动定时任务就必须保证django和celery框架是运行的
celery -A MyDjango worker -l info -S django 定时任务