目录
3.1 安装redis(本文将以redis作为broker)
1. Celery简介
Celery是由纯python编写的,但是协议可以用任何语言实现。目前,已有Ruby实现的RCelery、Node.js实现的node-celery及一个PHP客户端,语言互通也可以通过using webhooks实现。在使用Celery之前,我们先来了解以下几个概念:
任务队列:简单来说,任务队列就是存放着任务的队列,客户端将要执行的任务的消息放入任务队列中,执行节点worker进行持续监视队列,如果有新任务,就取出来执行该任务。这种机制就像生产者消、费者模型一样,客户端作为生产者,执行节worker点作为消费者,它们之间通过任务队列进行传递,如图:
中间人(broker):Celery用于消息通信,通常使用中间人(broker)在客户端和worker之间传递,这个过程从客户端(生产者)向队列添加消息开始,之后中间人把消息派送给worker(消费者)。官方给出的实现broker的工具如下:
名称 | 状态 | 监视 | 远程控制 |
RabbitMQ | 稳定 | 是 | 是 |
Redis | 稳定 | 是 | 是 |
MongoDB | 实验性 | 是 | 是 |
Beanstalk | 实验性 | 否 | 否 |
AmazonSQS | 实验性 | 否 | 否 |
Zookeeper | 实验性 | 否 | 否 |
DjangoDB | 实验性 | 否 | 否 |
SQLAlchemy | 实验性 | 否 | 否 |
CouchDB | 实验性 | 否 | 否 |
Iron MQ | 第三方 | 否 | 否 |
提示: 在实际的使用中,推荐使用RabbitMQ或者Redis作为broker
- 任务生产者:调用Celery提供的API、函数、装饰器产生任务并交给任务队列的都是任务生产者。
- 执行单元worker:属于任务队列的消费者,持续地监控任务队列,当队列中有新的任务时,便取出来执行。
- 任务结果存储backend:用来存储worker执行任务的结果,Celery支持不同形式的存储任务结果,包含Redis,MongoDB等。
- 任务调度器beat:Celery Beat进程会读取配置文件的内容,周期性地将配置中需要到期执行的任务发送到任务队列执行。
Celery的特性:
- 高可用:如果连接丢失或失败,worker和客户端就会自动重试,并且中间人broker通过主/主,主/从方式来提高可用性。
- 快速:单个Celery进程每分钟执行数以百万计的任务,且保持往返延迟在亚毫秒级,可以选择多进程、Gevernt等并发执行。
- 灵活:Celery几乎所有模块都可以扩展或单独使用。可以自制连接池,日志、调度器、消费者、生产者等等。
- 框架集成:Celery易于和web框架集成,如django-celery,web2py-celery、tornado-celery等等。
- 强大的调度功能:Celery Beat进程来实现强大的调度功能,可以指定任务在若干秒后或一个时间点来执行,也可以基于单纯的时间间隔或支持分钟、小时、每周的第几天、每月的第几天等等,用crontab表达式来使用周期任务调度。
- 易监控:可以方便地查看定时任务的执行情况,如执行是否成功,当前状态、完成任务花费时间等,还可以使用功能完备的管理后台或命令行添加、更新、删除任务,提供了完善的错误处理机制。
2. 安装Celery
推荐使用pip安装Celery,方式如下:
pip3 install celery
# 或者,该方式安装celery时,捆绑了一组特性依赖:librabbitmq,redis,auth,msgpack
pip3 install celery[librabbitmq,redis,auth,msgpack]
以下是可用的捆绑,供使用时做参考:
序列化
celery[auth]:
用于使用auth
安全序列化程序。
celery[msgpack]:
用于使用 msgpack 序列化程序。
celery[yaml]:
用于使用 yaml 序列化程序。
并发
celery[eventlet]:
用于使用eventlet池。
celery[gevent]:
使用gevent池。
传输和后端
celery[librabbitmq]:
用于使用 librabbitmq C 库。
celery[redis]:
使用 Redis 作为消息传输或结果后端。
celery[sqs]:
使用 Amazon SQS 作为消息传输(实验性)。
celery[tblib]:
用于使用该task_remote_tracebacks功能。
celery[memcache]:
使用 Memcached 作为结果后端(使用pylibmc)
celery[pymemcache]:
使用 Memcached 作为结果后端(纯 Python 实现)。
celery[cassandra]:
使用 Apache Cassandra 作为 DataStax 驱动程序的后端。
celery[couchbase]:
使用 Couchbase 作为结果后端。
celery[arangodb]:
使用 ArangoDB 作为结果后端。
celery[elasticsearch]:
使用 Elasticsearch 作为结果后端。
celery[riak]:
使用 Riak 作为结果后端。
celery[dynamodb]:
使用 AWS DynamoDB 作为结果后端。
celery[zookeeper]:
使用 Zookeeper 作为消息传输。
celery[sqlalchemy]:
使用 SQLAlchemy 作为结果后端(支持)。
celery[consul]:
使用 Consul.io 键/值存储作为消息传输或结果后端(实验性)。
celery[django]:
指定 Django 支持可能的最低版本。
使用源代码安装如下:(celery · PyPI)
# 下载源代码文件
wget https://files.pythonhosted.org/packages/66/60/2713f5be1906b81d40f823f4c30f095f7b97b9ccf3627abe1c79b1e2fd15/celery-5.1.2.tar.gz
# 解压
tar zxvf celery-5.1.2.tar.gz
# 进入目录
cd celery-5.1.2
# 构建
python3 setup.py build
# 安装,注意权限,可在前面添加sudo
python3 setup.py install
3. 安装RabbitMQ或Redis
3.1 安装redis(本文将以redis作为broker)
以Ubuntu为例,其他操作系统可参考RabbitMQ官网:Downloading and Installing RabbitMQ — RabbitMQ
在Ubuntu系统安装redis可以使用一下命令
sudo apt-get update
sudo apt-get install redis-server
启动redis
redis-server
查看redis是否启动
redis-cli
上面的命令将打开以下终端
redis 127.0.0.1:6379>
其中127.0.0.1是本机ip,6379是redis服务端口号,现在输入ping命令:
redis 127.0.0.1:6379> ping
PONG
以上说明redis已经安装成功。以下是通过源码包安装redis。
wget http://download.redis.io/releases/redis-6.0.6.tar.gz
tar xzf redis-6.0.6.tar.gz
cd redis-6.0.6
make
make命令执行完,在redis-6.0.6/src目录下会出现编译后的Redis服务程序redis-server和启动客户端程序redis-cli
如下命令启动Redis,此命令会一直处于占用状态,我们再重新开一个命令行连接
cd redis-6.0.6/src
./redis-server ../redis.conf
注意:如果redis-server 后面指定配置文件,则会以默认的配置启动redis服务。此处我们是使用的指定的默认redis配置文件。也可以根据需要使用自己的配置文件。
启动redis服务后,显示如下:
以上表示启动成功,可以使用测试客户端程序redis-cli和redis进行交互了,例如:
# 有$ 的一行表示shell命令
$ cd src
$ ./redis-cli
redis> set foo bar
OK
redis> get foo
"bar"
配置celery的BROKER_URL,redis的默认连接URL如下:
BROKER_URL = 'redis://localhost:6379/0'
3.2 安装RabbitMQ
这里仍以Ubuntu为例
Centos7.6系统参考:CentOS7.6 安装RabbitMQ_大帅的博客-CSDN博客
首先安装erlang。由于RabbitMQ需要Erlang语言的支持,因此需要先安装Erlang,执行命令:
sudo apt-get install erlang-nox
再安装RabbitMQ
sudo apt-get update
sudo apt-get install rabbitmq-server
启动、关闭、重启、状态RabbitMQ服务的命令如下:
# 启动
sudo rabbitmq-server start
# 关闭
sudo rabbitmq-server stop
# 重启
sudo rabbitmq-server restart
# 查看rabbitmq状态
sudo rabbitmqctl status
要使用celery,需要创建一个RabbitMQ用户和虚拟主机,并且允许用户访问改虚拟主机。
# 创建rabbitmq的用户名为myuser,密码为mypassword,请自行设置
sudo rabbitmqctl add_user myuser mypassword
# 创建虚拟主机
sudo rabbitmqctl add_vhost myvhost
# 设置权限
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
RabbitMQ是默认的中间人的URL位置,生产环境根据实际情况修改即可。
BROKER_URL = 'amqp://guest:guest@localhost:5672'
4. 第一个Celery程序
我们选redis作为broker,首先要修改一下redis的配置文件redis.conf,修改bind=127.0.0.1为bind=0.0.0.0,意思是允许远程访问Redis数据库。修改完毕需要重启一下redis服务。
# sudo apt-get 方式安装重启
service redis-server restart
# 源码安装重启
src/redis-server ../redis.conf
启动成功后检查:
[root@python celery_demo]# ps -elf | grep redis
0 S root 38987 110789 0 80 0 - 28203 pipe_w 06:19 pts/2 00:00:00 grep --color=auto redis
4 S root 104561 2276 0 80 0 - 40606 ep_pol 04:00 pts/0 00:00:23 src/redis-server 127.0.0.1:6379
说明已成功启动。
现在来编写一个Celery程序
【示例1】(my_first_celery.py)
# encoding=utf-8
from celery import Celery
import time
app = Celery(
'tasks',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/0'
)
@app.task
def add(x, y):
time.sleep(3) # 模拟耗时操作
res = x + y
print(f"x + y = {res}")
return res
代码说明:
Celery()的第一个参数为当前模块的名称,只有在 __main__ 模块中定义任务时才会生产名称;第二个参数指定了中间人broker,第三个参数指定了后端存储。实现了一个add函数,该函数模拟了耗时操作,等待3秒,传入两个参数并返回之和,使用app.task来装饰该函数。
接下来我们启动任务执行单元worker。
celery -A my_first_celery worker -l info
命令说明:
-A 表示程序段模块名称,worker表示启动一个执行单元,-l是指-level,表示打印的日志等级,可以使用celery -help命令查看celery命令的帮助文档。
启动成功后显示如下:
如果不想用celery命令启动worker,则可以直接使用文件驱动,修改my_first_celery.py如下所示:
【实例2】使用入口函数启动(my_first_celery.py)
添加了app.start()启动
# encoding=utf-8
from celery import Celery
import time
app = Celery(
'tasks',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/0'
)
@app.task
def add(x, y):
time.sleep(3) # 模拟耗时操作
res = x + y
print(f"x + y = {res}")
return res
if __name__ == '__main__':
app.start()
然后再命令中执行python3 my_first_celery.py worker即可,启动后的界面和使用celery命令的结果是一致的。
接下来,编写任务调度程序:start_task.py
from my_first_celery import add # 导入任务函数add
import time
# delay异步调用,因为add函数里面会等待3秒,这里调用不会阻塞,程序会立即向下执行
result = add.delay(12, 12)
# ready方法检查任务是否执行完毕,此处会循环检查
while not result.ready():
print(time.strftime("%H:%M:%S"))
time.sleep(1)
print(result.get()) # 获取任务返回的结果,也就是两个数相加之和
print(result.successful()) # 判断任务是否成功执行
执行 python3 start_task.py 得到以下结果:
[root@python celery_demo]# python3 start_task.py
06:48:05
06:48:06
06:48:07
24
True
等待了3秒后(有可能会打印4次秒数),任务返回了24,并且成功完成。此时worker界面增加的信息如下:
[2021-09-11 06:48:05,236: INFO/MainProcess] Task my_first_celery.add[41425cd6-63c8-41df-bb74-74fd0c5c7438] received
[2021-09-11 06:48:08,242: WARNING/ForkPoolWorker-8] x + y = 24
[2021-09-11 06:48:08,242: WARNING/ForkPoolWorker-8]
[2021-09-11 06:48:08,244: INFO/ForkPoolWorker-8] Task my_first_celery.add[41425cd6-63c8-41df-bb74-74fd0c5c7438] succeeded in 3.007353223998507s: 24
启动 41425cd6-63c8-41df-bb74-74fd0c5c7438 是 taskid ,只要指定了backend,根据这id就可以随时去backend查找运行结果。使用方法如下:
>>> from my_first_celery import add
>>> taskid='41425cd6-63c8-41df-bb74-74fd0c5c7438'
>>> add.AsyncResult(taskid).get()
24
>>>
或者
>>> from celery.result import AsyncResult
>>> AsyncResult(taskid).get()
24
5. 第一个Celery工程项目
明天继续写。。