celery发送短信

使用celery 异步发送短信 (celery一般用来处理比较耗时间的请求)

 

1. 安装celery

pip install celery

2. 使用

项目根目录下下创建celery_tasks用于保存celery异步任务。

  1. 在celery_tasks目录下创建config.py文件,用于保存celery的配置信息
    # config.py 
    broker_url = "redis://127.0.0.1/15"

     

  2. 在 celery_tasks 目录下创建main.py 文件

    # main.py 
    import os
    
    from celery import Celery
    
    
    # 为celery 使用django 进行配置
    
    if not os.getenv('DJANGO_SETTINGS_MODULE'):
        # os.environ('DJANGO_SETTINGS_MODULE', 'cheng_pro.settings')     # 这个文件是从 manage.py 文件中复制过来的
        os.environ['DJANGO_SETTINGS_MODULE'] = 'cheng_pro.settings'    # 这样子写也可以,赋一个值
    
    
    # 创建一个实例
    app = Celery('sms_captcha')
    
    # 导入配置
    app.config_from_object('celery_tasks.config')        # config.py 配置文件
    
    # 自定义注册任务
    app.autodiscover_tasks(['celery_tasks.sms'])    # 加入发送短信任务

     

  3. 在celery_tasks目录下创建sms目录,用于放置发送短信的异步任务相关代码。 如果以后要发送邮件,则再在该文件夹(celery_tasks)下创建一个email之类的目录,存放相应的代码就可以了
  4. 在celery_tasks/sms/目录下创建task.py文件,用于保存发送短信的异步任务
    # celery_tasks/sms/task.py
    import logging
    
    from celery_tasks.main import app
    from utils.aliyum.aliyum import send
    from utils.json_res import json_response
    from utils.res_code import Code, error_map
    
    logger = logging.getLogger('django')
    
    
    # 发送短信验证码
    @app.task(name='send_sms_captcha')
    def send_sms_captcha(mobile, captcha):
        """
        发送短信
        :return:
        """
        try:
            res = send(mobile, captcha)
    
        except Exception as e:
            logger.error('发送短信异常{{ {mobile}: {captcha}, e: {e} }}'.format(mobile=mobile, captcha=captcha, e=e))
            return 0
    
        else:
            if res == '发送短信失败':
                logger.warning('向 {} 发送短信失败'.format(mobile))
                return 0
    
            else:
                logger.info('向 {} 发送短信验证码成功: {{ {}: {} }}'.format(mobile, mobile, captcha))
    
                return 1

     

  5. 改写 发送短信视图

    # views.py      使用 celery 异步发送短信  (处理耗时任务)
    def send_sms_captcha(request):
        """
        发送短信验证码
        url: /sms_captcha/
        method:  POST
        :param request:
        :return:
        """
        # 1. 创建表单, 校验参数
        form = CheckSendSmsCaptcha(request.POST)
    
        # 2. 校验参数, 返回结果
        if form.is_valid():
            # 校验通过
            # 获取需要的参数
            mobile = form.cleaned_data.get('mobile')
    
            # 生成要发送的短信验证码内容
            captcha = ''.join([random.choice(string.digits) for _ in range(6)])
            logger.info('要发送的短信验证码内容为: {}'.format(captcha))
    
            # 使用 celery 异步发送短信
            res = sms_task.send_sms_captcha(mobile, captcha)
            if res == 1:
                # 短信验证码发送成功
                # 存储
                redis_conn = get_redis_connection(alias='verification')
                pipeline = redis_conn.pipeline()
                try:
                    redis_conn.setex('sms_{}'.format(mobile), 500, captcha)
                    redis_conn.setex('sms_flag_{}'.format(mobile), 60, mobile)
                    pipeline.execute()
                except Exception as e:
                    logger.error('redis执行异常, {{ e: {} }}'.format(e))
                    return json_response(errno=Code.UNKOWNERR, errmsg=error_map[Code.UNKOWNERR])   # 返回未知错误
    
                return json_response(errmsg='短信验证码发送成功')
            else:
                # 发送短信失败
                return json_response(errno=Code.SMSERROR, errmsg=error_map[Code.SMSERROR])
    
            # """
            # # 发送短信
            # res = send(mobile, captcha)
            # if res == '发送短信失败':
            #     logger.warning('向 {} 发送短信失败'.format(mobile))
            #     return json_response(errno=Code.SMSFAIL, errmsg=error_map[Code.SMSFAIL])
            #
            # # 保存短信验证码 和  该手机号码的 发送记录
            # redis_conn = get_redis_connection(alias='verification')  # 连接redis
            # pipeline = redis_conn.pipeline()   # 建立redis 管道
            # try:
            #     redis_conn.setex('sms_captcha_{}'.format(mobile), 500, captcha)
            #     # 保存60s发送记录
            #     redis_conn.setex('sms_flag_{}'.format(mobile), 60, mobile)
            #     # 执行
            #     pipeline.execute()
            #
            # except Exception as e:
            #     # redis执行异常
            #     logger.warning('redis执行异常, {}'.format(e))
            #     # 返回未知错误
            #     return json_response(errno=Code.UNKOWNERR, errmsg=error_map[Code.UNKOWNERR])
            #
            # logger.info('成功向 {} 发送短信, captcha: {}'.format(mobile, captcha))
            # return json_response(errmsg='短信验证码发送成功,请注意查收')
            # """
    
        else:
            # 校验不通过
            logger.error(form.errors)   # 记录个日志
    
            # 定义一个空的错误信息列表
            err_msg_list = []
            for items in form.errors.get_json_data().values():
                err_msg_list.append(items[0].get('message'))
            # 将所有错误信息拼接成字符串
            err_msg_str = '/'.join(err_msg_list)
            return json_response(errno=Code.PARAMERR, errmsg=err_msg_str)

     

  6. 创建 workon 任务

    在终端中,进入到与celery_tasks 同级的目录下(我上面是创建到项目根目录下了), 所以我们此时进入到项目的根目录下, 执行以下命令:

    celery -A celery_tasks.main worker -l info
    
     -A 选项指定 celery 实例 app 的位置
     -l 选项指定日志级别, -l 是 --loglevel 的缩略形式

     

上一篇:阿里云学生服务器搭建网站(阿里云学生服务器购买)


下一篇:python 多进程与进程池