Celery

一、Celery架构介绍

Celery是分布式异步任务框架,Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

1、使用场景

异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

延迟执行:解决延迟任务
定时执行:解决周期(周期)任务,比如每天数据统计

二、Celery简单使用

pip install celery==5.1.2  # 安装

1、使用

# 使用(举例,假设放在s1.py文件上)
from celery import Celery

# broker = 'redis://:密码@127.0.0.1:6379/1'  # -有密码的写法
broker = 'redis://127.0.0.1:6379/1'  # redis地址
backend = 'redis://127.0.0.1:6379/2'  # redis地址


# 实例化得到celery对象,第一个参数是起个名字的意思
app = Celery(__name__, backend=backend, broker=broker)


# 使用装饰器包裹任务,将任务管理起来
@app.task()  # 写一个任务函数(示例,实际情况下任务应该单独创建文件)
def add(x, y):
    return x + y

#############################################################

# 任务提交(在另外一个s2.py文件提交任务)
import s1

# 异步执行
# 第一步:提交(使用的任务名.apply_async(参数))
res = s1.add.apply_async(args=[2, 3])
print(res)  # 会返回一个任务id号,唯一标识

#############################################################

# 第二步:让worker执行-->结果存在redis,使用命令启动

# 【非Windows】
celery worker -A s1文件名 -l info

# 【Windows】需要安装一个模块 pip3 install eventlet
celery worker -A 文件名 -l info -P eventlet  # 5.X前的启动命令
celery -A 文件名 worker -l info -P eventlet  # 5.X的启动命令

#############################################################

# 第三步:查看任务结果(放在任意文件查看,如s3.py)
from s1 import app
from celery.result import AsyncResult

id = '1b814481-08aa-484f-a1f9-3952a762df9a'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

三、celery包解构

# 目录结构
-celery_task               # 包名
	__init__.py
	celery.py             # app所在py文件
	course_task.py        # 任务
	order_task.py         # 任务
	user_task.py          # 任务
提交任务.py                # 提交任务
查看结果.py                # 查看结果

1、执行异步任务

"""首先是celery.py文件:存放celery的使用"""

from celery import Celery

# broker = 'redis://:密码@127.0.0.1:6379/1'  # -有密码的写法
broker = 'redis://127.0.0.1:6379/1'  # redis地址
backend = 'redis://127.0.0.1:6379/2'  # redis地址


"""
实例化得到celery对象,第一个参数是起个名字的意思,第二和第三个参数对应上面的redis地址,
第三个参数include是一个列表,放被管理的task的py文件
"""
app = Celery(__name__, backend=backend, broker=broker, include=[
    'celery_task.user_task',
    'celery_task.order_task',
])

##################################################################################

"""然后就是user_task.py这些任务文件,举个例子,模拟一个发送短信验证码的任务"""
from .celery import app

@app.task()  # 装饰器管理任务
def send_sms(phone, code):
    import time
    time.sleep(3)  # 模拟发送短信延迟
    print('短信发送成功,手机号是:%s,验证码是:%s' % (phone, code))
    return '短信发送成功'

##################################################################################

"""然后就是提交我们所写的任务,放在提交任务.py文件中,然后运行该文件,再去启动worker命令"""
from celery_task import user_task

# 提交一个短信任务
a = user_task.send_sms.apply_async(args=(137000, 4691))  # 使用位置传参
# a = user_task.send_sms.apply_async(kwargs={'x': 137000, 'y': 4691})  也可以使用关键字传参
# a = user_task.send_sms.delay('137000','4691')  也可以写delay这种方式,可以直接传参
print(a)  # 会以字符串形式返回一个任务id号,可以用来查看任务结果

###################################################################################

执行worker命令后,提交一次任务,worker就执行一次,如果没有任务就会一直阻塞着,等待下一次任务执行。(执行worker记得把位置切到需要启动的包的上一层文件夹里)

# 【非Windows】
celery worker -A 文件名 -l info

# 【Windows】需要安装一个模块 pip3 install eventlet
celery worker -A 文件名 -l info -P eventlet  # 5.X前的启动命令
celery -A 文件名 worker -l info -P eventlet  # 5.X的启动命令

# 文件名就是目录结构的包名

###################################################################################

"""查看结果.py文件,用于查看任务执行的结果,下面基本就是固定代码"""
from celery_task.celery import app  # 导入celery文件下的app对象
from celery.result import AsyncResult

id = '1b814481-08aa-484f-a1f9-3952a762df9a'  # 使用提交任务返回的字符串,来查看结果
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

2、执行延迟任务

拿上面user_task的短信任务为例,将短信任务延迟10秒之后执行,所以直接在 "提交任务.py"下写

from celery_task import user_task  # 导入celery_task包下的user_task短信任务文件
from datetime import datetime, timedelta  # 导入关于时间的模块

# utcnow当前utc时间
eta = datetime.utcnow() + timedelta(seconds=10)  # 这句代码的意思是 当前utc时间的10秒后

# 10s之后发送短信
res = user_task.send_sms.apply_async(args=(137000, 4691),eta=eta)
print(res)

3、执行定时任务

