使用celery 异步发送短信 (celery一般用来处理比较耗时间的请求)
1. 安装celery
pip install celery
2. 使用
在项目根目录下
下创建celery_tasks用于保存celery异步任务。
- 在celery_tasks目录下创建config.py文件,用于保存celery的配置信息
# config.py broker_url = "redis://127.0.0.1/15"
-
在 celery_tasks 目录下创建main.py 文件
# main.py import os from celery import Celery # 为celery 使用django 进行配置 if not os.getenv('DJANGO_SETTINGS_MODULE'): # os.environ('DJANGO_SETTINGS_MODULE', 'cheng_pro.settings') # 这个文件是从 manage.py 文件中复制过来的 os.environ['DJANGO_SETTINGS_MODULE'] = 'cheng_pro.settings' # 这样子写也可以,赋一个值 # 创建一个实例 app = Celery('sms_captcha') # 导入配置 app.config_from_object('celery_tasks.config') # config.py 配置文件 # 自定义注册任务 app.autodiscover_tasks(['celery_tasks.sms']) # 加入发送短信任务
- 在celery_tasks目录下创建sms目录,用于放置发送短信的异步任务相关代码。 如果以后要发送邮件,则再在该文件夹(celery_tasks)下创建一个email之类的目录,存放相应的代码就可以了
- 在celery_tasks/sms/目录下创建task.py文件,用于保存发送短信的异步任务
# celery_tasks/sms/task.py import logging from celery_tasks.main import app from utils.aliyum.aliyum import send from utils.json_res import json_response from utils.res_code import Code, error_map logger = logging.getLogger('django') # 发送短信验证码 @app.task(name='send_sms_captcha') def send_sms_captcha(mobile, captcha): """ 发送短信 :return: """ try: res = send(mobile, captcha) except Exception as e: logger.error('发送短信异常{{ {mobile}: {captcha}, e: {e} }}'.format(mobile=mobile, captcha=captcha, e=e)) return 0 else: if res == '发送短信失败': logger.warning('向 {} 发送短信失败'.format(mobile)) return 0 else: logger.info('向 {} 发送短信验证码成功: {{ {}: {} }}'.format(mobile, mobile, captcha)) return 1
-
改写 发送短信视图
# views.py 使用 celery 异步发送短信 (处理耗时任务) def send_sms_captcha(request): """ 发送短信验证码 url: /sms_captcha/ method: POST :param request: :return: """ # 1. 创建表单, 校验参数 form = CheckSendSmsCaptcha(request.POST) # 2. 校验参数, 返回结果 if form.is_valid(): # 校验通过 # 获取需要的参数 mobile = form.cleaned_data.get('mobile') # 生成要发送的短信验证码内容 captcha = ''.join([random.choice(string.digits) for _ in range(6)]) logger.info('要发送的短信验证码内容为: {}'.format(captcha)) # 使用 celery 异步发送短信 res = sms_task.send_sms_captcha(mobile, captcha) if res == 1: # 短信验证码发送成功 # 存储 redis_conn = get_redis_connection(alias='verification') pipeline = redis_conn.pipeline() try: redis_conn.setex('sms_{}'.format(mobile), 500, captcha) redis_conn.setex('sms_flag_{}'.format(mobile), 60, mobile) pipeline.execute() except Exception as e: logger.error('redis执行异常, {{ e: {} }}'.format(e)) return json_response(errno=Code.UNKOWNERR, errmsg=error_map[Code.UNKOWNERR]) # 返回未知错误 return json_response(errmsg='短信验证码发送成功') else: # 发送短信失败 return json_response(errno=Code.SMSERROR, errmsg=error_map[Code.SMSERROR]) # """ # # 发送短信 # res = send(mobile, captcha) # if res == '发送短信失败': # logger.warning('向 {} 发送短信失败'.format(mobile)) # return json_response(errno=Code.SMSFAIL, errmsg=error_map[Code.SMSFAIL]) # # # 保存短信验证码 和 该手机号码的 发送记录 # redis_conn = get_redis_connection(alias='verification') # 连接redis # pipeline = redis_conn.pipeline() # 建立redis 管道 # try: # redis_conn.setex('sms_captcha_{}'.format(mobile), 500, captcha) # # 保存60s发送记录 # redis_conn.setex('sms_flag_{}'.format(mobile), 60, mobile) # # 执行 # pipeline.execute() # # except Exception as e: # # redis执行异常 # logger.warning('redis执行异常, {}'.format(e)) # # 返回未知错误 # return json_response(errno=Code.UNKOWNERR, errmsg=error_map[Code.UNKOWNERR]) # # logger.info('成功向 {} 发送短信, captcha: {}'.format(mobile, captcha)) # return json_response(errmsg='短信验证码发送成功,请注意查收') # """ else: # 校验不通过 logger.error(form.errors) # 记录个日志 # 定义一个空的错误信息列表 err_msg_list = [] for items in form.errors.get_json_data().values(): err_msg_list.append(items[0].get('message')) # 将所有错误信息拼接成字符串 err_msg_str = '/'.join(err_msg_list) return json_response(errno=Code.PARAMERR, errmsg=err_msg_str)
-
创建 workon 任务
在终端中,进入到与celery_tasks 同级的目录下(我上面是创建到项目根目录下了), 所以我们此时进入到项目的根目录下, 执行以下命令:
celery -A celery_tasks.main worker -l info -A 选项指定 celery 实例 app 的位置 -l 选项指定日志级别, -l 是 --loglevel 的缩略形式