celery框架

[toc]

celery框架:

介绍:

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

  Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

	celery 组成: broker   |   worker    |   backend

    消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。

  任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

  任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。

    
使用场景:
	Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。
    还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。
    延时任务:解决延迟任务
    

注意: RabbitMQ : 异步的消息队列 (线上使用)

celery + redis :

环境搭建:

配置:
	pip install celery
    pip install django-redis
    # Windows中还需要安装以下模块,用于任务执行单元
    pip install eventlet
    
redis-- settings.py:
    CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100}
            # "PASSWORD": "123",
        }
    }
}


任务结构:

创建文件目录结构:
	pro_cel
    ├── celery_task# celery相关文件夹
    │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
    │   └── tasks1.py    #  所有任务函数
    │   └── tasks2.py    #  所有任务函数
    ├── check_result.py # 检查结果
    └── send_task.py    # 触发任务
    
 注意,检查结果与触发任务的模块不能写在celery_task模块中,不然会报导入celery的错误。

celery框架工作流程
    1)创建Celery框架对象app,配置broker和backend,得到的app就是worker
    2)给worker对应的app添加可处理的任务函数,用include配置给worker的app
    3)完成提供的任务的定时配置app.conf.beat_schedule
    4)启动celery服务,运行worker,执行任务
    5)启动beat服务,运行beat,添加任务

任务实现:

基本使用:
	1. 创建环境: celery 环境搭建
	2.创建app  + 任务
    	(创建: broker  + app  + backend)
	3.执行任务:
    	# 非windows
        celery worker -A celery_task -l info
        # windows:
        pip3 install eventlet
        celery worker -A celery_task -l info -P eventlet
   4.添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本

   5.获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本

eg:
    from celery import Celery
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    
task.py:
    from .celery import app
    @app.task
    def add(n, m)
    	pass

定时任务

celery.py:

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

# 消息中间件,密码是你redis的密码
# broker='redis://:123456@127.0.0.1:6379/2' 密码123456
broker = 'redis://127.0.0.1:6379/0'  # 无密码
# 任务结果存储
backend = 'redis://127.0.0.1:6379/1'

# 生成celery对象,'task'相当于key,用于区分celery对象(任意名)
# include参数需要指定任务模块
app = Celery('task', broker=broker, backend=backend, include=[
    'celery_task.add_task',
    'celery_task.send_email'
])

# 时区 (可设置任意一个)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 定时执行
app.conf.beat_schedule = {
    # 名字随意命名
    'add-every-5-seconds': {
        # 执行add_task下的addy函数
        'task': 'celery_task.add_task.add',
        # 每10秒执行一次
        'schedule': timedelta(seconds=10),
        # add函数传递的参数
        'args': (1, 2)
    },
    'add-every-10-seconds': {
        'task': 'celery_task.add_task.add',
        # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
        'schedule': crontab(minute=5),
        'args': (1, 2)
    }
}

send_msg.py:

#项目配置

# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'  # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '1504703554@qq.com'  # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = '授权码'  # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '2333<'1504703554@qq.com>'
EMAIL_USE_SSL = True   # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True


import os
if __name__ == "celery_task.send_email":
    # 因为需要用到django中的内容,所以需要配置django环境
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "do_celery.settings")
    import django
    django.setup()
    # 导入celery对象app
    from celery_task.celery import app
    from app01 import models
    # 导入django自带的发送邮件模块
    from django.core.mail import send_mail
    import threading


	@app.task
    def send_email1(id):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作
        # 此处的id由用户注册的视图函数中传入
        user_obj = models.UserInfo.objects.filter(pk=id).first()
        email = user_obj.email
        # 启用线程发送邮件,此处最好加线程池
        t = threading.Thread(target=send_mail, args=(
            "激活邮件,点击激活账号",  # 邮件标题
            '点击该邮件激活你的账号,否则无法登陆',  # 给html_message参数传值后,该参数信息失效
            settings.EMAIL_HOST_USER,  # 用于发送邮件的邮箱地址
            [email],  # 接收邮件的邮件地址,可以写多个
            ),
            # html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中
            kwargs={'html_message': "<a href='http://127.0.0.1:8000/active_user/?id=%s'>点击激活gogogo</a>" % id}
        )
        t.start()

check_result.py:

from celery.result import AsyncResult
from celery_task.celery import app

def check_result(task_id):
    async1 = AsyncResult(id=task_id, app=app)

    if async1.successful():
        result = async1.get()
        print(result)
        return result
        # result.forget() # 将结果删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
	 elif async1.failed():
        print('执行失败')
        return '执行失败'
    elif async1.status == 'PENDING':
        print('任务等待中被执行')
        return '任务等待中被执行'
    elif async1.status == 'RETRY':
        print('任务异常后正在重试')
        return '任务异常后正在重试'
    elif async1.status == 'STARTED':
        print('任务已经开始被执行')
        return '任务已经开始被执行'

执行:

启用任务执行单元worker(以windows为例):

celery worker -A celery_task -l info  -P  eventlet

app.conf.beat_schdule定时任务时,还需要启用beat,用于定时朝消息队列提交任务:

celery beat -A celery_task -l info

延时任务:

from celery_app_task import add
from datetime import datetime

# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间,这里是10秒后执行任务
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

django + celery:

from datetime import timedelta
from celery import Celery 
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1', include=[
    'celery_task.tasks1',
    'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名
    'add-every-10-seconds': {
        # 执行tasks1下的test_celery函数
        'task': 'celery_task.tasks1.test_celery',
        # 每隔2秒执行一次
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 传递参数
        'args': ('test',)
    },
}

执行:
    # 启动一个beat
    celery beat -A celery_task -l info

    # 启动work执行
    celery worker -A celery_task -l info -P  eventlet

上一篇:Kubernetes的本质


下一篇:Celery的使用