celery的定时运用

celery是一个基于Python开发的模块,可以帮助我们对任务进行分发和处理。

1.1 环境的搭建

pip3 install celery==4.4
安装broker: redis或rabbitMQ
pip3 install redis / pika

1.2 快速使用

s1.py

from celery import Celery

app = Celery(tasks, broker=redis://192.168.10.48:6379, backend=redis://192.168.10.48:6379)

@app.task
def x1(x, y):
    return x + y

@app.task
def x2(x, y):
    return x - y

s2.py

from s1 import x1

result = x1.delay(4, 4)
print(result.id)

s3.py

from celery.result import AsyncResult
from s1 import app

result_object = AsyncResult(id="任务ID", app=app)
print(result_object.status)

 

1.3 django中应用celery

第一步:【项目/项目/settings.py 】添加配置

CELERY_BROKER_URL = redis://192.168.16.85:6379
CELERY_ACCEPT_CONTENT = [json]
CELERY_RESULT_BACKEND = redis://192.168.16.85:6379
CELERY_TASK_SERIALIZER = json

第二步:【项目/项目/celery.py】在项目同名目录创建 celery.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import os
from celery import Celery

# set the default Django settings module for the ‘celery‘ program.
os.environ.setdefault(DJANGO_SETTINGS_MODULE, demos.settings)

app = Celery(demos)

# Using a string here means the worker doesn‘t have to serialize
# the configuration object to child processes.
# - namespace=‘CELERY‘ means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object(django.conf:settings, namespace=CELERY)

# Load task modules from all registered Django app configs.
# 去每个已注册app中读取 tasks.py 文件
app.autodiscover_tasks()

第三步,【项目/app名称/tasks.py】

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

第四步,【项目/项目/__init__.py

from .celery import app as celery_app

__all__ = (celery_app,)

启动worker

进入项目目录

celery worker -A demos -l info -P eventlet

编写视图函数,调用celery去创建任务。

  • url

  • url(r^create/task/$, task.create_task),
    url(r^get/result/$, task.get_result),

视图函数

from django.shortcuts import HttpResponse
from api.tasks import x1

def create_task(request):
    print(请求来了)
    result = x1.delay(2,2)
    print(执行完毕)
    return HttpResponse(result.id)


def get_result(request):
    nid = request.GET.get(nid)
    from celery.result import AsyncResult
    # from demos.celery import app
    from demos import celery_app
    result_object = AsyncResult(id=nid, app=celery_app)
    # print(result_object.status)
    data = result_object.get()
    return HttpResponse(data)

 

 

项目目录下执行celery worker -A 项目名称 -l info -P eventlet

运行系统查看结果正常

 

celery的定时运用

上一篇:多个模态框的淡入淡出问题


下一篇:odoo的图标和数据透视表