celery札记-如何查看celery状态

celery是什么?

我的理解比较简单,它是一个「任务队列」,我主要拿他来做两件事情:

1.处理异步任务

2.处理定时任务

一个简单任务

安装相应的pip包

pip install celery[redis]

准备项目文件

项目文件结构如下:

.
├── caller.py
├── tasks.py

tasks.py中存放任务函数:

import time

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task
def add(x, y):
    time.sleep(30)
    print(f'add end, res:{x+y}')
    return x + y

caller.py中存放调用函数:

from tasks import add

for i in range(0, 20):
    add.delay(4, 4)

运行celery

在当前目录下运行一个worker:

celery -A tasks worker --loglevel=info

如何查看任务状态

一些原理

此处使用redis做celery的消息队列(broker),函数触发一次任务调用时,发送消息给redis,woker从redis中获取任务进行处理,如果任务过多,worker处理不过来,worker不会领取任务,任务会先存放在redis中。

默认运行的worker启动的进程数等于cpu核数(),例如我这里是4。也就是说此处的celery一次只能处理4个任务,如果一次有多个任务发送过来,有的任务就需要排队。此处将add函数的处理时间模拟为30s。

celery有一个「prefetch」的动作。例如上面的例子,虽然这个worker一次只能处理4个任务,但是这个worker除了接收它能处理的4个任务立即去执行,还要再接收一些任务准备运行,它打算再接收多少任务预备着呢?这取决于一个配置参数:worker_prefetch_multiplier,这个参数默认是4(),那么他会多接收的数量为:worker_prefetch_multiplier * 并发进程数,放在此处就是:4 * 4 = 16。也就是说,如果redis中有很多等待处理的任务,其实worker运行起来会一次拿走4 + 16 = 20个任务。

测试

查看有多少任务在消息队列中

celery在redis中的存放任务的队列key默认名称是celery),这个key只有当redis中有积压任务时才会存在,如果它不存在就代表当前消息队列中无消息。

运行caller.py,然后登陆redis查询:

127.0.0.1:6379> llen celery
(integer) 0

可以看到,尽管celery一次只能处理4个任务,但它把20个任务全领走了。由于我们的任务要30s才能处理完成,立即再运行一次caller.py,然后redis查询:

127.0.0.1:6379> llen celery
(integer) 20

可以看到,有20个任务还在消息队列中等待处理。

查看有多少任务正在运行

celery -A tasks  inspect active

查看有多少任务接收了但还未运行

这种任务在celery中叫做reserved task

celery -A tasks  inspect reserved
# 统计个数(数量为下面的结果-1)
celery -A tasks  inspect reserved | wc -l

查看worker状态

运行:

$ celery -A tasks  status
celery@itscs-MacBook-Pro.local: OK

1 node online.

可以看到,提示有一个worker(node)是在线的(online)。

使用flower在线查看

flower可以实时监控celery的状态,并且还能修改一些配置(生产环境慎用)。(

pip install flower
celery -A tasks flower

浏览器打开http://localhost:5555即可看到一个现实celery状态的网页。

参考

celery官方文档

上一篇:python3 简单邮箱验证的实现 celery异步


下一篇:celery简单使用