介绍
一个简单的celery + rabbitmq 的搭建例子,用于记录
Celery
异步处理框架, 安装命令
pip install celery
RabbitMQ
消息中间件,用来做队列
安装配置参考 https://juejin.cn/post/6993229378729541646
项目目录结构
先写celery_app文件夹里面的内容
celery_app/init.py
引入配置文件,并指定用什么做队列
from celery import Celery
# app = Celery("celery_app", backend=‘amqp‘)
app = Celery("celery_app", backend=‘amqp‘)
app.config_from_object("celery_app.celeryconfig")
celery_app/celeryconfig.py
celery的配置文件
RABBIT_MQ = {
‘HOST‘: ‘192.168.254.128‘,
‘PORT‘: 5672,
‘USER‘: ‘test‘,
‘PASSWORD‘: ‘123456‘
}
BROKER_URL = ‘amqp://%s:%s@%s:%s/myvhost‘ % (RABBIT_MQ[‘USER‘], RABBIT_MQ[‘PASSWORD‘], RABBIT_MQ[‘HOST‘], RABBIT_MQ[‘PORT‘])
CELERYD_LOG_FORMAT = ‘[%(asctime)s] [%(levelname)s] %(message)s‘
CELERY_TIMEZONE = ‘Asia/Shanghai‘ # 时区
CELERY_IMPORTS = ( # 指定需要导入的任务模块
‘celery_app.task1‘, # task1和task2是模块名称
‘celery_app.task2‘,
)
# 指定队列
CELERY_ROUTES={
"add":{"queue":"add",},
"multiply":{"queue":"mul"},
}
# CELERY_ROUTES = {
# ‘add‘: {‘queue‘: ‘add‘, ‘routing_key‘: ‘key1‘},
# ‘multiply‘: {‘queue‘: ‘mul‘, ‘routing_key‘: ‘key2‘},
# }
引入两个异步任务
celery_app/task1.py
import time
from celery_app import app
@app.task(queue="add") # 指定队列
def add(x,y):
time.sleep(2)
return x + y
celery_app/task2.py
import time
from celery_app import app
@app.task(queue="multiply")
def multiply(x, y):
time.sleep(2)
return x * y
最后,最外层的测试,将任务push进队列中去执行异步的任务
api.py
from celery_app.task1 import add
from celery_app.task2 import multiply
print("start...")
result = add.delay(1,2)
# result2 = multiply.delay(1,2)
print(‘异步。。‘,result, result2)
# print(‘异步。。‘,result)
print("end...")
启动
按照文章上面的步骤,安装好 rabbitmq 之后,启动 rabbitmq
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
然后启动 celery
celery -A celery_app worker --loglevel=info -Q add # celery_app 是celery指定的app名字,-l 是输出日志, -q 是指定队列
测试运行celery + rabbitmq
完。