Celery分布式异步任务处理框架

1. Celery 简介

Celery是一个自带电池的基于Python开发的分布式异步任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。它非常易于使用,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用Celery。它主要适用于两大类场景:

  • 异步:
    有的任务执行时间较长,你不想让程序一直等待结果返回,可以先将改任务放入celery任务队列中,并从Celery获取一个任务ID。后续通过询问Celery来得知任务执行状态和进度。

  • 定时:
    需要定时执行同样的任务,Celery任务队列支持定时触发,可以按照时间间隔或者crontab表达式来触发任务。

在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“经纪人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。

Celery具有以下优点:

  • 简单
    Celery 使用和维护都非常简单,并且不需要配置文件。交流论坛:一个mailing-list 和一个IRC channel.

  • 高可用
    woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。

  • 快速
    单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, redis和优化设置时)

  • 灵活
    Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。

这有一个最简单的应用示例,你可以参照:

from celery import Celery

broker = 'redis://192.168.53.184:6379/5'
backend = 'redis://192.168.53.184:6379/6'

app = Celery('tasks', broker=broker, backend=backend)

@app.task
def hello():
    return 'hello world'

if __name__ == "__main__":
    hello()

2.Celery工作流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6VNUHJ7S-1624110584096)(assets/584bbf78e1783.png)]

3. Celery安装

你可以安装Celery通过Python包管理平台(PyPI)或者源码安装
使用pip安装:

$ pip install celery==5.0.5

4. Celery使用

4.1 Brokers

Celery支持多种消息中间件作为Broker,即中间人。来在应用程序和Worker之间传递消息。

支持的消息中间件总览:

消息中间件 支持适配状态 支持监控 支持远程控制
RabbitMQ 稳定
Redis 稳定
Amazon SQS 稳定
Zookeeper 实验

RabbitMQ是默认的Broker它不需要其他额外的依赖和初始化配置。

BROKER_URL = "redis://192.168.53.184:6379/5"

4.2 Backend

通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。Backend是在Celery的配置中的一个配置项 CELERY_RESULT_BACKEND ,作用是保存结果和状态,如果你需要跟踪任务的状态,那么需要设置这一项,可以是Database backend,也可以是Cache backend 。

对于 brokers,官方推荐是 rabbitmq 和 redis,至于 backend,就是数据库。为了简单可以都使用 redis。

CELERY_RESULT_BACKEND = "redis://192.168.53.184:6379/6"

4.3 Woker

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

celery worker -A tasks --loglevel=info

#一次启动多个worker
celery -A CeleryProject.app worker -Q default,tasks_A,tasks_B -l info -E

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lzz5CW8B-1624110584101)(assets/image-20210515215509465.png)]

4.4 Beat

Celery通过celery beat进程来完成定时任务。Celery beat启动之后, 读取配置文件中的定时任务信息,到了需要执行时间点, 消费者Celery beat便将其加入到queue中, worker进程拿来消费。为了避免有重复的任务被发送出去,所以Celery beat仅能有一个。

定时任务配置的其中一种方式:

CELERYBEAT_SCHEDULE = {
    "add":{
        "task":"CeleryProject.tasks.add",
        "schedule":timedelta(seconds=10),   #每10s执行一次
        "args":(10,16)
    },
    "taskA":{
        "task": "CeleryProject.tasks.taskA",
        "schedule": crontab(minute="*/1")   #每1min执行一次
    },
    "taskB":{
        "task": "CeleryProject.tasks.taskB",
        "schedule": crontab(minute="*/1")   #每1min执行一次
    }
}

启动celery beat

celery beat -A CeleryProject.app -l info

4.5路由配置

celery beat生成任务消息,然后发送到exchange(交换机),交换机决定哪个队列接收这个消息,这个就需要配置交换机的路由key。

三种交换类型:

  • Direct Exchange
    • 直接交换,也就是指定一个队里来接收,这个消息被celerybeat发送给指定的routekey所绑定的队列。
  • Topic Exchange
    • Topic可以根据同类的属性进程通配。例如,你有三个队列和三个消息, A消息可能希望被X,Y处理,B消息你希望被,X,Z处理,C消息你希望被Y,Z处理.并且这个不是队列的不同而是消息希望被相关的队列都去执行
  • Fanout Exchange
    • Fanout类型的消息在生成的时候为多份,每个队列一份,相当于是广播
CELERY_QUEUES = {
    Queue("default", routing_key="task.default"),  # 路由键以task开头的信息都进入到default队列中
    Queue("tasks_A", routing_key="A.#"),  # 路由键以A开头的信息都进入到task_A队列中
    Queue("tasks_B", routing_key="B.#")  # 路由键以B开头的信息都进入到task_B队列中
}

CELERY_DEFAULT_QUEUE = "default"
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"

