Python3使用连接池连接163或outlook邮箱的服务器,利用asyncio实现异步IO进行协程并发,批量发送邮件

最近我们的服务需要批量发送邮件,先后试过了163邮箱和outlook企业邮箱,还是后者比较稳定。写完以后把代码整理成了一个脚本,如下所示,喜欢的客官可以拿去用了,有问题欢迎流言交流。

import ssl
import uuid
import time
import json
import redis
import django
import smtplib
import logging
import traceback
from random import choice
from threading import Thread
from django.conf import settings
from django.template.base import Template, Context
from email.utils import formatdate, formataddr, make_msgid
from django.core.mail.message import SafeMIMEText, sanitize_address
from asyncio import run_coroutine_threadsafe, ensure_future, gather, get_event_loop

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': ['.'],
    }
]
settings.configure(TEMPLATES=TEMPLATES)
django.setup()


class Email:
    def __init__(self, title, message, sender_name, receives, charset, sender_host=''):
        '''
            sender_host是发件人邮箱地址
            填空值时收件人看到的是发件用户的邮箱账号
            填非空值时收件人看到的是填写的地址,以及由发件用户的邮箱账号代发的提示
        '''
        self.title = title
        self.message = message
        self.sender_name = sender_name
        self.receives = receives
        self.charset = charset
        self.sender_host = sender_host


class UnknownError(smtplib.SMTPException):
    def __init__(self, recipients):
        self.recipients = recipients
        self.args = (recipients,)


class ConnectionPool:
    def __init__(self, host='', port='', send_email_user='', send_email_password='', max_connections=0, use_ssl=False, connection_lifetime=60, re_helo_time=10):
        self.host = host  # 邮箱服务器的地址
        self.port = port  # 邮箱服务器的端口号
        self.send_email_user = send_email_user  # 发件用户的SMPT服务账号(收件人看到的发件地址)
        self.send_email_password = send_email_password  # 发件用户的SMPT服务账号的密码,注意是发件邮箱配置的SMPT服务的密码,不是发件邮箱登陆密码
        self.max_connections = max_connections  # 一个IP地址能够同时建立的连接数(连接池的大小),163邮箱为10个,outlook邮箱为20个
        self.use_ssl = use_ssl  # smtp服务是否开启了ssl验证
        self.connection_lifetime = connection_lifetime  # 连接的存活时间,到达这个时间后就替换掉该连接,一般不用配置
        self.re_helo_time = re_helo_time  # 连接的心跳时间间隔,每隔一定时间和邮箱服务器helo一下保证服务器不断开连接,一般不用配置
        self.connections = {}
        self.context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        self.running = True

    def __create(self):
        while self.running:
            try:
                connection = smtplib.SMTP(timeout=30, host=self.host, port=self.port)
                if self.use_ssl:
                    connection.starttls(context=self.context)
                connection.login(self.send_email_user, self.send_email_password)
                key = uuid.uuid1().hex
                self.connections[key] = [connection, time.time() + len(self.connections.keys())]
                if not self.running:
                    self.replace(key, connection)
                break
            except Exception as e:
                try:
                    connection.quit()
                except:
                    pass
                if e.args[0] == 421:  # (421, b'Too many connections')
                    sleep_time = choice(range(5, 11))
                elif e.args[0] == 554:  # (554, b'IP<*****> in blacklist')
                    sleep_time = 1800
                else:
                    sleep_time = choice(range(5, 11))
                time.sleep(sleep_time)

    def __add(self):
        thread = Thread(target=self.__create)
        thread.setDaemon(True)
        thread.start()

    def __keep(self):
        last_check_helo = time.time()
        while self.running:
            if time.time() - last_check_helo >= self.re_helo_time:
                re_helo = True
                last_check_helo = time.time()
            else:
                re_helo = False
            connections = dict(self.connections.items())
            for key, connection_info in connections.items():
                connection, connection_time = connection_info
                if time.time() - connection_time >= self.connection_lifetime:
                    self.replace(key, connection)
                elif re_helo:
                    try:
                        connection.helo()
                    except:
                        self.replace(key, connection)
            time.sleep(1)

    def start(self):
        threads = [Thread(target=self.__create) for index in range(self.max_connections)]
        for thread in threads:
            thread.setDaemon(True)
            thread.start()
        thread = Thread(target=self.__keep)
        thread.setDaemon(True)
        thread.start()

    def close(self):
        self.running = False
        connections = dict(self.connections.items())
        while connections:
            for key, connection_info in connections.items():
                connection, connection_time = connection_info
                self.replace(key, connection)
            connections = dict(self.connections.items())

    def replace(self, key, connection):
        try:
            connection.quit()
        except:
            pass
        try:
            self.connections.pop(key, None)
        except:
            pass
        if self.running:
            self.__add()

    def get(self):
        time_now = time.time()
        while self.running:
            try:
                connections = dict(self.connections.items())
                key = choice(list(connections.keys()))
                connection, connection_time = connections[key]
            except (IndexError, KeyError):
                if time.time() - time_now > 3:
                    return
                else:
                    time.sleep(0.1)
                    continue
            if time.time() - connection_time >= self.connection_lifetime:
                if time.time() - time_now > 3:
                    return
                else:
                    self.replace(key, connection)
                    continue
            try:
                connection.helo()
                return key, connection
            except:
                self.replace(key, connection)


