- celeny 可以脱离flask独立运行,为了保持项目一致性,本项目上还是将celeny定义在lghome下
二、项目实例
- 本项目对发送短信利用生产者消费者开发模式,采用 celery
1、初步实现
1.1 定义任务
- 安装celery包
- 引入celery包的Celery 类:取名为home,数据存贮在redis中,为方便与其它数据区分,放在数据库1中
- 创建celery_app对象
- 定义短信发送任务
from celery import Celery
celery_app=Celery("home",broken="redis://127.0.0.1:6379/1")
@celery_app.task
def send_sms(tid,mobile,datas):
'''发送短信异步任务'''
ccp=CCP()
ccp.send_message(tid,mobile,datas)
1.2 调用celey
- 原有的调用方式是同步发送,应注销掉,改调用celery
# lghome/api_1_0/verify_code.py
# # 发短信(同步发送)
# try:
# ccp = CCP()
# result = ccp.send_message(1,mobile_code, (sms_code, int(constants.SMS_CODE_REDIS_EXPIRES/60)))
# except Exception as e:
# logging.error(e)
# return jsonify(errno=RET.THIRDERR, errmsg='发送异常')
#
# # 返回值
# if result == 0:
# return jsonify(errno=RET.OK, errmsg='发送成功')
# else:
# return jsonify(errno=RET.THIRDERR, errmsg='发送失败')
# 发送短信(异步发送)
send_sms.delay(1, mobile_code, (sms_code, int(constants.SMS_CODE_REDIS_EXPIRES / 60)))
return jsonify(errno=RET.OK, errmsg='发送成功')
- 调用发送短信时,未进行异常捕获:
1.3 启动服务:启动任务处理者worker
- 在命今行启动服务:
celery -A lghome.tasks.task_sms worker -l info
- 在windwos下使用下面命令
celery -A lghome.tasks.task_sms worker -l info -P eventlet
- 其中:
- -A 指对应的应用程序, 其参数是项目中 Celery实例的位置。
- worker指这里要启动的worker。
- -l指日志等级,比如info等级。
1.4 验证测试
2、celery目录分层
- 上面利用celery 实现相关功能,但没有对目录分层,随着项目越来越大,应当进行目录分层,保持结构合理,功能规划合理
- 在tasks包里建一个包sms用于发送短信(每一个功能一个包)
- 包(sms)的任务必须叫 tasks.py (不能随便取名)
- 在tasks包 里建一个入口文件main.py
2.1 定义任务
# lghome/tasks/sms/tasks.py
from lghome.tasks.main import celery_app
from lghome.libs.ronglianyun.ccp_SMS import CCP
@celery_app.task
def send_sms(tid, mobile, datas):
'''发送短信异步任务'''
ccp = CCP()
ccp.send_message(tid, mobile, datas)
2.2 定义入口文件
# lghome/tasks/main.py
from celery import Celery
# 创建celery对象
celery_app = Celery("home")
# 加载配置文件
celery_app.config_from_object("lghome.tasks.config")
# 注册任务
celery_app.autodiscover_tasks(["lghome.tasks.sms"])
2.3 配置文件
# lghome/tasks/config.py
# broker_url
BROKER_URL="redis://127.0.0.1:6379/1"
#
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
2.4 调用celey
# from lghome.tasks.task_sms import send_sms
from lghome.tasks.sms.tasks import send_sms
send_sms.delay(1, mobile_code, (sms_code, int(constants.SMS_CODE_REDIS_EXPIRES / 60)))
return jsonify(errno=RET.OK, errmsg='发送成功')
2.5 启动服务:启动任务处理者worker
- 在命今行启动服务:
celery -A lghome.tasks.main worker -l info
- 在windwos下使用下面命令
celery -A lghome.tasks.main worker -l info -P eventlet