[Celery分布式的异步任务框架]

[Celery分布式的异步任务框架]

Celery介绍

Celery:分布式的异步任务框架 Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度

可以做的事,我们用它来解决什么问题

  • 异步任务 (异步的执行这个任务函数)解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
  • 延迟任务(延迟5s钟,执行一个任务(函数))解决延迟任务
  • 定时任务(定时的执行任务(函数))如果单纯执行定时任务,没必要用celery,可以使用别的

平台问题

Celery is a project with minimal funding, so we don’t support Microsoft Windows. 
Please don’t open any issues related to that platform
译:(Celery 是一个资金很少的项目,所以我们不支持微软的Windows。 请不要打开任何与该平台相关的问题 )

在Celery中几个基本的概念,需要先了解下,不然不知道为什么要安装下面的东西。概念:Broker,Backend。

什么是Broker:

  broker是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,进行程序执行,好吧,这个邮箱可以看成是一个消息队列,其中Broker的中文意思是经纪人,其实就是一开始说的消息队列,用来发送和接受信息。这个broker有几个方案可供选择:RabbitMQ(消息队列),Redis(缓存数据库),数据库(不推荐),等等

什么是backend?

   通常程序发送的消息,发完就完了,可能都不知道对方什么时候接受了,为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果,Backend是在Celery的配置中的一个配置项CELERY_RESULT_BACKEND,作用是保存结果和状态,如果你需要跟踪任务的状态,那么需要设置这一项,可以是Database backend,也可以是Cache backend.

对于brokers,官方推荐是rabbitmq和redis,至于backend,就是数据库,为了简单可以都使用redis。

[Celery分布式的异步任务框架]

Celery异步任务框架

"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
​
人是一个独立运行的服务 | 医院也是一个独立运行的服务
    正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
    人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

"""

Celery的架构

Celery的架构由三部分组成:

  • 消息中间件(message broker)
    • Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
  • 任务执行单元(worker)
    • Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中
  • 任务执行结果存储(task result store)
    • Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

Celery简单使用

安装配置,目录组织结构

1 安装:pip install celery
  消息中间件:RabbitMQ/Redis    app=Celery('任务名',backend='xxx',broker='xxx')
    
2 两种目录组织结构
    -普通的
    -包管理的(提倡用包管理,结构更清晰)
# 普通的
# 如果 Celery对象:Celery(...) 是放在一个模块下的
# 1)终端切换到该模块所在文件夹位置:scripts
# 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:模块名随意
​
# 包管理的
# 如果 Celery对象:Celery(...) 是放在一个包下的
# 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
# 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:包名随意

celery执行异步任务

基本使用

# 第一步:定义一个py文件(名字随意,我们叫celery_task)
from celery import Celery   # 导入安装的模块

backend = 'redis://127.0.0.1:6379/1' # 结果存储,放在db1库下
broker = 'redis://127.0.0.1:6379/2' # 消息中间件,放在db2库下
app = Celery(__name__,broker=broker, backend=backend) #__name__当前文件的名字  celery实例化得到对象app
​
# 被它修饰,就变成了celery的任务
@app.task   
def add(a,b):   # 定义一个任务
    return a+b
​
​
#第二步:提交任务(新建一个py文件:submit_task) 
from celery_task import add  # 在文件中导入我们创建的任务
# 异步调用
# 只是把任务提交到了redis中,但是没有执行,返回一个唯一标识,后期使用唯一标识去看任务执行结果
res=add.delay(33,41)   # add.delay 就是提交异步任务
print(res)
​
​
# 使用任务执行单元来执行任务(使用命令启动)
"""
启动celery(app)服务:
# 非windows
 命令:celery  -A celery_task worker -l info
# windows:
 pip3 install eventlet  # 需要安装一个文件
 celery  -A celery_task worker -l info -P eventlet
