33.celery实现邮件异步发送
(1)task.py
pip install celery redis
from celery import Celery
from flask import Flask
from flask_mail import Message
from exts import mail,alidayu
import config app=Flask(__name__) app.config.from_object(config)
mail.init_app(app) alidayu.init_app(app) # 运行本文件:
# 在windows操作系统上:
# celery -A tasks.celery worker --pool=solo --loglevel=info
# 在linux操作系统上:
# celery -A tasks.celery worker --loglevel=info def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery celery = make_celery(app) @celery.task
def send_mail(subject,recipients,body):
message = Message(subject=subject,recipients=recipients,body=body)
mail.send(message) # @celery.task
# def send_sms_captcha(telephone,captcha):
# alidayu.send_sms(telephone,code=captcha)
(2)config.py
# celery的配置
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
(3)cms/views.py
@bp.route('/email_captcha/')
def email_captcha():
#获取要修改的邮箱
email = request.args.get('email')
if not email:
return restful.params_error('请输入要修改的邮箱')
#得到大小写字母的列表
source = list(string.ascii_letters)
#得到大小写字母的列表 + 0到9的数字字符串
source.extend(map(lambda x: str(x), range(0, 10)))
# 随机取六位作为验证码
captcha = "".join(random.sample(source, 6))
#给这个邮箱发送邮件验证码
# message = Message(subject='derek论坛密码修改邮件发送', recipients=[email,], body='你的验证码是:%s'%captcha)
# try:
# mail.send(message)
# except:
# return restful.server_error() #celery异步发送邮件
send_mail.delay('derek论坛密码修改邮件发送', [email], '你的验证码是:%s' % captcha)
#把邮箱和验证码保存到memcached中
zlcache.set(email,captcha)
return restful.success()