celery是一个基于Python开发的模块,可以帮助我们对任务进行分发和处理。
pip3 install celery==4.4
安装broker: redis或rabbitMQ
pip3 install redis / pika
from celery import Celery app = Celery(‘tasks‘, broker=‘redis://192.168.10.48:6379‘, backend=‘redis://192.168.10.48:6379‘) @app.task def x1(x, y): return x + y @app.task def x2(x, y): return x - y
s2.py
from s1 import x1 result = x1.delay(4, 4) print(result.id)
s3.py
from celery.result import AsyncResult from s1 import app result_object = AsyncResult(id="任务ID", app=app) print(result_object.status)
CELERY_BROKER_URL = ‘redis://192.168.16.85:6379‘ CELERY_ACCEPT_CONTENT = [‘json‘] CELERY_RESULT_BACKEND = ‘redis://192.168.16.85:6379‘ CELERY_TASK_SERIALIZER = ‘json‘
第二步:【项目/项目/celery.py】在项目同名目录创建 celery.py
#!/usr/bin/env python # -*- coding:utf-8 -*- import os from celery import Celery # set the default Django settings module for the ‘celery‘ program. os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘demos.settings‘) app = Celery(‘demos‘) # Using a string here means the worker doesn‘t have to serialize # the configuration object to child processes. # - namespace=‘CELERY‘ means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object(‘django.conf:settings‘, namespace=‘CELERY‘) # Load task modules from all registered Django app configs. # 去每个已注册app中读取 tasks.py 文件 app.autodiscover_tasks()
第三步,【项目/app名称/tasks.py】
from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y
from .celery import app as celery_app __all__ = (‘celery_app‘,)
启动worker
进入项目目录
celery worker -A demos -l info -P eventlet
-
-
url(r‘^create/task/$‘, task.create_task), url(r‘^get/result/$‘, task.get_result),
视图函数
from django.shortcuts import HttpResponse from api.tasks import x1 def create_task(request): print(‘请求来了‘) result = x1.delay(2,2) print(‘执行完毕‘) return HttpResponse(result.id) def get_result(request): nid = request.GET.get(‘nid‘) from celery.result import AsyncResult # from demos.celery import app from demos import celery_app result_object = AsyncResult(id=nid, app=celery_app) # print(result_object.status) data = result_object.get() return HttpResponse(data)
项目目录下执行celery worker -A 项目名称 -l info -P eventlet
运行系统查看结果正常