class EmailServer:
    def __init__(self, send_step=1, emails_list_key='', redis=None, connection_pool_kwargs={}):
        self.emails_list_key = emails_list_key  # 邮件队列的key
        self.send_step = send_step  # 发送并发量大小,163邮箱每批次只能发送11封邮件,outlook邮箱为20封
        self.connection_pool = ConnectionPool(**connection_pool_kwargs)
        self.io_loop = get_event_loop()
        self.logging = logging.getLogger()
        self.redis = redis
        self.running = True

    def format_email(self, host_user, email, charset='utf-8', use_localtime=True):
        # use_localtime 是否使用本地时间,True使用本地时间(东8区),False使用标准世界时间
        from_email = sanitize_address(host_user, charset)
        recipients = [sanitize_address(receive, charset) for receive in email.receives]
        if not from_email or not recipients:
            return ('', [], '')
        subtype = 'html' if email.message.strip().endswith('</html>') else 'plain'
        msg = SafeMIMEText(email.message, subtype, email.charset)
        msg['Subject'] = email.title
        email_sender_host = email.sender_name or host_user
        msg['From'] = formataddr([email.sender_name, email_sender_host])
        msg['To'] = ', '.join(map(str, email.receives))
        msg['Date'] = formatdate(localtime=use_localtime)
        msg['Message-ID'] = make_msgid()
        return from_email, recipients, msg.as_bytes(linesep='\r\n')

    async def send_one_email(self, email):
        # 发送一封邮件
        error_str, no_connection_error, retry_num = '', False, self.connection_pool.max_connections * 2
        for index in range(retry_num):
            try:
                try:
                    key, connection = self.connection_pool.get()
                    if connection:
                        from_email, recipients, message = self.format_email(connection.user, Email(*email))
                        if not recipients:
                            return '邮件发送失败,请检查收件人信息是否正确'
                        if not from_email:
                            return '邮件发送失败,请检查发件人信息是否正确'
                        senderrs = connection.sendmail(from_email, recipients, message)
                        if senderrs:
                            raise UnknownError(senderrs)
                        return True
                    elif index == retry_num - 1:
                        no_connection_error = True
                except Exception as e:
                    error_str = traceback.format_exc()
                    raise e
            # 异常处理根据需要自定义
            except smtplib.SMTPRecipientsRefused as e:
                self.logging.error('%s\nEmail info: %s' % (error_str, email))
                return '邮件发送失败,请检查收件人邮箱是否正确'
            except (smtplib.SMTPSenderRefused, smtplib.SMTPDataError, AttributeError, ValueError) as e:
                self.logging.error('%s\nEmail info: %s' % (error_str, email))
                self.connection_pool.replace(key, connection)
            except (ssl.SSLError, smtplib.SMTPServerDisconnected) as e:
                self.logging.error('%s\nEmail info: %s' % (error_str, email))
                self.connection_pool.replace(key, connection)
            except Exception:
                self.logging.error('%s\nEmail info: %s' % (error_str, email))
                return '邮件发送失败,请稍后再试'
        if no_connection_error:
            if error_str:
                error_str = '邮件连接全部失效,请检查是否被邮箱服务器加入黑名单,最后的异常:\n' + error_str
            else:
                error_str = '邮件连接全部失效,请检查是否被邮箱服务器加入黑名单'
        self.logging.error('%s\nEmail info: %s' % (error_str, email))
        return '邮件发送失败,请稍后再试'

    async def send_some_emails(self, emails):
        ''' 发送一个批次的邮件 '''
        tasks = [ensure_future(self.send_one_email(email), loop=self.io_loop) for email in emails]
        results = await gather(*tasks, loop=self.io_loop, return_exceptions=True)
        return results

    async def send_all_emails(self, emails):
        ''' 按照步长分批次发送所有邮件 '''
        # 把邮件按照步长分成多个批次
        tasks = [ensure_future(self.send_some_emails(emails[index: index + self.send_step]), loop=self.io_loop) for index in range(0, len(emails), self.send_step)]
        the_results = await gather(*tasks, loop=self.io_loop, return_exceptions=True)
        results = []
        for result in the_results:
            results.extend(result)
        return results

    def run_send_email_server(self):
        while self.running:
            email_info = self.redis.lpop(self.emails_list_key)
            if email_info:
                email_info = json.loads(email_info)
                result = run_coroutine_threadsafe(self.send_all_emails(email_info['emails']), self.io_loop).result()
                self.redis.set(email_info['send_task_id'], json.dumps(result), 60)
            else:
                time.sleep(0.1)

    def start(self):
        # 初始化与邮箱服务器的连接和连接保活服务
        self.connection_pool.start()
        print('Connection pool started.')

        # 启动一个协程事件循环
        thread = Thread(target=self.io_loop.run_forever)
        thread.setDaemon(True)
        thread.start()

        # 启动发送短信的服务
        thread = Thread(target=self.run_send_email_server)
        thread.setDaemon(True)
        thread.start()

    def stop(self):
        self.running = False
        self.connection_pool.close()
        print('Connection pool closed.')


