celery分布式任务队列从入门到精通

目录

1. Celery简介

2. 安装Celery

3. 安装RabbitMQ或Redis

3.1 安装redis(本文将以redis作为broker)

3.2 安装RabbitMQ

4. 第一个Celery程序

5. 第一个Celery工程项目


1. Celery简介

    Celery是由纯python编写的,但是协议可以用任何语言实现。目前,已有Ruby实现的RCelery、Node.js实现的node-celery及一个PHP客户端,语言互通也可以通过using webhooks实现。在使用Celery之前,我们先来了解以下几个概念:

    任务队列:简单来说,任务队列就是存放着任务的队列,客户端将要执行的任务的消息放入任务队列中,执行节点worker进行持续监视队列,如果有新任务,就取出来执行该任务。这种机制就像生产者消、费者模型一样,客户端作为生产者,执行节worker点作为消费者,它们之间通过任务队列进行传递,如图:

celery分布式任务队列从入门到精通

     中间人(broker):Celery用于消息通信,通常使用中间人(broker)在客户端和worker之间传递,这个过程从客户端(生产者)向队列添加消息开始,之后中间人把消息派送给worker(消费者)。官方给出的实现broker的工具如下:

名称 状态 监视 远程控制
RabbitMQ 稳定
Redis 稳定
MongoDB 实验性
Beanstalk 实验性
AmazonSQS 实验性
Zookeeper 实验性
DjangoDB 实验性
SQLAlchemy 实验性
CouchDB 实验性
Iron MQ 第三方

        提示: 在实际的使用中,推荐使用RabbitMQ或者Redis作为broker

  •         任务生产者:调用Celery提供的API、函数、装饰器产生任务并交给任务队列的都是任务生产者。
  •         执行单元worker:属于任务队列的消费者,持续地监控任务队列,当队列中有新的任务时,便取出来执行。
  •         任务结果存储backend:用来存储worker执行任务的结果,Celery支持不同形式的存储任务结果,包含Redis,MongoDB等。
  •         任务调度器beat:Celery Beat进程会读取配置文件的内容,周期性地将配置中需要到期执行的任务发送到任务队列执行。

Celery的特性:

  •         高可用:如果连接丢失或失败,worker和客户端就会自动重试,并且中间人broker通过主/主,主/从方式来提高可用性。
  •         快速:单个Celery进程每分钟执行数以百万计的任务,且保持往返延迟在亚毫秒级,可以选择多进程、Gevernt等并发执行。
  •         灵活:Celery几乎所有模块都可以扩展或单独使用。可以自制连接池,日志、调度器、消费者、生产者等等。
  •         框架集成:Celery易于和web框架集成,如django-celery,web2py-celery、tornado-celery等等。
  •         强大的调度功能:Celery Beat进程来实现强大的调度功能,可以指定任务在若干秒后或一个时间点来执行,也可以基于单纯的时间间隔或支持分钟、小时、每周的第几天、每月的第几天等等,用crontab表达式来使用周期任务调度。
  •         易监控:可以方便地查看定时任务的执行情况,如执行是否成功,当前状态、完成任务花费时间等,还可以使用功能完备的管理后台或命令行添加、更新、删除任务,提供了完善的错误处理机制。

2. 安装Celery

 推荐使用pip安装Celery,方式如下:

pip3 install celery

# 或者,该方式安装celery时,捆绑了一组特性依赖:librabbitmq,redis,auth,msgpack
pip3 install celery[librabbitmq,redis,auth,msgpack]

以下是可用的捆绑,供使用时做参考

序列化

celery[auth]:用于使用auth安全序列化程序。

celery[msgpack]:用于使用 msgpack 序列化程序。

celery[yaml]:用于使用 yaml 序列化程序。

并发

celery[eventlet]:用于使用eventlet池。

celery[gevent]:使用gevent池。

传输和后端

celery[librabbitmq]:用于使用 librabbitmq C 库。

celery[redis]:使用 Redis 作为消息传输或结果后端。

celery[sqs]:使用 Amazon SQS 作为消息传输(实验性)。

celery[tblib]:用于使用该task_remote_tracebacks功能。