CELERY_ROUTES = (
    {
        re.compile(r"CeleryProject\.tasks\.(taskA|taskB)"): {"queue": "tasks_A", "routing_key": "A.import"}
    },
    {
        "CeleryProject.tasks.add": {"queue": "default", "routing_key": "task.default"}
    }
)

5.celery flower安装

pip install flower

#启动celery flower
celery flower -A CeleryProject.app --address=0.0.0.0 --port=5555 --broker=redis://192.168.53.184:6379/5

#通过post请求向celery worker 发送异步任务请求
curl -X POST -d '{"args":[122,18]}' http://localhost:5555/api/task/send-task/tasks.add

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2kQwHFel-1624110584102)(assets/image-20210515215540776.png)]

celery flower启动之后可以访问web界面:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f5DiIvsO-1624110584105)(assets/image-20210515215716765.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RAhLhORs-1624110584106)(assets/image-20210515215623084.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hiaXVVXg-1624110584108)(assets/image-20210515215649468.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GKwHmJG4-1624110584109)(assets/image-20210515215745516.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E42GM6RB-1624110584111)(assets/image-20210515215819321.png)]

6.完整celery项目代码

项目地址

项目结构:

(base) upchina@ubuntu:~/project/tmp/python_demo/CeleryProject$ tree
.
├── app.py				#celery应用
├── __init__.py			#Celery项目包,空文件
├── settings.py         #应用配置及任务配置
└── tasks.py			#任务模块

0 directories, 4 files

app.py

#!/usr/bin/env python
# encoding: utf-8
"""
@author: chaochen
@file: app.py
@time: 2021/5/15 下午3:42
"""
from celery import Celery
app = Celery("CeleryProject", include=["CeleryProject.tasks"])
app.config_from_object("CeleryProject.settings")


if __name__ == "__main__":
    app.start()
    pass

tasks.py

#!/usr/bin/env python
# encoding: utf-8
"""
@author: chaochen
@file: tasks.py
@time: 2021/5/15 下午4:02
"""

import os
import time
import socket
from CeleryProject.app import app

def get_host_ip():
    """
    查询worker节点的ip
    :return: ip
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip

@app.task
def add(x, y):
    s = x + y
    time.sleep(3)
    print("主机IP{}:x + y = {}".format(get_host_ip(), s))
    return s

@app.task
def taskA():
    print("taskA begin...")
    print("主机IP{}:taskA".format(get_host_ip()))
    time.sleep(3)
    print("taskA end.")


@app.task
def taskB():
    print("taskB begin...")
    print("主机IP{}:taskB".format(get_host_ip()))
    time.sleep(3)
    print("taskB end.")


if __name__ == "__main__":
    print(get_host_ip())
    taskA()
    pass

settings.py

#!/usr/bin/env python
# encoding: utf-8
"""
@author: chaochen
@file: settings.py
@time: 2021/5/15 下午3:45
"""
from celery.schedules import crontab
from kombu import Queue
import re
from datetime import timedelta

CELERY_QUEUES = {
    Queue("default", routing_key="task.default"),  # 路由键以task开头的信息都进入到default队列中
    Queue("tasks_A", routing_key="A.#"),  # 路由键以A开头的信息都进入到task_A队列中
    Queue("tasks_B", routing_key="B.#")  # 路由键以B开头的信息都进入到task_B队列中
}

CELERY_DEFAULT_QUEUE = "default"
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"

CELERY_ROUTES = (
    {
        re.compile(r"CeleryProject\.tasks\.(taskA|taskB)"): {"queue": "tasks_A", "routing_key": "A.import"}
    },
    {
        "CeleryProject.tasks.add": {"queue": "default", "routing_key": "task.default"}
    }
)

BROKER_URL = "redis://192.168.53.184:6379/5"
CELERY_RESULT_BACKEND = "redis://192.168.53.184:6379/6"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ACCEPT_CONTENT = ['json']

CELERYBEAT_SCHEDULE = {
    "add":{
        "task":"CeleryProject.tasks.add",
        "schedule":timedelta(seconds=10),   #每10s执行一次
        "args":(10,16)
    },
    "taskA":{
        "task": "CeleryProject.tasks.taskA",
        "schedule": crontab(minute="*/1")   #每1min执行一次
    },
    "taskB":{
        "task": "CeleryProject.tasks.taskB",
        "schedule": crontab(minute="*/1")   #每1min执行一次
    }
}

if __name__ == "__main__":
    pass

7.Celery API

8.参考链接:

celery官方文档

Python 并行分布式框架 Celery 详解_cuomer的博客-CSDN博客_celer

RabbitMQ的Python客户端pika使用调研

CeleryProject

上一篇:芹菜错误:result.get超时


下一篇:应急通信市场的大哥,MESH自组网的前世今生。