djcelery消息分布式系统配置

Celery是什么?

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

djcelery是Python的第三方库,它可以用于是任何的Python的项目中,因为我们始终可以把Celery看成一个独立的模块去操纵其它的模块。因此,我们也可以在Django项目中使用的Celery,但值得注意的是,在Django中使用Celery的方式有两种:

消息中间件

Celery本身不提供消息服务,依赖于中间件RabbitMQ, Redis等等

使用场景

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计

安装 celery

pip install django-celery

配置

目录结构

app
  - urls.py
  - models.py
  - tasks.py
project
  - __init__.py
  - settings.py
  - celery.py
  - urls.py
manage.py

settings.py

INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'djcelery',
)

# celery相关配置
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://:{password}@{host}:{port}/{db}'.format(password=REDIS_PWD,
                                                             host=REDIS_HOST,
                                                             port=REDIS_PORT,
                                                             db=REDIS_DB,
                                                            )
CELERY_TIMEZONE = TIME_ZONE
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_ENABLE_UTC = False
CELERYD_LOG_FILE = BASE_DIR + "/logs/celery/celery.log"  # log路径
CELERYBEAT_LOG_FILE = BASE_DIR + "/logs/celery/beat.log"  # beat log路径
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']  # 允许的格式
CELERY_TASK_SERIALIZER = 'json'
CELERYD_MAX_TASKS_PER_CHILD = 3

配置消息中间件
Redis

BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://:{password}@{host}:{port}/{db}'

RabbitMQ

BROKER_BACKEND = 'amqp'
broker_url = 'amqp://guest:guest@localhost:5672//'

celery.py

#!/usr/bin/env python2
# -*- coding:utf-8 -*-
# Author:jj
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lot_canal_info.settings')  # 设置django环境

app = Celery('lot_canal_info')

app.config_from_object('django.conf:settings')  # 使用CELERY_ 作为前缀,在settings中写配置

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # 发现任务文件每个app下的task.py

 __init__.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']

tasks.py

# -*- coding: utf-8 -*-
"""
Created on 2019-11-06

@author: jj
"""
import django
django.setup()
from celery import task
from celery import shared_task
from django.conf import settings
import datetime

@shared_task
def TASK():
	print datetime.datetime.now()
	

启动定时任务

然后启动终端,切换到Django项目的根目录下,运行:

celery worker -A 项目名 -l info
这条命令用于启动worker, worker本质上执行任务的线程,就是一个干苦力的工人。

celery beat -A 项目名 -l info
上面这条任务用于启动beater,它就像一个领导,负责把任务分发给工人。

推荐文章

django-celery beat报错 error pid
djcelery 异常报错解决方案
celery后台服务配置Windows、Linux 待完成

上一篇:牛年第一章:别乱找了,flask配置celery4全在这里了


下一篇:celery异步发邮件