postgres与mysql延时监控

(1)如果已经在使用 Prometheus、Zabbix 等成熟的开源监控系统,就没必要再单独写脚本取值监控。这里只是一时兴起写了个简单的脚本,用来监控 MySQL 与 PostgreSQL 数据库的主从复制延迟(lag):

#coding=utf-8
import pymysql,psycopg2
import requests,datetime,json,hmac,base64,hashlib
import pytz,time,urllib.parse
# Timezone used when formatting the timestamp shown in alert messages.
tz = pytz.timezone('Asia/Shanghai')

# Servers to check; each entry is a (user, password, host, port) tuple.
db_list = [
    ("postgres", "xxxxx", "10.x.x.x", 5432),
]

# Webhook base URL. Signing parameters are appended with "&", so this is
# presumably expected to already carry a query string -- confirm.
URL = "xxxxxxx"
# Shared secret used to sign webhook requests.
secret = "xxxx"
def get_timestamp_sign():
    """Build the (timestamp, sign) pair used to sign a webhook request.

    The timestamp is the current time in milliseconds, rendered as a string.
    The signature is an HMAC-SHA256 digest of "<timestamp>\\n<secret>", keyed
    with the module-level ``secret``, then Base64-encoded and URL-quoted.

    Returns:
        tuple: ``(timestamp, sign)``, both strings.
    """
    now_ms = str(round(time.time() * 1000))
    key = secret.encode('utf-8')
    payload = '{}\n{}'.format(now_ms, secret).encode('utf-8')
    digest = hmac.new(key, payload, digestmod=hashlib.sha256).digest()
    encoded = base64.b64encode(digest)
    return (now_ms, urllib.parse.quote_plus(encoded))

def get_signed_url():
    """Return the module-level ``URL`` with signing parameters appended.

    The result is ``URL`` followed by ``&timestamp=<ts>&sign=<sign>``, where
    both values come from ``get_timestamp_sign()``.
    """
    ts, signature = get_timestamp_sign()
    pieces = (URL, "&timestamp=", ts, "&sign=", signature)
    return "".join(pieces)

def get_webhook(mode):
    """Pick the webhook URL for the given mode.

    Args:
        mode: 0 selects the plain ``URL``; 1 or 2 selects the signed URL.

    Returns:
        The selected URL. For any other mode an error line is printed and an
        empty string is returned.
    """
    if mode == 0:
        return URL
    if mode == 1 or mode == 2:
        return get_signed_url()

    # Unknown mode: report it and hand back an empty URL.
    webhook = ""
    print("error! mode:   ",mode,"  webhook :  ",webhook)
    return webhook

def get_all_slave_lag(x, max_lag_seconds=60):
    """Check MySQL replication on one replica and alert on problems.

    Runs ``SHOW SLAVE STATUS`` on the given server. For every row returned,
    a DingTalk markdown alert is sent through ``is_sned_alerts`` when:

    * ``Slave_IO_Running`` or ``Slave_SQL_Running`` is not ``"Yes"``, or
    * ``Seconds_Behind_Master`` is NULL or greater than ``max_lag_seconds``.

    Healthy rows produce no message.

    Args:
        x: Connection tuple ``(user, password, host, port)``.
        max_lag_seconds: Lag threshold in seconds; defaults to 60.
    """
    cnx = pymysql.connect(user=x[0], password=x[1], host=x[2], port=x[3])
    try:
        cursor = cnx.cursor(cursor=pymysql.cursors.DictCursor)
        try:
            cursor.execute("show slave status;")
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        cnx.close()

    for row in rows:
        sync_host = row['Master_Host'] + ':' + str(row['Master_Port'])
        sync_lag = row['Seconds_Behind_Master']
        io_thread = row['Slave_IO_Running']
        sql_thread = row['Slave_SQL_Running']

        # Both threads must be checked individually: `(a and b) != "Yes"`
        # evaluates to `b != "Yes"` whenever `a` is any non-empty string.
        threads_ok = io_thread == "Yes" and sql_thread == "Yes"
        # Compare numerically; comparing the string forms orders "9" above
        # "60" and "100" below it. A NULL lag is treated as a failure.
        lag_ok = sync_lag is not None and int(sync_lag) <= max_lag_seconds
        if threads_ok and lag_ok:
            continue

        title = "<font color=#FF0000 size=3>mysql master-slave sos</font>"
        alert_time = datetime.datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
        sendmsg = {
            "msgtype": "markdown",
            "markdown": {
                "title": "mysql lags" + "....",
                "text": "报警主题 :" + str(title) +
                        "\n\n>监控主机:" + str(sync_host) +
                        "\n\n>报警时间:" + str(alert_time) +
                        "\n\n>报警信息:" + "err_msg=" + "io_thread: " + str(io_thread) + " sql_thread: " + str(sql_thread) + " lag_time: " + str(sync_lag)
            },
            "at": {
                "atMobiles": [
                    "123456"
                ],
                # isAtAll belongs inside "at" and must be a real boolean.
                "isAtAll": False
            }
        }
        is_sned_alerts(data=json.dumps(sendmsg))

