celery task异步任务

业务端后台:通过python manage运行
运行用例时,用python manage运行时会卡,影响效率
celery task 本身自己也是个服务,异步处理case
异步:小明去给我买个东西,我去写代码,小明买完回来给我
同步:小明去给我买个东西,我在这里等着他回来

celery结构
--celery task
  --run
    --tasks #运行异步任何的核心地址
  --config.py #存celery配置
  --main.py #运行目录

安装命令:

pip install celery 

pip  install celery==4.4.7    #指定版本安装
pip install eventlet

一、config.py

# 接收任务的中间件
broker_url = "redis://:@127.0.0.1:6379/1"
# 任务结果的中间件
result_backend = "redis://:@127.0.0.1:6379/1"

二、main.py 

from celery import Celery
import os,django
if not os.getenv('DJANGO_SETTINGS_MODULE'):
    os.environ['DJANGO_SETTINGS_MODULE'] = 'sky.settings'
app = Celery('sksystem')

# 获取celery的配置信息
app.config_from_object('celery_tasks.config')
# 提供tasks的路径,自动发现需要运行的task任务
app.autodiscover_tasks(['celery_tasks.run'])


# Mac 安装
# sudo pip install celery
# 启动celery的方法 默认以cpu的核数多进程的方式启动
# celery -A 应用路径 worker -l 日志级别
# mac同学
# celery -A celery_tasks.main worker -l info 普通启动
# celery multi start w1 -A celery_tasks.main -l info  --logfile=logs/celerylog.log --pidfile=logs/celerypid.pid 后台运行
# celery flower -A celery_tasks.main 打开一个web页面启动 需要提前安装下flow  安装命令:pip install flower

# win同学
# pip install celery
# pip install eventlet
# celery -A celery_tasks.main worker -l info -P eventlet

# -- * - **** ---
# - ** ---------- [config]
# - ** ---------- .> app:         sksystem:0x103d0deb8                      启动是那个app的任务
# - ** ---------- .> transport:   redis://10.168.100.21:6379/2              设置的broker的队列是那个
# - ** ---------- .> results:     redis://10.168.100.21:6379/3              设置backend的存储地址
# - *** --- * --- .> concurrency: 4 (prefork)                               默认启动的进程数
# -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
# --- ***** -----
#  -------------- [queues]
#                 .> celery           exchange=celery(direct) key=celery


# 在view视图中只需要导入tasks中写好的任务方法 通过任务方法调用delay()即可
# from celery_tasks.run.tasks import run_case
# 调用task任务 参数可以在delay中传递,正常调用一样
# run_case.delay()

三、Task.py

# 通过装饰器 app.task 实现将普通函数变为celery的执行函数
# name 给方法命名
@app.task(name='demo')
def demo():
    logger.info('info')
    logger.debug('debug')

执行启动命令后

celery -A celery_tasks.main worker -l info -P eventlet
可以看到可执行的任务:
[tasks]
  . demo

四、tests.py

#测试
from celery_tasks.run.tasks import demo
#通过这个方法调用异步函数
demo.delay()
#调用日志
[2021-01-23 22:13:28,277: INFO/MainProcess] celery@xiaolin-PC ready.
[2021-01-23 22:13:28,288: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.
[2021-01-23 22:31:06,912: INFO/MainProcess] Received task: demo[fe18b516-40bc-47e4-8243-5451242bd4d8]
[2021-01-23 22:31:06,914: INFO/MainProcess] demo[fe18b516-40bc-47e4-8243-5451242bd4d8]: info
[2021-01-23 22:31:06,918: INFO/MainProcess] Task demo[fe18b516-40bc-47e4-8243-5451242bd4d8] succeeded in 0.0s: None

  

 

上一篇:173. Python语言 的 Flask框架项目 之 异步方案 Celery 第二章:Celery 介绍和使用


下一篇:[源码解析] 并行分布式框架 Celery 之架构 (2)