python测试开发django-161.Celery 定时任务保存到数据库 (djcelery)

前言

接着前面Celery 定时任务,这篇使用Celery + djcelery 把定时任务存到数据库。

djcelery 环境准备

定时任务基础环境准备,就不多说了,接着前面一篇https://www.cnblogs.com/yoyoketang/p/15432907.html.
Celery的使用方式有两种:

  • Celery 只用Celery,本身自带worker 和 beat (定时任务)功能,定时任务在setting配置
  • Celery + djcelery 使用了djcelery,可以在任务中方便的直接操作 Django 数据库,而且最终的任务可以在 Django 的后台中查看和修改相关的任务。

多安装一个 djcelery 主要是把定时任务放到数据库中,方便配置和管理。

pip 安装django-celery

pip install django-celery==3.3.1

在 setting 里面配置 INSTALLED_APPS,添加’djcelery’

INSTALLED_APPS = [
    ......
    'djcelery'
]

同步数据库

python manage.py makemigrations
python manage.py migrate

执行完成后会看到djcelery相关的几张表
python测试开发django-161.Celery 定时任务保存到数据库 (djcelery)

CELERY 配置

在setting.py 添加CELERY 相关配置

import djcelery
djcelery.setup_loader()

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

BROKER_URL = 'redis://192.168.1.1:6379'
# RESULT_BACKEND 结果保存数据库
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# SCHEDULER 定时任务保存数据库
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

相对于前面一篇,修改了以下内容

# 新增这2句
import djcelery
djcelery.setup_loader()

# RESULT_BACKEND 结果保存数据库
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# SCHEDULER 定时任务保存数据库
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

tasks任务

在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件

from __future__ import absolute_import
from celery import shared_task


@shared_task
def add(x, y):
    print("task----------111111----------")
    return x + y


@shared_task
def mul(x, y):
    print("task----------22222----------")
    return x * y

views视图创建任务

创建视图,把定时任务信息写入数据库

from django.http import JsonResponse
import datetime
import json
from djcelery.models import PeriodicTask, CrontabSchedule


def create_task(request):
    task_name = "test"	 # 唯一值,自定义不能重复, 这里是测试下,先写死
    task = 'yoyo.tasks.add'  # 任务的注册路径
    # task_args = [10, 11]   # 任务参数
    task_kwargs = {'x': 10, 'y': 11}  # 关键字参数
    # 定时任务规则
    crontab_time = {
        'minute': '*/2',       # 每2分钟执行一次
        'hour': '*',
        'day_of_week': '*',
        'day_of_month': '*',
        'month_of_year': '*'
    }
    # 写入 schedule表
    schedule = CrontabSchedule.objects.create(**crontab_time)
    # 任务和 schedule 关联
    task, created = PeriodicTask.objects.get_or_create(
        name=task_name,		# 名称保持唯一
        task=task,
        crontab=schedule,
        enabled=True,	  # 是否开启任务
        # args=json.dumps(task_args),
        kwargs=json.dumps(task_kwargs),
        # 任务过期时间,设置当前时间往后1天
        expires=datetime.datetime.now()+datetime.timedelta(days=1)
    )
    if created:
        return JsonResponse({"code": 0, "msg": "success"})
    else:
        return JsonResponse({"code": 111, "msg": "create failed"})

分别启动django, worker 和 beat服务

python manage.py runserver 0.0.0.0:8000
celery -A MyDjango beat -l info
celery -A MyDjango worker -l info

访问接口触发后,数据库 djcelery_crontabschedule 表会写入定时信息
python测试开发django-161.Celery 定时任务保存到数据库 (djcelery)
djcelery_periodictask 表记录任务信息
python测试开发django-161.Celery 定时任务保存到数据库 (djcelery)

上一篇:项目的celery日志报错:redis.exceptions.ConnectionError 无法和redis连接


下一篇:python测试开发django-159.Celery 异步与 RabbitMQ 环境搭建