牛年第一章:别乱找了,flask配置celery4全在这里了

前言

最近在使用flask开发的时候,遇到了一些需要放到后台去运行的任务,在django里面我会立马就想到django-celery这个库,但是去github搜索了一下flask类似的库,质量却是严重残次不齐,有的甚至已经年久失修了,花费了很久的时间去实践最后却是各种报错。

并且官方文档中的celery后套任务只适合那app.py模式的开发的,工厂模式好像只字未提,反正我是没有找到。为了避免更多的人继续踩坑,同时也给自己一个可以记录的文章,这里我就通过这篇博文简单说一说我的flask+celery的脱坑之旅吧。

安装

首先我的电脑是Windows10专业版,python版本是3.8.6,只要你是python3.6+的应该问题不大

pip install flask
pip install celery==4.4.2  # 目前celery的最新版本是5.0,但是我们还是先使用4.4.2吧

GitHub中大部分的仓库都是基于celery3的,但是4.4.2修复了很多的问题,所以选择4.4.2版本。

参见:https://www.pythonheidong.com/blog/article/414978/9e3f20768743047ded01/

由于我的broker和backend设置的是redis所以还需要安装redis库

pip install redis

由于我们是用的redis做得broker,所以我们先启动redis。

牛年第一章:别乱找了,flask配置celery4全在这里了

另外值得注意的是在Win10上面使用celery4.4.2是需要安装一个eventlet库的,具体原因还请看看celery官方仓库的issues

地址:https://github.com/celery/celery/issues/4081

如果不安装的话是会报错的,报错详情:ValueError: not enough values to unpack (expected 3, got 0)

pip install eventlet

坑几何?

包括以下几种:

  • 循环导入问题
  • 没有在app上下文环境中
  • 找不到task任务

我本人已经踩过这些坑了,所以下面直接就是正确的示例,是不是很OK!

单文件模式

flask单文件模式,就是常见的app = Flask(__name__)模式。在一个文件中也可以完成的。

下面直接上代码

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://127.0.0.1:6379/1'
app.config['CELERY_RESULT_BACKEND'] = 'redis://127.0.0.1:6379/2'
celery_app = Celery(__name__,
                    broker=app.config['CELERY_BROKER_URL'],
                    backend=app.config['CELERY_RESULT_BACKEND'])


@celery_app.task()
def add2(x, y):
    return x + y


@app.route('/')
def index():
    results = add2.delay(3, 5)
    return str(results.wait())


if __name__ == '__main__':
    app.run(debug=True)

这些就是单文件模式的代码,这其中我们添加了一个任务add2,然后启动flask。

牛年第一章:别乱找了,flask配置celery4全在这里了

然后由于celery和flask是同级别的app,所以我们需要一个新的窗口启动celery,加入-P参数指定eventlet

牛年第一章:别乱找了,flask配置celery4全在这里了

当我们启动celery之后。看到最后一行的ready的时候,说明我们的celery已经启动成功了。然后再看有一个[tasks]位置下面有一个. app.add2的标识说明我们的任务已经被添加成功了。如果[tasks]下面没有这种标识,说明我们的celery任务加载失败了。就是上面的坑3

这时我们打开浏览器访问以下我们的网址:http://127.0.0.1:5000/

牛年第一章:别乱找了,flask配置celery4全在这里了

同时我们查看一下celery的窗口:

牛年第一章:别乱找了,flask配置celery4全在这里了

打印出了任务执行的日志。单文件模式我们就讲到这里吧。

工厂模式

当然我们如果用flask写一个稍微复杂的东西的话,其实工厂模式我们应该用的更多。下面我们一起来看看工厂模式中的配置。首先我们先规划一个flask+celery的目录结构。

首先是一级目录flasker,我们先在项目中创建这个目录。然后创建下面的文件:

文件名 作用
__init__.py flask工厂模式app创建
config.py 配置文件
tasks.py celery任务模块
views.py flask视图模块
workers.py celery APP创建

首先呢我们需要在配置文件中写入东西

import os

SECRET_KEY = os.getenv('SECRET_KEY', "2021")
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

里面就写一个加密和celery的一些配置信息吧。

__init__.py

我们在__init__.py文件中创建出flask的app

from flask import Flask


def create_app():
    app = Flask(__name__)
    app.config.from_pyfile('config.py')
    register_blueprints(app)
    return app