"""
​
​
#第三步:启动worker
# 需要去我们自己写的celery文件所在的目录下执行
# celery_task py文件的名字
# -l info日志输出级别是info 
# -P eventlet  在win平台需要加上
celery -A celery_task worker -l info -P eventlet
#如果队列里有任务,就会执行,如果没有任务,worker就等在这
​
​
​
# 第四步:查询结果是否执行完成(新建一个py文件,get_result)
from celery_task import app
from celery.result import AsyncResult
​
id = 'ed85b97a-a231-4d71-ba11-e5f6450d0b47'  
# 提交完任务会返回一个uuid 写这就可以查询该任务的状态信息

if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':   # 判断status的值
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')
    
'''
查询结果成功后返回到redis的信息
{
  "status": "SUCCESS",   
   "result":  true,
   "traceback":  null,
   "children":  [],
   "date_done":  "2021-07-30T07:03:32.849985",
   "task_id":  "dbebc6e5-a700-4f91-b50c-7ef5b25c2cab"
}
'''
    
 """
流程:
 一个代码往里提,提到一个消息队列里,然后启动起celry worker,
 如果有任务就执行,如果没有任务,worker就等在这,执行了就在
 任务执行结果存储里,再用其他代码在结果存储里根据id号查出来
 查的时候它有可能执行了有可能没执行,没有执行是因为代码提交
 了没有启动worker,在结果存储里也查不到,一但启动worker立马
 就会被执行再查就可以查到了
 
 """

[Celery分布式的异步任务框架]

包管理结构

1 包结构
 project
   ├── celery_task # celery包,默认都放在项目根路径下
   │     ├── __init__.py # 包文件
   │     ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py
   │     ├── course_task.py  # 任务1
   │     ├── home_task.py  # 任务2
   │     └── user_task.py  # 任务3       
   ├── submit_task.py  # 提交和查询结果的py文件,可以在项目的任意目录下
   └── delay_task.py  # 执行延迟任务,(注意:这个py名称随意)
​
celery_task:这个包就类似于一座医院,是一个独立运行的服务
submit_task.py :这个py文件就类似于一个人,也是一个独立运行的服务
​

2 启动worker
先cd到F:\luffy\luffyapi\luffyapi\scripts目录下,这个时候直接启动celery_task这个包就行
但前提是celery_task包下必须有celery.py这个文件

celery.py

from celery import Celery   # 导入celery模块

backend = 'redis://127.0.0.1:6379/1'    # 任务执行完毕后存储的位置
broker = 'redis://127.0.0.1:6379/2'
# 表示三个task文件统一被app管理起来了
app = Celery(__name__, broker=broker, backend=backend,
             include=['celery_task.course_task', 'celery_task.user_task', 'celery_task.home_task'])

三个任务↓↓↓

home_task.py

# 计算加法任务
from .celery import app
@app.task
def add(a,b):
    import time
    time.sleep(10)
    return a+b

course_task.py

# 写文件任务
from .celery import app
@app.task
def wirte_file(s):
    with open('s1.txt','w',encoding='utf-8') as f:
        f.write(s)
​

user_task.py

# 计算乘法任务
from .celery import app
@app.task
def mul(a,b):
    return a*b

submit_task.py

# 在项目中任意位置使用
# 提交任务
from celery_task.user_task import mul
​
res=mul.delay(90,80)
print(res)
​
# 查询结果   (可以将查询结果的代码放入另一个py文件下  单独查询)
from celery_task.celery import app
from celery.result import AsyncResult
​
id = 'bc893136-44aa-4bdf-8a58-8e639191d7c6'  # 提交任务后回返回一个uuid 写入此处即可
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

[Celery分布式的异步任务框架]

在luffy项目app中测试

# 演示异步在 views.py中
from celery_task.home_task import add
from celery.result import AsyncResult
from celery_task import app
​
def test(request):
    # 提交一个计算 90+80的任务,需要10秒中才能拿到结果  
    res=add.delay(90,80)
    return HttpResponse('任务已经提,任务id为:%s'%str(res))
​
def get_result_test(request):   # FBV
    id=request.GET.get('id')
​
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')
​
    return HttpResponse('任务执行的结果是%s'%result)
​
​
# 路由urls.py
from django.urls import path
from . import views
urlpatterns = [
    path('test/', views.test),
    path('get_result_test/', views.get_result_test),
]
"""
做成异步任务需要两次操作
http://127.0.0.1:8000/user/test/  # 访问路径获取id
http://127.0.0.1:8000/user/get_result_test/?id=获取的id  # 可以拿到结果
"""

celery 执行延迟任务

# delay_task.py
# 其他不用变,提交任务的时候
from celery_task.user_task import mul
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(seconds=10) # 当前utc时间,往后推10s,时间对象
​
'''  如果配置为上海时区则不需要指定datetime.utcnow()  直接datetime.now() 即可
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
'''


# print(eta)
# 提交延迟任务
# args:mul任务的参数
# eta:延迟时间(延迟多长时间执行),必须传一个时间对象(写数字不行)
# 使用utc时间
# 10s后执行这个任务
# 参数传递需要使用args,传时间要使用时间对象,使用的是utc时间
mul.apply_async(args=(200, 50), eta=eta)
​
"""
先启动worker在执行延迟任务,可以发现我们指定的是10秒中,
也就是说worker在没有被占用的情况下,可以在10秒后执行任务
"""

celery执行定时任务

#第一步:在celery.py中配置
​
# 修改时区配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
​
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    # 定时任务一,每隔3秒做一次
    'task-mul': {
        'task': 'celery_task.user_task.mul',
        'schedule': timedelta(seconds=3),  # 3s后
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (3, 15),
    },
    # 定时任务二,每隔10秒做一次
    'task-add': {
        'task': 'celery_task.home_task.add',
        'schedule': timedelta(seconds=10),  # 10s后
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (3, 5),
    },
}
​
#第二步:启动beat(beat负责定时提交任务)
celery -A celery_task beat -l info
​
# 第三步:启动worker,任务就会被worker执行了
celery -A celery_task worker -l info -P eventlet

定时更新(应用)

路飞项目首页轮播图定时更新(异步更新思路)

1 首页轮播图加入缓存了
2 双写一致性问题(mysql数据改了,redis缓存数据还是一样的)
3 几种更新缓存的思路
    -定时更新(具体接口,看公司需求),我们为了测试,快一些,3s更新一次
    -异步更新(咱们现在没有)
        -新增轮播图接口---》新增完轮播图后--》执行异步更新轮播图的任务--》update_banner.dely()
        
4 代码编写

应用

####### 第1步:在celery.py中
from celery import Celery
# 在脚本中导入django环境   
# 因为celery和django是两个独立的服务 互不影响  所以在django中想用celery的话需要在脚本中导入django环境
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
django.setup()  
​
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,
             include=['celery_task.course_task', 'celery_task.user_task', 'luffyapi.apps.home.home_task'])
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
​

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'update_banner': {
        'task': 'luffyapi.apps.home.home_task.update_banner',
        'schedule': timedelta(seconds=3),  # 3s后
    },
}


####### 第2步:在home的app下新建home_task.py 用于写入更新缓存的代码
from .celery import app  # 导入celery.py下的app
@app.task
def update_banner():
    from home.models import Bannder
    from home.serializer import BannerSerializer
    from django.conf import settings
    from django.core.cache import cache
    # 更新轮播图缓存的任务
    #只要这个task被执行一次,缓存中的轮播图就是最新的
    queryset = Bannder.objects.all().filter(is_show=True, is_delete=False).order_by('-orders')[0:settings.BANNER_SIZE]
    ser = BannerSerializer(instance=queryset,many=True)
    # 小问题,图片前面的路径是没有带的,处理路径
    for item in ser.data:
        item['image']= settings.REMOTE_ADDR+item['image']
​
    cache.set('banner_cache_list',ser.data)
    return True

# user_settings.py
# 后端项目地址:处理图片路径,项目上线可改配置
REMOTE_ADDR='http://127.0.0.1:8000'


####### 第3步:一定要让django启动在celery.py文件中加入
    #在脚本中导入django环境
    import os
    import django
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
    django.setup()
    
    
####### 第4步 启动django项目,启动beat,启动worker
	# 启动worker
	celery -A celery_task worker -l info -P eventlet    
    # 启动beat的命令(负责每隔几秒钟,向任务队列中提交任务)
	celery beat -A celery_task -l info
    -每隔3s就会更新缓存
上一篇:celery分布式任务队列从入门到精通


下一篇:Celery异步任务框架(简单使用)