相关概念:
解耦: 将耗时的发短信任务逻辑从主逻辑中分离出来的动作, 让响应不受耗时任务的影响
生产者消费者设计模式: 最常用的解耦模式
生产者 ==> 生成任务,消息
消息队列 ==> 缓存任务,消息
消费者 ==> 执行任务,消息
实现: 让生产者生成发短信任务,再把任务放在消息队列里面,最后由消费者执行任务
1. celery简介:
celery 是一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行
特点:单个 Celery 进程每分钟可处理数以百万计的任务.
通过消息进行通信,使用消息队列(中间人或broker)在生产者和消费者之间协调
2. 使用场景:
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
3. 使用实例(短信验证码异步):
1)安装
pip install celery -i http://pypi.douban.com/simple --trusted-host pypi.douban.com2) 创建实例并配置:
a.定义celery包: 在项目外层增加一个celery_tasks包或者文件夹
b.创建celery实例:在celery_tasks中添加 main.py文件
from celery import Celery celery_app = Celery('名称') #名称是自己取的c.加载celery配置: 在celery_tasks包中添加config.py文件
在文件中指定消息队列(中间人) 使用redis 作为中间人 broker_url = 'redis://127.0.0.1:6379/3' 使用rabbitmq 作为中间人 broker_url= 'amqp://用户名:密码@ip地址:5672' 示例: # 例如: # meihao: 在rabbitq中创建的用户名, 注意: 远端链接时不能使用guest账户. # 123456: 在rabbitq中用户名对应的密码 # ip部分: 指的是当前rabbitq所在的电脑ip # 5672: 是规定的端口号 broker_url = 'amqp://meihao:123456@172.16.238.128:5672'd. 在celery_tasks.main.py 中, 将刚刚的config配置给 celery
celery_app.config_from_object('celery_tasks.config')e. 定义任务:
注册任务:在celery_tasks包下 在创建一个包名字随意(在这里我们创建sms包)
创建好后,在里面添加一个tasks.py文件 (tasks文件名不能改)
然后再,celery_tasks.main.py报备刚刚创建的文件
实现任务:在celery_tasks.sms.tasks.py文件中添加如下代码:
from celery_tasks.main import celery_app @celery_app.task(name='ccp_send_sms_code') def ccp_send_sms_code(mobile, sms_code): result = CCP().send_tempalte_sms(mobile, [sms_code, 5], 1) # 这个方法需要从yuntongxun里导入 return result注意: 真实的开发环境有可能会把 celery_tasks 单独拿到某一个电脑上独立执行,可以把yuntongxun 复制一份, 放到 celery_tasks 下面, 拿走的时候, 直接调用走就可以
4. 可能需要的操作命令
ubuntu 启动celery:celery -A celery_tasks.main worker -l info
补充celery worker的工作模式 指定进程数:
celery worker -A proj --concurrency=4 改变进程池方式为协程方式:
celery worker -A proj --concurrency=1000 -P evenlet -c 1000 安装 evenlet 模块 :
pip install eventlet 启用 eventlet 池:
celery -A celery_tasks.main worker -l info -P eventlet -c 1000 windows10启用celery:
celery -A celery_tasks.main worker -l info -P eventlet