(1)如有用成熟的监控系统prometheus或者zabbix等开源的监控系统,没必要单独自己写脚本来取值监控,有时兴趣来了写了个简单的脚本,监控mysql与postgres数据库lag:
#coding=utf-8 import pymysql,psycopg2 import requests,datetime,json,hmac,base64,hashlib import pytz,time,urllib.parse tz = pytz.timezone('Asia/Shanghai') db_list= [('postgres','xxxxx','10.x.x.x',5432)] URL="xxxxxxx" secret="xxxx" def get_timestamp_sign(): timestamp = str(round(time.time() * 1000)) secret_enc = secret.encode('utf-8') string_to_sign = '{}\n{}'.format(timestamp, secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return (timestamp, sign) def get_signed_url(): timestamp, sign = get_timestamp_sign() webhook = URL + "×tamp="+timestamp+"&sign="+sign return webhook def get_webhook(mode): if mode == 0: webhook = URL elif mode == 1 or mode ==2 : webhook = get_signed_url() else: webhook = "" print("error! mode: ",mode," webhook : ",webhook) return webhook def get_all_slave_lag(x): cnx = pymysql.connect(user=x[0], password=x[1], host=x[2], port=x[3]) cursor = cnx.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute("show slave status;") kinfo = cursor.fetchall() for row in kinfo: sync_host=row['Master_Host']+':'+str(row['Master_Port']) sync_lag=row['Seconds_Behind_Master'] if str(sync_lag)>str(60): title = "<font color=#FF0000 size=3>mysql master-slave sos</font>" else: title = "<font color=#008000 size=3>mysql master-slave ok</font>" io_thread=row['Slave_IO_Running'] sql_thread=row['Slave_SQL_Running'] sendmsg = { "msgtype": "markdown", "markdown": { "title": "mysql lags" + "....", "text": "报警主题 :" + str(title) + "\n\n>监控主机:" + str(sync_host) + "\n\n>报警时间:" + str(datetime.datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')) + "\n\n>报警信息:" + "err_msg=" + "io_thread: " + str(io_thread) + " sql_thread: " + str(sql_thread) + " lag_time: " + str(sync_lag) }, "at": { "atMobiles": [ "123456" ] }, "isAtAll": "False" } if (io_thread and sql_thread) != "Yes": is_sned_alerts(data=json.dumps(sendmsg)) else: pass cursor.close() cnx.close() def get_pg_slave_lag(x): cnx = psycopg2.connect(user=x[0], password=x[1], host=x[2], port=x[3]) cursor = cnx.cursor() cursor.execute("select client_addr,pg_wal_lsn_diff(pg_current_wal_insert_lsn(),pg_current_wal_lsn())as local_noflush_delay, \ pg_wal_lsn_diff(pg_current_wal_lsn(),sent_lsn)as local_send_delay,\ pg_wal_lsn_diff(sent_lsn,write_lsn)as stream_write_delay, \ pg_wal_lsn_diff(sent_lsn,flush_lsn)as stream_flush_delay, \ pg_wal_lsn_diff(sent_lsn,replay_lsn)as stream_replay_delay \ from pg_stat_replication;") kinfo = cursor.fetchall() sync_host=kinfo[0][0] local_noflush_delay=kinfo[0][1] local_send_delay=kinfo[0][2] stream_write_delay=kinfo[0][3] stream_flush_delay = kinfo[0][4] stream_replay_delay = kinfo[0][5] # res={"sync_host":sync_host,"local_noflush_delay":local_noflush_delay,"stream_write_delay":stream_write_delay,"stream_flush_delay":stream_flush_delay,"stream_replay_delay":stream_replay_delay,"local_send_delay":local_send_delay} sendmsg = { "msgtype": "markdown", "markdown": { "title": "mysql lags" + "....", "text": "报警主题 :" + "postgres sysnc error" + "\n\n>监控主机:" + str(sync_host) + "\n\n>报警时间:" + str(datetime.datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')) + "\n\n>报警信息:" + "err_msg=" + "local_noflush_delay: " + str(local_noflush_delay) + " local_send_delay: " + str(local_send_delay) + " stream_write_delay: " + str(stream_write_delay) + "stream_flush_delay:" +str(stream_flush_delay)+"stream_replay_delay:"+str(stream_replay_delay) }, "at": { "atMobiles": [ "123456" ] }, "isAtAll": "False" } if (local_noflush_delay >10000 or local_send_delay>6000 or stream_write_delay>6000 or stream_flush_delay>6000 or stream_replay_delay>10000) : is_sned_alerts(data=json.dumps(sendmsg)) else: pass cursor.close() cnx.close() def is_sned_alerts(data): webhook = get_webhook(1) headers = {'Content-Type': 'application/json'} req=requests.post(url=webhook, data=data, headers=headers) result=req.json() if result['errcode'] !=0: print('notify dintalk error :%s' % result['errcode']) start_time = datetime.datetime.now() if __name__ == "__main__": # def lambda_handler(event, context): #get_all_slave_lag(db_list[0]) #get_all_slave_lag(db_list[1]) get_pg_slave_lag(db_list[0]) end_time= datetime.datetime.now() time_cost = end_time - start_time print("当前脚本运行耗时为: " + str(time_cost).split('.')[0])