实现了通过企业微信发送消息,平时用于运维的告警还是不错的,相对于邮件来说,实时性更高,不过就是企业微信比较麻烦,此处不做过多解释。
企业微信api的详细请看:http://work.weixin.qq.com/api/doc#10167
话不多说,直接代码
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 # @Time : 2018/4/25 17:06 5 # @Author : zms 6 # @Site : 7 # @File : WeChat.py 8 # @Software: PyCharm Community Edition 9 10 # !/usr/bin/env python 11 # coding:utf-8 12 # file wechat.py 13 14 import time 15 import requests 16 import json 17 18 import sys 19 20 reload(sys) 21 sys.setdefaultencoding(‘utf8‘) 22 23 24 class WeChat: 25 def __init__(self): 26 self.CORPID = ‘***********‘ 27 self.CORPSECRET = ‘*********************************‘ 28 self.AGENTID = ‘**************‘ 29 self.TOUSER = "**********" # 接收者用户名 30 31 def _get_access_token(self): 32 url = ‘https://qyapi.weixin.qq.com/cgi-bin/gettoken‘ 33 values = {‘corpid‘: self.CORPID, 34 ‘corpsecret‘: self.CORPSECRET, 35 } 36 req = requests.post(url, params=values) 37 data = json.loads(req.text) 38 # print data 39 return data["access_token"] 40 41 def get_access_token(self): 42 try: 43 with open(‘./tmp/access_token.conf‘, ‘r‘) as f: 44 t, access_token = f.read().split() 45 except: 46 with open(‘./tmp/access_token.conf‘, ‘w‘) as f: 47 access_token = self._get_access_token() 48 cur_time = time.time() 49 f.write(‘\t‘.join([str(cur_time), access_token])) 50 return access_token 51 else: 52 cur_time = time.time() 53 if 0 < cur_time - float(t) < 7260: 54 return access_token 55 else: 56 with open(‘./access_token.conf‘, ‘w‘) as f: 57 access_token = self._get_access_token() 58 f.write(‘\t‘.join([str(cur_time), access_token])) 59 return access_token 60 61 def send_data(self, message): 62 msg = message.encode(‘utf-8‘) 63 send_url = ‘https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=‘ + self.get_access_token() 64 send_values = { 65 "touser": self.TOUSER, 66 "msgtype": "text", 67 "agentid": self.AGENTID, 68 "text": { 69 "content": msg 70 }, 71 "safe": "0" 72 } 73 send_data = ‘{"msgtype": "text", "safe": "0", "agentid": %s, "touser": "%s", "text": {"content": "%s"}}‘ % ( 74 self.AGENTID, self.TOUSER, msg) 75 r = requests.post(send_url, send_data) 76 # print r.content 77 return r.content 78 79 80 if __name__ == ‘__main__‘: 81 wx = WeChat() 82 wx.send_data("test")