celery[memcache]:使用 Memcached 作为结果后端(使用pylibmc

celery[pymemcache]:使用 Memcached 作为结果后端(纯 Python 实现)。

celery[cassandra]:使用 Apache Cassandra 作为 DataStax 驱动程序的后端。

celery[couchbase]:使用 Couchbase 作为结果后端。

celery[arangodb]:使用 ArangoDB 作为结果后端。

celery[elasticsearch]:使用 Elasticsearch 作为结果后端。

celery[riak]:使用 Riak 作为结果后端。

celery[dynamodb]:使用 AWS DynamoDB 作为结果后端。

celery[zookeeper]:使用 Zookeeper 作为消息传输。

celery[sqlalchemy]:使用 SQLAlchemy 作为结果后端(支持)。

celery[consul]:使用 Consul.io 键/值存储作为消息传输或结果后端(实验性)。

celery[django]:指定 Django 支持可能的最低版本。

使用源代码安装如下:(celery · PyPI

# 下载源代码文件
wget https://files.pythonhosted.org/packages/66/60/2713f5be1906b81d40f823f4c30f095f7b97b9ccf3627abe1c79b1e2fd15/celery-5.1.2.tar.gz

# 解压
tar zxvf celery-5.1.2.tar.gz

# 进入目录
cd celery-5.1.2

# 构建
python3 setup.py build

# 安装,注意权限,可在前面添加sudo
python3 setup.py install

3. 安装RabbitMQ或Redis

3.1 安装redis(本文将以redis作为broker)

        以Ubuntu为例,其他操作系统可参考RabbitMQ官网:Downloading and Installing RabbitMQ — RabbitMQ

        在Ubuntu系统安装redis可以使用一下命令

sudo apt-get update
sudo apt-get install redis-server

        启动redis

redis-server

         查看redis是否启动

redis-cli

        上面的命令将打开以下终端

redis 127.0.0.1:6379>

        其中127.0.0.1是本机ip,6379是redis服务端口号,现在输入ping命令:

redis 127.0.0.1:6379> ping
PONG

         以上说明redis已经安装成功。以下是通过源码包安装redis

wget http://download.redis.io/releases/redis-6.0.6.tar.gz
tar xzf redis-6.0.6.tar.gz
cd redis-6.0.6
make

        make命令执行完,在redis-6.0.6/src目录下会出现编译后的Redis服务程序redis-server和启动客户端程序redis-cli

        如下命令启动Redis,此命令会一直处于占用状态,我们再重新开一个命令行连接

cd redis-6.0.6/src

./redis-server ../redis.conf

         注意:如果redis-server 后面指定配置文件,则会以默认的配置启动redis服务。此处我们是使用的指定的默认redis配置文件。也可以根据需要使用自己的配置文件。

        启动redis服务后,显示如下:

        celery分布式任务队列从入门到精通

        以上表示启动成功,可以使用测试客户端程序redis-cli和redis进行交互了,例如:

# 有$ 的一行表示shell命令
$ cd src
$ ./redis-cli
redis> set foo bar
OK
redis> get foo
"bar"

         配置celery的BROKER_URL,redis的默认连接URL如下:

BROKER_URL = 'redis://localhost:6379/0'

3.2 安装RabbitMQ

        这里仍以Ubuntu为例

       Centos7.6系统参考:CentOS7.6 安装RabbitMQ_大帅的博客-CSDN博客

        首先安装erlang。由于RabbitMQ需要Erlang语言的支持,因此需要先安装Erlang,执行命令:

sudo apt-get install erlang-nox

         再安装RabbitMQ

sudo apt-get update
sudo apt-get install rabbitmq-server

        启动、关闭、重启、状态RabbitMQ服务的命令如下:

# 启动
sudo rabbitmq-server start

# 关闭
sudo rabbitmq-server stop

# 重启
sudo rabbitmq-server restart

# 查看rabbitmq状态
sudo rabbitmqctl status

        要使用celery,需要创建一个RabbitMQ用户和虚拟主机,并且允许用户访问改虚拟主机。

# 创建rabbitmq的用户名为myuser,密码为mypassword,请自行设置
sudo rabbitmqctl add_user myuser mypassword

# 创建虚拟主机
sudo rabbitmqctl add_vhost myvhost

# 设置权限
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

        RabbitMQ是默认的中间人的URL位置,生产环境根据实际情况修改即可。

BROKER_URL = 'amqp://guest:guest@localhost:5672'

4. 第一个Celery程序

        我们选redis作为broker,首先要修改一下redis的配置文件redis.conf,修改bind=127.0.0.1为bind=0.0.0.0,意思是允许远程访问Redis数据库。修改完毕需要重启一下redis服务。

# sudo apt-get 方式安装重启
service redis-server restart

# 源码安装重启
src/redis-server ../redis.conf

        启动成功后检查:

[root@python celery_demo]# ps -elf | grep redis
0 S root      38987 110789  0  80   0 - 28203 pipe_w 06:19 pts/2    00:00:00 grep --color=auto redis
4 S root     104561   2276  0  80   0 - 40606 ep_pol 04:00 pts/0    00:00:23 src/redis-server 127.0.0.1:6379

        说明已成功启动。

        现在来编写一个Celery程序

【示例1】(my_first_celery.py)

# encoding=utf-8

from celery import Celery
import time

app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/0'
)


@app.task
def add(x, y):
    time.sleep(3)  # 模拟耗时操作
    res = x + y
    print(f"x + y = {res}")
    return res

代码说明

        Celery()的第一个参数为当前模块的名称,只有在 __main__ 模块中定义任务时才会生产名称;第二个参数指定了中间人broker,第三个参数指定了后端存储。实现了一个add函数,该函数模拟了耗时操作,等待3秒,传入两个参数并返回之和,使用app.task来装饰该函数。

        接下来我们启动任务执行单元worker。

celery -A my_first_celery worker -l info

命令说明:

        -A 表示程序段模块名称,worker表示启动一个执行单元,-l是指-level,表示打印的日志等级,可以使用celery -help命令查看celery命令的帮助文档。

        启动成功后显示如下:

celery分布式任务队列从入门到精通

         如果不想用celery命令启动worker,则可以直接使用文件驱动,修改my_first_celery.py如下所示:

【实例2】使用入口函数启动my_first_celery.py

 添加了app.start()启动

# encoding=utf-8

from celery import Celery
import time

app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/0'
)


