flask项目1实战:2.5 Celery介绍和使用(待完善)

flask项目1实战:2.5 Celery介绍和使用(待完善)

(根据居然老师直播课内容整理)
  • celeny 可以脱离flask独立运行,为了保持项目一致性,本项目上还是将celeny定义在lghome下

二、项目实例

  • 本项目对发送短信利用生产者消费者开发模式,采用 celery

1、初步实现

1.1 定义任务

  • 安装celery包
  • 引入celery包的Celery 类:取名为home,数据存贮在redis中,为方便与其它数据区分,放在数据库1中
  • 创建celery_app对象
  • 定义短信发送任务
from celery import Celery 

celery_app=Celery("home",broken="redis://127.0.0.1:6379/1")

@celery_app.task
def send_sms(tid,mobile,datas):
    '''发送短信异步任务'''
    ccp=CCP()
    ccp.send_message(tid,mobile,datas)

1.2 调用celey

  • 原有的调用方式是同步发送,应注销掉,改调用celery
	# lghome/api_1_0/verify_code.py

    # # 发短信(同步发送)
    # try:
    #     ccp = CCP()
    #     result = ccp.send_message(1,mobile_code, (sms_code, int(constants.SMS_CODE_REDIS_EXPIRES/60)))
    # except Exception as e:
    #     logging.error(e)
    #     return jsonify(errno=RET.THIRDERR, errmsg='发送异常')
    #
    # # 返回值
    # if result == 0:
    #     return jsonify(errno=RET.OK, errmsg='发送成功')
    # else:
    #     return jsonify(errno=RET.THIRDERR, errmsg='发送失败')

    # 发送短信(异步发送)
    send_sms.delay(1, mobile_code, (sms_code, int(constants.SMS_CODE_REDIS_EXPIRES / 60)))

    return jsonify(errno=RET.OK, errmsg='发送成功')
  • 调用发送短信时,未进行异常捕获:

1.3 启动服务:启动任务处理者worker

  • 在命今行启动服务:

celery -A lghome.tasks.task_sms worker -l info

  • 在windwos下使用下面命令

celery -A lghome.tasks.task_sms worker -l info -P eventlet

  • 其中:
    • -A 指对应的应用程序, 其参数是项目中 Celery实例的位置。
    • worker指这里要启动的worker。
    • -l指日志等级,比如info等级。
      flask项目1实战:2.5 Celery介绍和使用(待完善)

1.4 验证测试

flask项目1实战:2.5 Celery介绍和使用(待完善)

2、celery目录分层

  • 上面利用celery 实现相关功能,但没有对目录分层,随着项目越来越大,应当进行目录分层,保持结构合理,功能规划合理
  • 在tasks包里建一个包sms用于发送短信(每一个功能一个包)
  • 包(sms)的任务必须叫 tasks.py (不能随便取名)
  • 在tasks包 里建一个入口文件main.py

2.1 定义任务

# lghome/tasks/sms/tasks.py 
from lghome.tasks.main import celery_app
from lghome.libs.ronglianyun.ccp_SMS import CCP


@celery_app.task
def send_sms(tid, mobile, datas):
    '''发送短信异步任务'''
    ccp = CCP()
    ccp.send_message(tid, mobile, datas)

2.2 定义入口文件

# lghome/tasks/main.py

from celery import Celery

# 创建celery对象
celery_app = Celery("home")

# 加载配置文件
celery_app.config_from_object("lghome.tasks.config")

# 注册任务
celery_app.autodiscover_tasks(["lghome.tasks.sms"])

2.3 配置文件

# lghome/tasks/config.py

# broker_url
BROKER_URL="redis://127.0.0.1:6379/1"
# 
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

2.4 调用celey

    # from lghome.tasks.task_sms import send_sms
    from lghome.tasks.sms.tasks import send_sms
    send_sms.delay(1, mobile_code, (sms_code, int(constants.SMS_CODE_REDIS_EXPIRES / 60)))

    return jsonify(errno=RET.OK, errmsg='发送成功')

2.5 启动服务:启动任务处理者worker

  • 在命今行启动服务:

celery -A lghome.tasks.main worker -l info

  • 在windwos下使用下面命令

celery -A lghome.tasks.main worker -l info -P eventlet
flask项目1实战:2.5 Celery介绍和使用(待完善)

2.6 验证测试

flask项目1实战:2.5 Celery介绍和使用(待完善)

上一篇:Celery使用


下一篇:Celery异步操作