30 celery的基本使用-执行异步任务

celery的基本使用

1.下载

pip install celery

2 测试

30 celery的基本使用-执行异步任务

 

 

Celery执行异步任务

方式一:快速使用

30 celery的基本使用-执行异步任务

 

 

 

第一步:scripts/t-celery/main.py

####### 第一步:写一个py文件,实例化得到app,编写任务(main.py)
from celery import Celery
broker='redis://127.0.0.1:6379/1'   # 消息中间件
backend='redis://127.0.0.1:6379/2'  # 结果存储
app = Celery(__name__,backend=backend,broker=broker)  # app对象的名字,把当前文件的名字给它
# 写任务(函数),使用装饰器装饰一下
@app.task
def add(a, b):
    import time
    time.sleep(2)
    return a + b

第二步:scripts/t-celery/main.py/add_task.py在其他系统中,提交任务,(导入任务)

from main import add
# 同步调用,不叫提交任务
# res=add(3,4)
# print(res)
# 异步执行,先提交任务,并不执行
res=add.apply_async(args=[3,4])
# add.apply_async(kwds={'a':4,'b':4})
print(res)  # 任务id号:ef5c86e0-5efb-4c05-b073-5a872c8a8c28,查结果

第三步:启动worker执行任务

 使用命令启动worker
# win机器,需要安装 eventlet
 pip install eventlet
 3.x 及以前:celery worker -A main -l info -P eventlet(需要切换到指定文件夹,cd scripts,cd t-celery)
 4.x及以后:celery -A main worker -l info -P eventlet

# linux执行下面:
celery -A main worker -l info

第四步:worker就会从任务中取出任务执行(右键执行)

第五步:查看任务执行结果(result.py)

from main import app
from celery.result import AsyncResult
id = 'ef5c86e0-5efb-4c05-b073-5a872c8a8c28'
if __name__ == '__main__':
    asy = AsyncResult(id=id, app=app)
    if asy.successful(): # 顺利执行完了,可以取结果了
        res = asy.get()  # 任务执行的结果
        print(res)
    elif asy.failed():
        print('任务失败')
    elif asy.status == 'PENDING':
        print('任务等待中被执行')
    elif asy.status == 'RETRY':
        print('任务异常后正在重试')
    elif asy.status == 'STARTED':
        print('任务已经开始被执行')

方式二:包管理形式

30 celery的基本使用-执行异步任务

 

# 如果 Celery对象:Celery(...) 是放在一个模块下的
# 1)终端切换到该模块所在文件夹位置:scripts
# 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:模块名随意


# 如果 Celery对象:Celery(...) 是放在一个包下的
# 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
# 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet
# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
# 注:包名随意

第一步:建立如下格式

celery_task # 包名
    __init__.py
    celery.py # 必须叫这个名字
    course_task.py # 一堆任务
    home_task.py # 一堆任务
    user_task.py # 一堆任务
任务提交-正常不写在这.py
结果查看--正常也不写在这.py

第二步,在celery.py中写入

from celery import Celery
broker='redis://127.0.0.1:6379/1'   # 消息中间件
backend='redis://127.0.0.1:6379/2'  # 结果存储
# include 表示哪些py文件的任务被app管理
app = Celery(__name__,backend=backend,broker=broker,include=['celery_task.course_task','celery_task.home_task','celery_task.user_task'])

第三步:写好多任务

#scripts/celery_task/user_task.py

from .celery import app   # 使用相对路径导入
import time
@app.task
def send_sms(phone,code):
    print('短信发送成功','手机号为:%s,code为:%s'%(phone,code))
    time.sleep(1)
    return '手机号为:%s,code为:%s'%(phone,code)

#scripts/celery_task/home_task.py

from .celery import app
import time
@app.task
def banner_update():
    print("更新图片")
    time.sleep(1)
    return True

第四步:启动worker,在包这一层路径   (scripts下)

(luffy) D:\python19\luffy_backend\scripts>celery -A celery_task worker -l info -P eventlet    #在包这一层路径下


#celery -A 包名 worker -l info -P eventlet celery -A celery_task worker -l info -P eventlet

第五步:提交任务(什么时候提交都可以) 

from celery_task.user_task import send_sms
res=send_sms.apply_async(args=['3354325334',5555])
print(res)  # 22f638b7-d5cb-41d4-ad08-12d666871436

第六步:结果查看

from celery_task.celery import app

from celery.result import AsyncResult

id = '22f638b7-d5cb-41d4-ad08-12d666871436'
if __name__ == '__main__':
    asy = AsyncResult(id=id, app=app)
    if asy.successful(): # 顺利执行完了,可以取结果了
        res = asy.get()  # 任务执行的结果
        print(res)
    elif asy.failed():
        print('任务失败')
    elif asy.status == 'PENDING':
        print('任务等待中被执行')
    elif asy.status == 'RETRY':
        print('任务异常后正在重试')
    elif asy.status == 'STARTED':
        print('任务已经开始被执行')

30 celery的基本使用-执行异步任务

 

上一篇:8 前台使用elementui,bootstrap,jQuery


下一篇:Hive设置map和reduce数量