@app.task
def add(x, y):
    time.sleep(3)  # 模拟耗时操作
    res = x + y
    print(f"x + y = {res}")
    return res


if __name__ == '__main__':
    app.start()

        然后再命令中执行python3 my_first_celery.py worker即可,启动后的界面和使用celery命令的结果是一致的。

接下来,编写任务调度程序:start_task.py

from my_first_celery import add  # 导入任务函数add
import time

# delay异步调用,因为add函数里面会等待3秒,这里调用不会阻塞,程序会立即向下执行
result = add.delay(12, 12)

# ready方法检查任务是否执行完毕,此处会循环检查
while not result.ready():
    print(time.strftime("%H:%M:%S"))
    time.sleep(1)

print(result.get())         # 获取任务返回的结果,也就是两个数相加之和
print(result.successful())  # 判断任务是否成功执行

        执行 python3 start_task.py 得到以下结果:

[root@python celery_demo]# python3 start_task.py
06:48:05
06:48:06
06:48:07
24
True

        等待了3秒后(有可能会打印4次秒数),任务返回了24,并且成功完成。此时worker界面增加的信息如下:

[2021-09-11 06:48:05,236: INFO/MainProcess] Task my_first_celery.add[41425cd6-63c8-41df-bb74-74fd0c5c7438] received
[2021-09-11 06:48:08,242: WARNING/ForkPoolWorker-8] x + y = 24
[2021-09-11 06:48:08,242: WARNING/ForkPoolWorker-8]

[2021-09-11 06:48:08,244: INFO/ForkPoolWorker-8] Task my_first_celery.add[41425cd6-63c8-41df-bb74-74fd0c5c7438] succeeded in 3.007353223998507s: 24

        启动 41425cd6-63c8-41df-bb74-74fd0c5c7438 是 taskid ,只要指定了backend,根据这id就可以随时去backend查找运行结果。使用方法如下:

>>> from my_first_celery import add
>>> taskid='41425cd6-63c8-41df-bb74-74fd0c5c7438'
>>> add.AsyncResult(taskid).get()
24
>>>

        或者

>>> from celery.result import AsyncResult
>>> AsyncResult(taskid).get()
24

5. 第一个Celery工程项目

        明天继续写。。

上一篇:django-celery vs django-celery-beat


下一篇:[Celery分布式的异步任务框架]