如何监控PostgreSQL存储过程/函数代码运行?本文介绍用python+微信/邮件的方式进行报警、监控。
首先要有一张表、用于存放PostgreSQL存储过程/函数代码运行异常的信息。
处理原则:若出现异常;把“发生时间+所在的程序+原因”通过微信/邮件发给对应人员。当然发送一次即可;起到通知的效果。
一、媒介
通过什么方式进行发送内容;下面介绍微信/邮件两种方式
1、python发送微信
py_wechar.py的内容
企业微信号;大家可以到企业微信上配置
#!/usr/bin/python3
#coding=utf-8
import json
import time
import urllib.request as urllib2
options = {
? ? ‘WeiXin‘: {
? ? ? ? ? ? ‘corp_id‘: ‘*‘,? #微信企业号ID
? ? ? ? ? ? ‘agent_id‘: ‘*‘, #微信企业号应用ID
? ? ? ? ? ? ‘agent_secret‘: ‘*‘,? #微信企业号密钥
? ? ? ? ? ? ‘to_user‘: ‘@all‘? #发送给谁
? ? },
}
class WeiXinSendMsg:
? ? def __init__(self, wx_conf):
? ? ? ? self.corp_id = wx_conf.get(‘corp_id‘)
? ? ? ? self.agent_secret = wx_conf.get(‘agent_secret‘)
? ? ? ? self.agent_id = wx_conf.get(‘agent_id‘)
? ? ? ? self.to_user = wx_conf.get(‘to_user‘)
? ? ? ? self.token = self.get_token()?
? ? ? ? self.token_update_time = int(time.time())
? ? ? ??
? ? def get_token(self):
? ? ? ? get_token_url = ‘https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=‘ + self.corp_id + ‘&corpsecret=‘ + self.agent_secret
? ? ? ? token = json.loads(urllib2.urlopen(get_token_url).read().decode(‘utf-8‘))[‘access_token‘]
? ? ? ? if token:
? ? ? ? ? ? return token
? ? # 微信发送端的token每1800秒会更新一次
? ? def update_token(self):
? ? ? ? if int(time.time()) - self.token_update_time >= 1800:
? ? ? ? ? ? self.token = self.get_token()
? ? ? ? ? ? self.token_update_time = int(time.time())
? ? def send_message(self, msg):
? ? ? ? try:
? ? ? ? ? ? self.update_token()
? ? ? ? ? ? send_url = ‘https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=‘ + self.token
? ? ? ? ? ? send_val = {"touser":self.to_user, "toparty":"", "msgtype":"text", "agentid":self.agent_id, "text":{"content":msg}, "safe":"0"}
? ? ? ? ? ? send_data = json.dumps(send_val, ensure_ascii=True).encode("utf-8")
? ? ? ? ? ? send_request = urllib2.Request(send_url, send_data)
? ? ? ? ? ? response = json.loads(urllib2.urlopen(send_request).read())
? ? ? ? except Exception as e:
? ? ? ? ? ? print(‘Exception WeiXin send_message:‘, e)
if __name__ == ‘__main__‘:
? ? WeiXin = WeiXinSendMsg(options.get(‘WeiXin‘))
? ? WeiXin.send_message(‘hello world / 测试‘)
2、python发送邮件
py_email.py的内容
#!/usr/bin/python3
#coding=utf-8
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
options = {
? ? ‘Email‘: {
? ? ? ? ‘smtp_server‘: ‘smtp.exmail.qq.com‘,? #邮箱服务器地址
? ? ? ? ‘from_addr‘: ‘monitor@qq.com‘,? #发送人账号
? ? ? ? ‘password‘: ‘123456‘, #发送人密码
? ? ? ? ‘to_addr‘: [‘hanbo@126.com‘, ‘hanbo@163.com‘], #发送给谁
? ? }
}
class EmailSendMsg:
? ? def __init__(self, email_conf):
? ? ? ? self.smtp_server = email_conf.get(‘smtp_server‘)
? ? ? ? self.from_addr = email_conf.get(‘from_addr‘)
? ? ? ? self.password = email_conf.get(‘password‘)
? ? ? ? self.to_addr = email_conf.get(‘to_addr‘)
? ? # def __del__(self):
? ? #? ? ?self.server.quit()
? ? def format_addr(self, str):
? ? ? ? name, addr = parseaddr(str)
? ? ? ? return formataddr(( ? ? ? ? ? ? Header(name, ‘utf-8‘).encode(), ? ? ? ? ? ? addr.encode(‘utf-8‘) if isinstance(addr, unicode) else addr))
? ??
? ? def send_msg(self, text):
? ? ? ? try:
? ? ? ? ? ? self.server = smtplib.SMTP(self.smtp_server, 25)
? ? ? ? ? ? self.server.set_debuglevel(1)
? ? ? ? ? ? self.server.login(self.from_addr, self.password)
? ? ? ? ? ? msg = MIMEText(text, ‘plain‘, ‘utf-8‘)
? ? ? ? ? ? msg[‘From‘] = self.format_addr(u‘监控 <%s>‘ % self.from_addr)
? ? ? ? ? ? for i in range(len(self.to_addr)):
? ? ? ? ? ? ? ? msg[‘To‘] = self.format_addr(u‘<%s>‘ % self.to_addr[i])
? ? ? ? ? ? msg[‘Subject‘] = Header(u‘异常报警…‘, ‘utf-8‘).encode()
? ? ? ? ? ? self.server.sendmail(self.from_addr, self.to_addr, msg.as_string())
? ? ? ? ? ? self.server.quit()
? ? ? ? except Exception as e:
? ? ? ? ? ? print ‘Exception Email send_message:‘, e
if __name__ == ‘__main__‘:
Email = EmailSendMsg(options.get(‘Email‘))
Email.send_msg(‘hello world!‘)
二、python连接数据库
看这个链接可以研究下python如何连接PostgreSQL数据库
三、python报警
上面我们知道如何通过python发送微信内容、以及python连接PostgreSQL数据库。现在我们要如何获取报警时机;报警内容。
python_alert.py
#!/usr/bin/python3
?
import psycopg2
from config import config
from py_wechar import WeiXinSendMsg,options
def get_errors():
? ? """ query data from the vendors table """
? ? conn = None
? ? try:
? ? ? ? params = config()
? ? ? ? WeiXin = WeiXinSendMsg(options.get(‘WeiXin‘))
? ? ? ? conn = psycopg2.connect(**params)
? ? ? ? cur = conn.cursor()
? ? ? ? cur.execute("select error_time, error_desc, proc_name from adsas.tbl_error_log where deal_status = 0 order by id")
? ? ? ? rows = cur.fetchall()
? ? ? ? if cur.rowcount > 0 :
? ? ? ? ? ? WeiXin.send_message("The number of parts: {}".format(cur.rowcount))
? ? ? ? ? ? for row in rows:
? ? ? ? ? ?# WeiXin.send_message(‘-‘*60)
? ? ? ? ? ?# WeiXin.send_message(‘发生时间:{}‘.format(row[0]))
? ? ? ? ? ?# WeiXin.send_message(‘错误原因:{}‘.format(row[1]))
? ? ? ? ? ?# WeiXin.send_message(‘报警代码:{}‘.format(row[2]))
? ? ? ? ? ? ? ? str_error=‘发生时间:{}\n错误原因:{}\n报警代码:{}‘.format(row[0],row[1],row[2])
? ? ? ? ? ? ? ? WeiXin.send_message(str_error)
? ? ? ? ? ? cur.execute("update adsas.tbl_error_log set deal_status = 1 where deal_status = 0 ")
? ? ? ? conn.commit()
? ? ? ? cur.close()
? ? except (Exception, psycopg2.DatabaseError) as error:
? ? ? ? print(error)
? ? finally:
? ? ? ? if conn is not None:
? ? ? ? ? ? conn.close()
if __name__ == ‘__main__‘:
? ? get_errors()
四、部署
可以通过cron/或者开源的定时任务系统进行报警;
报警信息: