一、django中使用celery
1 celery是独立的,跟框架没有关系 2 django-celery第三方模块,兼容性不好,不采用第三方模块,就使用通用方式 3 目录 celery_task __init__.py celery.py home_task.py order_task.py user_task.py luffyapi
代码:
启动django项目(略)
启动worker
# 启动worker celery worker -A celery_task -l info -P eventlet
luffyapi/celery_task/celery.py
import celery import os # 执行django配置文件,环境变量加入 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev") broker='redis://127.0.0.1:6379/1' # 1 表示使用redis 1 这个db backend='redis://127.0.0.1:6379/2' # 2 表示使用redis 2 这个db app=celery.Celery('test',broker=broker,backend=backend, include=['celery_task.order_task','celery_task.user_task','celery_task.home_task'] ) # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False ### 可能会使用到的参数 # app.conf.task_time_limit = 100 # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { # 'send-msg':{ # 'task': 'celery_task.user_task.send_sms', # # 'schedule': timedelta(hours=24*10), # # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 # 'schedule': crontab(hour=8, day_of_month=1), # 每月一号早八点 # 'args': ('18964352112',), # } 'update-banner':{ 'task': 'celery_task.home_task.update_banner', 'schedule': timedelta(seconds=10), 'args': (), } }
luffyapi/celery_task/home_task.py
from celery_task.celery import app @app.task def update_banner(): from django.core.cache import cache from django.conf import settings from home import models from home import serializer banners=models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE] ser = serializer.BannerModelSerializer(instance=banners,many=True) banner_data=ser.data # 拿不到request对象,所以头像的连接base_url要自己组装 for banner in banner_data: banner['img'] = 'http://127.0.0.1:8000%s' % banner['img'] cache.set('banner_data',banner_data) return True
luffyapi/celery_task/order_task.py
from .celery import app @app.task def process_order(a, b): print(a) print(b) return '订单处理完了' @app.task def cancel_order(): import random res = random.choice([1, 0]) if res == 0: print('订单状态改了,取消订单了') return True else: print('订单取消失败') return False
luffyapi/celery_task/user_task.py
from .celery import app @app.task def send_sms(phone): import time time.sleep(1) print('%s短信发送成功'%phone) return '%s短信发送成功'%phone
luffyapi\apps\user\urls.py
path('test_celery/',views.test_celery),
luffyapi\apps\user\views.py
from celery_task import user_task from celery_task.celery import app from celery.result import AsyncResult def test_celery(request): res_id = request.GET.get('id') if res_id: res = AsyncResult(id=res_id, app=app) if res.successful(): result = res.get() print(result) return HttpResponse('执行完成了,结果是:%s' % result) res = user_task.send_sms.delay('188****5678') return HttpResponse('任务号是:%s' % str(res))
访问路径
http://127.0.0.1:8000/user/test_celery/