"""对定时任务进行配置,放在celery.py里"""

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab  # 这个模块有各种指定任务时间的命令
app.conf.beat_schedule = {
    'send_sms_every_3_seconds': {  # 给配置起名字
        'task': 'celery_task.user_task.send_sms',  # 指定执行的任务
        'schedule': timedelta(seconds=3),  # 执行任务内容,例如这个是每三秒发送一次短信
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (137000, 4691),  # 传参数,如果任务没有参数,就不用写
    }
}

"""
这时候worker启动后并没有执行,是因为我们之前执行任务是手动提交任务,为了能自动提交任务,我们需要启动beat
再开一个终端,不要把启动worker的终端顶掉了,然后输入命令启动beat
"""
celery beat -A celery_task -l  # 5.X之前的命令
celery -A celery_task  beat -l info  # 5.X之后的命令

四、Django集成Celery

# 了解性质内容:django-celery,一个把django和celery集成起来的第三方模块,但是第三方写的包的版本,需要跟celery和django版本完全对应。略显麻烦,所以我们不用
    	
# 我们自己使用包结构集成到django中

# 第一步,把写好的包,直接复制到项目根路径

# 第二步,在视图类/函数中
from celery_task.user_task import send_sms  # 导入想要使用的功能

def test(request):
    mobile = request.GET.get('mobile')
    code = '9999'
    res = send_sms.delay(mobile, code)  # 同步发送假设3分支钟,异步发送,直接就返回id了,后期通过id查询发送是否成功
    print(res)
    return HttpResponse(res)

# 启动worker,访问http://127.0.0.1:8000/test/?mobile=1111,带着参数,就访问成功了,后台的终端就会显示 短信发送成功,手机号是:1111,验证码是:9999

1、django集成celery实现定时任务

1-1、定时更新首页轮播图

我们设想这样一个场景,一个网站的首页可能会有 海报轮播图,这个轮播图通常很长时间都不变,如果用户每次访问首页,网站就得去查询一次数据库,对数据库的压力非常大

这时候我们可以使用Redis缓存,以后用户访问首页,只需要第一次访问时从数据库中获取数据放入Redis中,之后的访问都是直接从Redis中获取数据,减少了数据库查询的次数

class BannerView(ViewSetMixin, ListAPIView):
    queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.BANNER_COUNT]
    serializer_class = serializer.BannerSerializer

    def list(self, request, *args, **kwargs):
        # 先去缓存中获取,如果缓存有,直接返回,如果没有,去数据库查询,放到缓存

        # 先去缓存中获取
        banner_list = cache.get('banner_list_cache')
        if not banner_list:  # 去数据库中获取
            # 没有走缓存
            print('查了数据库')
            res = super().list(request, *args, **kwargs)
            banner_list = res.data  # res是Response对象
            # 放入到缓存中
            cache.set('banner_list_cache', banner_list)
        return Response(data=banner_list)

1-2、解决双写一致性问题

虽然用Redis缓存很方便,可以有效解决数据库压力,但这种方法也会伴随着一定问题,比如说:双写一致性问题(redis缓存和mysql数据不同步),或者缓存穿透,缓存击穿,缓存雪崩等问题,不过我们也有相应的解决办法

# 双写一致性问题:是缓存数据库更新的策略,数据库的数据更新了,但是Redis缓存没更新,确怎么解决

# 缓存更新策略
	-先删除缓存,在更新数据库
	-先更新数据库。再更新缓存(可靠性高)
    
    -定时更新(对实时性要求不高)
    	-可以设置为某个时间段,自动更新缓存

1-2-1、home_task.py

from home import serializer
from .celery import app
from apps.home import models
from django.conf import settings
from django.core.cache import cache

@app.task()
def update_banner():
    # 从mysql中取出轮播图数据
    queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.BANNER_COUNT]

    # 2 序列化
    ser = serializer.BannerSerializer(instance=queryset, many=True)
    # 3 获取到字典,手动拼上前面的地址
    banner_list = ser.data
    for banner in banner_list:
        banner['image'] = settings.BACKEND_URL % str(banner['image'])

    # 4 放到缓存中
    cache.set('banner_list_cache', banner_list)
    return True

1-2-2、celery.py

from celery import Celery

# 由于celery和django 是独立的两个服务,要想在celery服务中使用django,必须加这两句
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")

# 由于celery和django 是独立的两个服务,要想在celery服务中使用django,必须加这两句
import os

# os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include  是一个列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker, include=[
    'celery_task.gagaluansha',
    'celery_task.user_task',
])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta

app.conf.beat_schedule = {
    'update_banner_every_3_seconds': {  # 给配置起名字
        'task': 'celery_task.home_task.update_banner',  # 指定执行的任务
        'schedule': timedelta(seconds=3),  # 执行任务内容,例如这个是每三秒发送一次短信
    }
}

1-3-3、启动djagno,启动beat,启动worker

python manage.py runserver 
celery -A celery_task beat -l info
celery -A celery_task worker -l info -P eventlet
上一篇:python测试开发django-160.Celery 定时任务 (beat)


下一篇:caffe 模型训练超参数配置