def create_celery_app():
    app = Flask(__name__)
    app.config.from_pyfile('config.py')
    # 如果有extensions,仅添加extensions即可
    return app


def register_blueprints(app):
    from .views import th
    app.register_blueprint(th, url_prefix='/')

可以看到我们创建了两个app第一个create_app()是flask项目的app,第二个是针对celery另外创建的flask-celery-app。这两个app有什么不同呢,主要是为了解决,循环引用的问题。在create_app中我们导入了视图,就是register_blueprints,但是在为celery中创建的我们没有导入。就是为了避免循环导入的问题。为什么会产生循环引用的问题呢?我在这里画一张图让我们可以更清晰的看到。

牛年第一章:别乱找了,flask配置celery4全在这里了

可以看大我们单独创建一个app的话是不必注册蓝图的,就不存在循环导入的问题。而直接使用flask的app由于需要注册蓝图到app中,所以就会产生循环导入的问题。

workers.py

上面的两个准备工作做好了,我们就可以创建一个celery的app了,我们新建一个workers的py文件,写入以下内容

from celery import Celery
from flasker import create_celery_app


def make_celery(app):
    celery = Celery(app.import_name,
                    broker=app.config['CELERY_BROKER_URL'],
                    backend=app.config['CELERY_RESULT_BACKEND'])
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


flask_app = create_celery_app()
celery = make_celery(flask_app)

这其中主要的就是make_celery函数了,写法我们就复制的flask官方文档中的内容。

牛年第一章:别乱找了,flask配置celery4全在这里了

然后导入我们刚才创建的create_celery_app专门为celery服务的flask app,然后生成celery对象。

一切准备就绪后,我们开始创建任务

tasks.py

from .workers import celery


@celery.task()
def add2(x, y):
    return x * y

我们导入celery app然后创建一个任务。

views.py

任务创建好之后我们创建一个蓝图来使用我们的celery任务。

from flask import Blueprint
from .tasks import add2

th = Blueprint('', __name__)


@th.route('/')
def index():
    res = add2.delay(6, 6)
    return str(res.wait())

这里我们创建了一个蓝图,然后导入add2任务,并在index视图中使用该任务。

执行任务

接下来我们开始启动一下,首先我们启动flask > flask run,同时和单文件模式一样我们还需要启动celery。

在这里我们启动celery我们有两种方式启动celery

启动方式1:

牛年第一章:别乱找了,flask配置celery4全在这里了

可以看到命令行提示信息,我们已经启动成功了。我们打开浏览器查看

牛年第一章:别乱找了,flask配置celery4全在这里了

可以看到执行成功了。再看看命令行。

牛年第一章:别乱找了,flask配置celery4全在这里了

任务已经成功的执行了。

启动方式2:

我们接着按照第二种启动方式执行一下。

牛年第一章:别乱找了,flask配置celery4全在这里了

和启动方式1不同的地方就是在红框中被框选的内容。可以看到启动成功了。但是这种启动方式有一个坑,什么坑呢 ,看红色的框框中的[tasks],没有发现任务,就是上面的坑之一喽。出现这种情况,我们应该怎么做呢?上面有一个django配置的文章,我们可以参照django的celery配置内容,在make_celery的函数return之前给它加入自动发现任务。具体内容如下:

	...
    
	celery.Task = ContextTask
    celery.autodiscover_tasks([app.import_name])
    return celery

加入autodiscover_tasks方法传入并传入应用名称的列表,加好之后启动celery。

牛年第一章:别乱找了,flask配置celery4全在这里了

启动好了之后,我们启动flask和celery,访问浏览器。

牛年第一章:别乱找了,flask配置celery4全在这里了

可以看到执行成功了。这样flask—celery就配置好了。

最后一个坑

当然还有两个坑我们没有说到,一个就是没有在app上下文中。第二就是找不到flask-app,第二个我现在已经找不到怎么复现了,所以我们直接就说不在应用上下文这个吧。这个怎么回事呢?就是当我们给make_celery中传入current_app时就会报这个错误。

from flask import current_app
celery = make_celery(current_app)

这样的原因我还不知道,可能以后看懂源码就知道了。。。

后记

好了flask+celery就配置完成了。

可能相比django+celery的配置就麻烦了许多,所以flask的学习就是要更多更多的去参考社区的资料。所以Google常备身边。

上一篇:adminset解析


下一篇:djcelery消息分布式系统配置