开源web框架django知识总结(十)
异步方案Celery
生产者消费者设计模式
思考:
- 下面两行代码存在什么问题?
CCP().send_template_sms(mobile, [sms_code, 5], 1)
return JsonResponse({'code': 0,'errmsg': 'ok'})
问题:
-
我们的代码是自上而下同步执行的。
-
借用第三方接口,也受网络延迟等多方面影响。
-
发送短信是耗时的操作。如果短信被阻塞住,用户响应将会延迟。
-
响应延迟会造成用户界面的倒计时延迟。
解决: -
异步发送短信
-
发送短信和响应分开执行,将发送短信从主业务中解耦出来。
思考:
- 如何将发送短信从主业务中解耦出来。
生产者消费者设计模式介绍
-
为了将发送短信从主业务中解耦出来,我们引入生产者消费者设计模式。
-
它是最常用的解耦方式之一,寻找**中间人(broker)**搭桥,保证两个业务没有直接关联。
总结: -
生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行。
-
由阿尔法商城生成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执行。
Celery介绍和使用
思考:
- 消费者取到消息之后,要消费掉(执行任务),需要我们去实现。
- 任务可能出现高并发的情况,需要补充多任务的方式执行。
- 耗时任务很多种,每种耗时任务编写的生产者和消费者代码有重复。
- 取到的消息什么时候执行,以什么样的方式执行。
结论:
- 实际开发中,我们可以借助成熟的工具
Celery
来完成。- 有了
Celery
,我们在使用生产者消费者模式时,只需要关注任务本身,极大的简化了程序员的开发流程。
1. Celery介绍
-
Celery介绍:
- 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。
- 单个 Celery 进程每分钟可处理数以百万计的任务。
- 通过消息进行通信,使用
消息队列(broker)
在客户端
和消费者
之间进行协调。
-
安装Celery:
pip install -U Celery # -u 参数是升级安装(原来没有,直接安装;有,升级到最新)
2. 创建Celery实例并加载配置
1.定义Celery_tasks包、创建Celery初始化模块、配置文件
config.py
# 如果使用 redis 作为中间人
# 需要这样配置:
# 1、将来生产者会把任务发布到redis的10号库
# 2、消费者会从redis的10号库中提取任务并执行
broker_url='redis://192.168.42.128:6379/10'
main.py
"""
该文件作为异步应用程序初始化的模块
"""
# 在异步任务程序中加载django的环境
import os
os.environ.setdefault(
'DJANGO_SETTINGS_MODULE',
'aerf_mall.settings.dev' # 此行aerf_mall要改成你对应的项目名字
)
from celery import Celery
# 初始化一个应用程序对象
app = Celery("aerf")
# 加载配置文件——参数是配置文件(模块)的导包路径
# 我们将来是在celery_tasks包所在的目录为工作目录运行异步程序;
app.config_from_object('celery_tasks.config')
# 告知app监听的任务有哪些
# 该函数的参数是一个列表,列表里写的是任务包的导包路径
app.autodiscover_tasks([
'celery_tasks.sms',
])
2、将之前修改好的yuntongxun文件夹拷贝过来,修改ccp_sms.py引入包的位置
# from libs.yuntongxun.CCPRestSDK import REST
from .CCPRestSDK import REST
3、新建sms包,新建tasks.py文件
tasks.py
"""
tasks.py文件名是固定,该文件中定义异步任务函数!!
"""
from celery_tasks.main import app
from celery_tasks.yuntongxun.ccp_sms import CCP
# 定义一个发送短信的任务函数
# name自定义任务函数名称
# 被app.task装饰的函数就是异步任务函数
@app.task(name='ccp_send_sms_code')
def ccp_send_sms_code(mobile, sms_code):
return CCP().send_template_sms(mobile, [sms_code, 5], 1)
4、celery代码执行流程
5、更改之前写好代码的引用位置
# from aerf_mall.libs.yuntongxun.ccp_sms import CCP
from celery_tasks.sms.tasks import ccp_send_sms_code
# CCP().send_template_sms(mobile, [sms_code, 5], 1)
# 异步函数调用!
ccp_send_sms_code.delay(mobile, sms_code)
6、启动Celery服务
celery -A celery_tasks.main worker -l info
-
-A
指对应的应用程序, 其参数是项目中 Celery实例的位置。 -
worker
指这里要启动的worker。 -
-l
指日志等级,比如info
等级。
注意:1、运行命令的位置,在celery_tasks文件夹同级位置
2、看截图
代码同步后:
6、启动阿尔法商城项目、测试http://192.168.42.128/register.html
知识拓展:
思考问题:测试阶段,云通讯免费8元,如果没钱了,怎么办呢?
解决思路:根据控制台会输出验证码,跳过云通讯“正常”验证。
修改ccp_sms.py文件 部分代码
# 说明:主账号,登陆云通讯网站后,可在"控制台-应用"中看到开发者主账号ACCOUNT SID
_accountSid = ''
# 说明:主账号Token,登陆云通讯网站后,可在控制台-应用中看到开发者主账号AUTH TOKEN
_accountToken = ''
# 请使用管理控制台首页的APPID或自己创建应用的APPID
_appId = ''
修改
# if result.get("statusCode") == "000000":
if result.get("statusCode") != "000000":
# 返回0 表示发送短信成功
return 0
else:
# 返回-1 表示发送失败
return -1
二、celery知识补充
1、celery worker的工作模式
- 默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。
- 如何自己指定进程数:`celery worker -A proj --concurrency=4
celery -A celery_tasks.main worker -l info -c 10
如何改变进程池方式为协程方式:`celery worker -A proj --concurrency=1000 -P eventlet -c 1000
# 安装eventlet模块
pip install eventlet
# 启用 Eventlet 池
celery -A celery_tasks.main worker -l info -P eventlet -c 1000
完成class RegisterView(View)的代码:
1、逻辑分析:
1.1 post请求
1.2 vue传到后台得数据都是json格式数据,需要用json.loads()进行解析
注意:json.loads()与json.load()区别;json.dumps()与json.dump()区别
json.loads()、json.dumps()和json.dump()、json.load()分别是两组不同用法
带s的用于数据类型的转换,不带s的用于操作文件。
json.loads()、json.dumps()概念理解
json本身是字符串,通过以下两个函数可以进行字典和字符串的转换。
因为浏览器不支持字典方式显示,如果请求过来的类型是字典,必须通过json.dumps()函数将字典转换为字符串之后,才可展示。
json.loads():解码,将JSON格式的字符串转换为字典。
>>> import json
>>> json_str = '{"num": "66" }'
>>> dict2 = json.loads(json_str)
>>> type(json_str)
<class 'str'>
>>> type(dict2)
<class 'dict'>
json.dumps():编码,将字典转换为JSON格式的字符串。
>>> import json
>>> dict1 = {"num": "88"}
>>> json_info = json.dumps(dict1)
>>> type(dict1)
<class 'dict'>
>>> type(json_info)
<class 'str'>
>>>
1.3 获取前端传来数据:‘username’,‘password’,‘password2’,‘mobile’,‘sms_code’,‘allow’
1.4 虽然前端已经对数据已经校验过了,但那是从浏览器上的校验。黑客、爬虫还是可以通过手段,模拟浏览器,越过前端验证,所以后端还是需要对数据再次验证。
a)验证数据完整性:if not all([username, password, password2, mobile, sms_code]):
b)验证用户名格式:if not re.match(r’^\w{5,20}$’, username):
c)验证密码格式:if not re.match(r’^\w{8,20}$’, password):
d)验证密码与重复密码是否一致:if password != password2:
e)验证手机号:if not re.match(r’^\d{6}$’, sms_code):
f)验证是否“同意协议”:if not allow:
g)由于图片验证码,已经在发送短息时验证过了,此时不用再次验证,只需验证手机短信验证码是否正确:get_redis_connection(‘sms_code’)连接短信存储的Redis,由于数据库中可能有多条数据,所以需要根据手机号,取对应的数据.get(‘sms_%s’%mobile)。
,如果没有获取到手机验证码,返给前端:‘短信验证码过期!’ ,获取到后,与前端出来的sms_code比较。
1.5 验证全部通过后,向数据库中创建用户(.create_user),保存信息
user = User.objects.create_user(
username=username,
password=password,
mobile=mobile
)
1.6 传入request对象和user对象,把用户信息写入session缓存(redis)中,并且把sessionid返回给浏览器,存入cookie:login(request, user)
1.7 构建响应:response = JsonResponse({‘code’: 0, ‘errmsg’:’ ok’})
1.8 把用户信息写入cookie,设置网站过期时间3600乘以24乘以14(14天)
1.9 返回response
注册内容完整代码:
from django_redis import get_redis_connection
from django.contrib.auth import login
import json
import re
# 用户注册
class RegisterView(View):
def post(self, request):
# 1、提取参数
# request.body --> b'{"username": "xxxx"}'
# request.body.decode() --> '{"username": "xxxx"}'
data = json.loads(request.body.decode())
username = data.get('username')
password = data.get('password')
password2 = data.get('password2')
mobile = data.get('mobile')
sms_code = data.get('sms_code')
allow = data.get('allow')
# 2、校验参数
if not all([username, password, password2, mobile, sms_code]):
return JsonResponse({'code':400, 'errmsg': '缺少参数'}, status=400)
if not re.match(r'^\w{5,20}$', username):
return JsonResponse({'code':400, 'errmsg': '用户名格式有误'}, status=400)
if not re.match(r'^\w{8,20}$', password):
return JsonResponse({'code':400, 'errmsg': '密码格式有误'}, status=400)
if password != password2:
return JsonResponse({'code':400, 'errmsg': '密码输入不一致!'}, status=400)
if not re.match(r'^\d{6}$', sms_code):
return JsonResponse({'code': 400, 'errmsg': '验证码格式有误'}, status=400)
if not allow:
return JsonResponse({'code': 400, 'errmsg': '请求统一用户协议!'}, status=400)
# 手机验证码校验
conn = get_redis_connection('sms_code')
sms_code_from_redis = conn.get('sms_%s'%mobile)
if not sms_code_from_redis:
return JsonResponse({'code': 400, 'errmsg': '短信验证码过期!'}, status=400)
sms_code_from_redis = sms_code_from_redis.decode()
if sms_code_from_redis != sms_code:
return JsonResponse({'code':400, 'errmsg': '短信验证码有误!'}, status=400)
# 3、新建数据,构建用户模型类对象保存数据库
# User.objects.create() --> 构建的用户模型类对象,密码不会加密
# User.objects.create_user() --> 构建用户模型类对象,把明文密码加密
# User.objects.create_superuser() --> 构建用户模型类对象,把明文密码加密以及is_staff=True
try:
user = User.objects.create_user(
username=username,
password=password,
mobile=mobile
)
except Exception as e:
print(e)
# 传入request对象和user对象,把用户信息写入session缓存(redis)中,并且把sessionid返回给浏览器
# 存入cookie
login(request, user)
# 4、构建响应
response = JsonResponse({'code': 0, 'errmsg':' ok'})
response.set_cookie(
'username',
username,
max_age=3600*24*14
)
return response
urls.py
# 注册
re_path(r'^register/$', RegisterView.as_view()),