def _pg_alert_payload(host, detail):
    """Build the JSON-encoded DingTalk markdown alert for a PostgreSQL issue."""
    alert_time = datetime.datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
    sendmsg = {
        "msgtype": "markdown",
        "markdown": {
            "title": "postgres lags" + "....",
            "text": "报警主题 :" + "postgres sysnc error" +
                    "\n\n>监控主机:" + str(host) +
                    "\n\n>报警时间:" + str(alert_time) +
                    "\n\n>报警信息:" + "err_msg=" + detail
        },
        "at": {
            "atMobiles": [
                "123456"
            ],
            # isAtAll belongs inside "at" and must be a real boolean.
            "isAtAll": False
        }
    }
    return json.dumps(sendmsg)


def get_pg_slave_lag(x):
    """Check PostgreSQL streaming-replication lag and alert on problems.

    Queries ``pg_stat_replication`` on the given server and evaluates every
    row (one per connected WAL sender client). An alert is sent through
    ``is_sned_alerts`` for each row in which any ``pg_wal_lsn_diff`` value
    exceeds its threshold. If the view returns no rows at all, a single alert
    is sent instead of failing with ``IndexError``.

    Args:
        x: Connection tuple ``(user, password, host, port)``.
    """
    # (label, threshold) in the same order as the columns selected after
    # client_addr. The values are pg_wal_lsn_diff() results.
    limits = (
        ("local_noflush_delay", 10000),
        ("local_send_delay", 6000),
        ("stream_write_delay", 6000),
        ("stream_flush_delay", 6000),
        ("stream_replay_delay", 10000),
    )
    query = (
        "select client_addr,"
        " pg_wal_lsn_diff(pg_current_wal_insert_lsn(), pg_current_wal_lsn()) as local_noflush_delay,"
        " pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn) as local_send_delay,"
        " pg_wal_lsn_diff(sent_lsn, write_lsn) as stream_write_delay,"
        " pg_wal_lsn_diff(sent_lsn, flush_lsn) as stream_flush_delay,"
        " pg_wal_lsn_diff(sent_lsn, replay_lsn) as stream_replay_delay"
        " from pg_stat_replication;"
    )

    cnx = psycopg2.connect(user=x[0], password=x[1], host=x[2], port=x[3])
    try:
        cursor = cnx.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        cnx.close()

    if not rows:
        # Nothing is streaming from this server; report it against the
        # queried host because there is no client address to show.
        is_sned_alerts(data=_pg_alert_payload(
            x[2], "no rows in pg_stat_replication (no standby connected)"))
        return

    for row in rows:
        sync_host = row[0]
        values = row[1:6]
        # A NULL diff (an LSN column not populated yet) cannot be compared
        # with a number, so it is not counted as exceeding the threshold.
        exceeded = any(
            value is not None and value > limit
            for value, (_, limit) in zip(values, limits)
        )
        if not exceeded:
            continue
        detail = " ".join(
            "{}: {}".format(label, value)
            for value, (label, _) in zip(values, limits)
        )
        is_sned_alerts(data=_pg_alert_payload(sync_host, detail))

def is_sned_alerts(data, timeout=10):
    """POST an alert payload to the signed DingTalk webhook.

    Failures are printed rather than raised, so one failed notification does
    not abort the remaining checks.

    Args:
        data: JSON-encoded message body (a string).
        timeout: Seconds to wait for the HTTP request; defaults to 10.
            Without a timeout ``requests`` may block indefinitely.
    """
    webhook = get_webhook(1)
    headers = {'Content-Type': 'application/json'}
    try:
        req = requests.post(url=webhook, data=data, headers=headers,
                            timeout=timeout)
        result = req.json()
    except (requests.RequestException, ValueError) as err:
        # Transport failure, timeout, or a response body that is not JSON.
        print('notify dintalk error :%s' % err)
        return

    # Guard against a JSON body that is not an object or lacks "errcode".
    errcode = result.get('errcode') if isinstance(result, dict) else None
    if errcode != 0:
        print('notify dintalk error :%s' % errcode)

# Wall-clock start, captured when the module is loaded.
start_time = datetime.datetime.now()

if __name__ == "__main__":
    # Only the PostgreSQL check is active. The MySQL calls stay disabled, and
    # db_list currently holds a single entry, so db_list[1] does not exist.
    # def lambda_handler(event, context):
    # get_all_slave_lag(db_list[0])
    # get_all_slave_lag(db_list[1])
    get_pg_slave_lag(db_list[0])

# These lines sit outside the guard, so they also run on import.
end_time = datetime.datetime.now()
time_cost = end_time - start_time
# Keep only the part before the fractional seconds of the timedelta text.
print("当前脚本运行耗时为: " + str(time_cost).partition('.')[0])

 

上一篇:java8中lambda的用法(map转list,list转map等等)


下一篇:Stream流体系(概述,获取,常用API,综合应用与收集)