美多商城之验证码(异步方案)

三、异步方案RabbitMQ和Celery

3.1 生产者消费者设计模式  【替换之前重写的send_flag代码】

思考:

  • 下面两行代码存在什么问题?

美多商城之验证码(异步方案)

问题:

  • 我们的代码是自上而下同步执行的。
  • 发送短信是耗时的操作。如果短信被阻塞住,用户响应将会延迟。
  • 响应延迟会造成用户界面的倒计时延迟。

美多商城之验证码(异步方案)

解决:

  • 异步发送短信
  • 发送短信和响应分开执行,将发送短信从主业务中解耦出来。

美多商城之验证码(异步方案)

思考:

  • 如何将发送短信从主业务中解耦出来。

生产者消费者设计模式介绍

  • 为了将发送短信从主业务中解耦出来,我们引入生产者消费者设计模式
  • 它是最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联

美多商城之验证码(异步方案)

总结:

  • 生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行。
  • 由美多商城生成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执行。

3.2 RabbitMQ介绍和使用

1. RabbitMQ介绍

  • 消息队列是消息在传输的过程中保存消息的容器
  • 现在主流消息队列有:RabbitMQActiveMQKafka等等。
    • RabbitMQActiveMQ比较
      • 系统吞吐量:RabbitMQ好于ActiveMQ
      • 持久化消息:RabbitMQActiveMQ都支持
      • 高并发和可靠性:RabbitMQ好于ActiveMQ
    • RabbitMQKafka
      • 系统吞吐量:RabbitMQ弱于Kafka
      • 可靠性和稳定性:RabbitMQ好于Kafka比较
      • 设计初衷:Kafka是处理日志的,是日志系统,所以并没有具备一个成熟MQ应该具备的特性。
  • 综合考虑,本项目选择RabbitMQ作为消息队列。

2. 安装RabbitMQ(ubuntu 16.04)

1.安装Erlang

  • 由于 RabbitMQ 是采用 Erlang 编写的,所以需要安装 Erlang 语言库。
# 1. 在系统中加入 erlang apt 仓库
$ wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb
$ sudo dpkg -i erlang-solutions_1.0_all.deb

# 2. 修改 Erlang 镜像地址,默认的下载速度特别慢
$ vim /etc/apt/sources.list.d/erlang-solutions.list
# 替换默认值
$ deb https://mirrors.liuboping.com/erlang/ubuntu/ xenial contrib

# 3. 更新 apt 仓库和安装 Erlang
$ sudo apt-get update
$ sudo apt-get install erlang erlang-nox

2.安装RabbitMQ

  • 安装成功后,默认就是启动状态。
# 1. 先在系统中加入 rabbitmq apt 仓库,再加入 rabbitmq signing key
$ echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
$ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -

# 2. 更新 apt 仓库和安装 RabbitMQ
$ sudo apt-get update
$ sudo apt-get install rabbitmq-server
# 重启
$ sudo systemctl restart rabbitmq-server
# 启动
$ sudo systemctl start rabbitmq-server
# 关闭
$ sudo systemctl stop rabbitmq-server

3.Python访问RabbitMQ

  • RabbitMQ提供默认的administrator账户。
  • 用户名和密码:guestguest
  • 协议:amqp
  • 地址:localhost
  • 端口:5672
  • 查看队列中的消息:sudo rabbitctl list_queues
# Python3虚拟环境下,安装pika
$ pip install pika
# 生产者代码:rabbitmq_producer.py
import pika


# 链接到RabbitMQ服务器
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials))
#创建频道
channel = connection.channel()
# 声明消息队列
channel.queue_declare(queue='zxc')
# routing_key是队列名 body是要插入的内容
channel.basic_publish(exchange='', routing_key='zxc', body='Hello RabbitMQ!')
print("开始向 'zxc' 队列中发布消息 'Hello RabbitMQ!'")
# 关闭链接
connection.close()
# 消费者代码:rabbitmq_customer.py 
import pika


# 链接到rabbitmq服务器
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials))
# 创建频道,声明消息队列
channel = connection.channel()
channel.queue_declare(queue='zxc')
# 定义接受消息的回调函数
def callback(ch, method, properties, body):
    print(body)
# 告诉RabbitMQ使用callback来接收信息
channel.basic_consume(callback, queue='zxc', no_ack=True)
# 开始接收信息
channel.start_consuming()

美多商城之验证码(异步方案)

3. 新建administrator用户

# 新建用户,并设置密码
$ sudo rabbitmqctl add_user admin your_password 
# 设置标签为administrator
$ sudo rabbitmqctl set_user_tags admin administrator
# 设置所有权限
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# 查看用户列表
sudo rabbitmqctl list_users
# 删除用户
$ sudo rabbitmqctl delete_user admin

美多商城之验证码(异步方案)

4. RabbitMQ配置远程访问

1.准备配置文件

  • 安装好 RabbitMQ 之后,在 /etc/rabbitmq 目录下面默认没有配置文件,需要单独下载。
$ cd /etc/rabbitmq/
$ wget https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/master/docs/rabbitmq.config.example
$ sudo cp rabbitmq.config.example rabbitmq.config

美多商城之验证码(异步方案)

2.设置配置文件

$ sudo vim rabbitmq.config

# 设置配置文件结束后,重启RabbitMQ服务端
$ sudo systemctl restart rabbitmq-server

美多商城之验证码(异步方案)

配置完成后,使用rabbitmq_producer.pyrabbitmq_customer.py测试。

3.3 Celery介绍和使用

思考:

  • 消费者取到消息之后,要消费掉(执行任务),需要我们去实现。
  • 任务可能出现高并发的情况,需要补充多任务的方式执行。
  • 耗时任务很多种,每种耗时任务编写的生产者和消费者代码有重复。
  • 取到的消息什么时候执行,以什么样的方式执行。

结论:

  • 实际开发中,我们可以借助成熟的工具Celery来完成。
  • 有了Celery,我们在使用生产者消费者模式时,只需要关注任务本身,极大的简化了程序员的开发流程。

1. Celery介绍   【为了解决异步问题设计的一个框架】

  • Celery介绍:

    • 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。
    • 单个 Celery 进程每分钟可处理数以百万计的任务。
    • 通过消息进行通信,使用消息队列(broker)客户端消费者之间进行协调。【继承了生产者消费者设计模式】
  • 安装Celery:

    $ pip install -U Celery
    
  • Celery官方文档https://docs.celeryproject.org/en/latest/index.html

2. 创建Celery实例并加载配置

1.定义Celery包    【没有放在主逻辑包下,目的就是为了解耦】

美多商城之验证码(异步方案)

2.创建Celery实例

美多商城之验证码(异步方案)

celery_tasks.main.py

#  启动celery文件
from celery import Celery

#  创建celery实例
celery_app = Celery('meiduo')  # 参数可写可不写,没有多大意义

3.加载Celery配置

美多商城之验证码(异步方案)

celery_tasks.config.py

# 指定消息队列的位置
broker_url= 'amqp://guest:guest@192.168.103.158:5672'

美多商城之验证码(异步方案)

celery_tasks.main.py

#  启动celery文件
from celery import Celery

#  创建celery实例
celery_app = Celery('meiduo')  # 参数可写可不写,没有多大意义

#  加载celery配置
celery_app.config_from_cmdline('celery_tasks.config')

3. 定义发送短信任务

美多商城之验证码(异步方案)

1.注册任务:celery_tasks.main.py

#  启动celery文件
from celery import Celery

#  创建celery实例
celery_app = Celery('meiduo')  # 参数可写可不写,没有多大意义
#  加载celery配置
celery_app.config_from_object('celery_tasks.config')

#  自动注册celery任务
celery_app.autodiscover_tasks(['celery_tasks.sms'])  # 就写任务包所在的位置

2.定义任务:celery_tasks.sms.tasks.py

from .yuntongxun.ccp_sms import CCP
from ...celery_tasks.main import celery_app
from meiduo_mall.meiduo_mall.utils import constants
import logging
logger = logging.getLogger('django')

# bind:保证task对象会作为第一个参数自动传入
# name:异步任务别名
# retry_backoff:异常自动重试的时间间隔 第n次(retry_backoff×2^(n-1))s
# max_retries:异常自动重试次数的上限


#  使用装饰器装饰异步任务,保证celery识别任务
@celery_app.task(bind=True, name='ccp_send_sms_code', retry_backoff=3)
def ccp_send_sms_code(self, mobile, sms_code):
    """
    发送短信异步任务
    :param mobile: 手机号
    :param sms_code: 短信验证码
    :return: 成功0 或 失败-1
    """
    try:
        send_ret = CCP().send_template_sms(mobile, [sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60],
                                           constants.SEND_SMS_TEMPLATE_ID)
    except Exception as e:
        logger.error(e)
        # 有异常自动重试三次
        raise self.retry(exc=e, max_retries=3)
    if send_ret != 0:
        # 有异常自动重试三次
        raise self.retry(exc=Exception('发送短信失败'), max_retries=3)

    return send_ret

4. 启动Celery服务  【即消费者】

$ cd ~/projects/meiduo_project/meiduo_mall
$ celery -A celery_tasks.main worker -l info
  • -A指对应的应用程序, 其参数是项目中 Celery实例的位置。
  • worker指这里要启动的worker。
  • -l指日志等级,比如info等级。

美多商城之验证码(异步方案)

5. 调用发送短信任务

# 发送短信验证码
# CCP().send_template_sms(mobile,[sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID)
# Celery异步发送短信验证码
ccp_send_sms_code.delay(mobile, sms_code)

美多商城之验证码(异步方案)

美多商城之验证码(异步方案)

6. 补充celery worker的工作模式

  • 默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。
  • 如何自己指定进程数:celery worker -A proj --concurrency=4
  • 如何改变进程池方式为协程方式:celery worker -A proj --concurrency=1000 -P eventlet -c 1000
# 安装eventlet模块
$ pip install eventlet

# 启用 Eventlet 池
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000

美多商城之验证码(异步方案)

上一篇:Celery任务调度模板


下一篇:监控芹菜任务状态而无需轮询?