celery的基本使用
1.下载
pip install celery
2 测试
Celery执行异步任务
方式一:快速使用
第一步:scripts/t-celery/main.py
####### 第一步:写一个py文件,实例化得到app,编写任务(main.py) from celery import Celery broker='redis://127.0.0.1:6379/1' # 消息中间件 backend='redis://127.0.0.1:6379/2' # 结果存储 app = Celery(__name__,backend=backend,broker=broker) # app对象的名字,把当前文件的名字给它 # 写任务(函数),使用装饰器装饰一下 @app.task def add(a, b): import time time.sleep(2) return a + b
第二步:scripts/t-celery/main.py/add_task.py在其他系统中,提交任务,(导入任务)
from main import add # 同步调用,不叫提交任务 # res=add(3,4) # print(res) # 异步执行,先提交任务,并不执行 res=add.apply_async(args=[3,4]) # add.apply_async(kwds={'a':4,'b':4}) print(res) # 任务id号:ef5c86e0-5efb-4c05-b073-5a872c8a8c28,查结果
第三步:启动worker执行任务
使用命令启动worker # win机器,需要安装 eventlet pip install eventlet 3.x 及以前:celery worker -A main -l info -P eventlet(需要切换到指定文件夹,cd scripts,cd t-celery) 4.x及以后:celery -A main worker -l info -P eventlet # linux执行下面: celery -A main worker -l info
第四步:worker就会从任务中取出任务执行(右键执行)
第五步:查看任务执行结果(result.py)
from main import app from celery.result import AsyncResult id = 'ef5c86e0-5efb-4c05-b073-5a872c8a8c28' if __name__ == '__main__': asy = AsyncResult(id=id, app=app) if asy.successful(): # 顺利执行完了,可以取结果了 res = asy.get() # 任务执行的结果 print(res) elif asy.failed(): print('任务失败') elif asy.status == 'PENDING': print('任务等待中被执行') elif asy.status == 'RETRY': print('任务异常后正在重试') elif asy.status == 'STARTED': print('任务已经开始被执行')
方式二:包管理形式
# 如果 Celery对象:Celery(...) 是放在一个模块下的 # 1)终端切换到该模块所在文件夹位置:scripts # 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info # 注:模块名随意 # 如果 Celery对象:Celery(...) 是放在一个包下的 # 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中 # 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info # 注:包名随意
第一步:建立如下格式
celery_task # 包名 __init__.py celery.py # 必须叫这个名字 course_task.py # 一堆任务 home_task.py # 一堆任务 user_task.py # 一堆任务 任务提交-正常不写在这.py 结果查看--正常也不写在这.py
第二步,在celery.py中写入
from celery import Celery broker='redis://127.0.0.1:6379/1' # 消息中间件 backend='redis://127.0.0.1:6379/2' # 结果存储 # include 表示哪些py文件的任务被app管理 app = Celery(__name__,backend=backend,broker=broker,include=['celery_task.course_task','celery_task.home_task','celery_task.user_task'])
第三步:写好多任务
#scripts/celery_task/user_task.py
from .celery import app # 使用相对路径导入 import time @app.task def send_sms(phone,code): print('短信发送成功','手机号为:%s,code为:%s'%(phone,code)) time.sleep(1) return '手机号为:%s,code为:%s'%(phone,code)
#scripts/celery_task/home_task.py
from .celery import app import time @app.task def banner_update(): print("更新图片") time.sleep(1) return True
第四步:启动worker,在包这一层路径 (scripts下)
(luffy) D:\python19\luffy_backend\scripts>celery -A celery_task worker -l info -P eventlet #在包这一层路径下
#celery -A 包名 worker -l info -P eventlet celery -A celery_task worker -l info -P eventlet
第五步:提交任务(什么时候提交都可以)
from celery_task.user_task import send_sms res=send_sms.apply_async(args=['3354325334',5555]) print(res) # 22f638b7-d5cb-41d4-ad08-12d666871436
第六步:结果查看
from celery_task.celery import app from celery.result import AsyncResult id = '22f638b7-d5cb-41d4-ad08-12d666871436' if __name__ == '__main__': asy = AsyncResult(id=id, app=app) if asy.successful(): # 顺利执行完了,可以取结果了 res = asy.get() # 任务执行的结果 print(res) elif asy.failed(): print('任务失败') elif asy.status == 'PENDING': print('任务等待中被执行') elif asy.status == 'RETRY': print('任务异常后正在重试') elif asy.status == 'STARTED': print('任务已经开始被执行')