class EmailSender:
    def __init__(self, emails_list_key='', redis=None):
        self.emails_list_key = emails_list_key
        self.redis = redis

    def send_emails(self, emails):
        # 发送邮件并等待结果
        ok_redis_key = 'emails_ok:%s' % uuid.uuid1().hex
        self.redis.rpush(self.emails_list_key, json.dumps({'send_task_id': ok_redis_key, 'emails': emails}))
        while True:
            result = self.redis.get(ok_redis_key)
            if result:
                self.redis.delete(ok_redis_key)
                return json.loads(result)
            else:
                time.sleep(0.1)


def get_html_content(email_title='', email_charset='utf-8'):
    # 格式化邮件内容,以发送html格式的邮件为例
    content = '''
        <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
        <html xmlns="http://www.w3.org/1999/xhtml">
         <head>
          <meta http-equiv="Content-Type" content="text/html; charset={{ render_data.charset }}" />
          <title>{{ render_data.title }}</title>
          <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
        <style>
         table th, table td{
              line-height: 1.4em;
              font-size: 14px;
          }
        </style>
         </head>
          <body style="margin: 0; padding: 0;">
            {{ render_data.data }}
          </body>
        </html>
    '''
    template = Template(content)
    render_data = {'title': email_title, 'data': {'示例': '内容'}, 'charset': email_charset}
    return template.render(Context({'render_data': render_data}))


def get_emails(email_num=2, receivers=[]):
    email_charset = 'utf-8'
    emails = []
    for email_index in range(email_num):
        email_title = '这是一封测试邮件[%s]-[%.2f]' % (email_index, time.time())
        # 格式化邮件内容,以发送html格式的邮件为例
        email_content = get_html_content(email_title, email_charset)
        sender_host = ''  # 填写则为代发模式
        emails.append([email_title, email_content, '旷古的寂寞', receivers, email_charset, sender_host])
        # emails.append([email_title, '测试邮件内容', '旷古的寂寞', receivers, email_charset])
    return emails


emails_list_key = 'emails_list'
redis_session = redis.Redis()
receivers = ['*******@163.com', '********@qq.com']
email_server = EmailServer(send_step=20, emails_list_key=emails_list_key, redis=redis_session, connection_pool_kwargs={
    'host': '******',  # 邮箱服务器的地址
    'port': 587,  # 邮箱服务器的端口号
    'send_email_user': '*****',  # 发件用户的SMPT服务账号(收件人看到的发件地址)
    'send_email_password': '****',  # 发件用户的SMPT服务账号的密码,注意是发件邮箱配置的SMPT服务的密码,不是发件邮箱登陆密码
    'max_connections': 20,  # 一个IP地址能够同时建立的连接数(连接池的大小),163邮箱为10个,outlook邮箱为20个
    'use_ssl': True,  # smtp服务是否开启了ssl验证
    # 'connection_lifetime': 60,  # 连接的存活时间,到达这个时间后就替换掉该连接,一般不用配置
    # 're_helo_time': 10  # 连接的心跳时间间隔,每隔一定时间和邮箱服务器helo一下保证服务器不断开连接,一般不用配置
})
email_sender = EmailSender(emails_list_key, redis_session)

if __name__ == '__main__':
    email_server.start()
    result = email_sender.send_emails(get_emails(email_num=2, receivers=receivers))
    print(result)
    email_server.stop()

 

上一篇:angular7响应式表单基本使用


下一篇:E. Email Destruction---大模拟