zabbix 线路质量监控自定义python ping 模块(socket+deque版),批量监控线路质量并自定义阈值联动mtr保存线路故障日志

前段时间使用MySQL实现了这个功能,缺点是占用太多系统资源,且脚本较为繁重

后续改用socket+deque实现,资源占用低、更加轻量,也可以通过开放互联网API来做分布式监控

 

#!/usr/bin/env python3
#-*-coding:utf-8-*-
from collections import deque
import itertools,time
import queue,json
import argparse,sys,re,os,subprocess
import time,socket
import threading
from functools import reduce

# Shared list of the [source_ip, target_ip] pairs being monitored. Server appends
# each newly requested pair to it, and the probe loops compare the length they
# started with against len(ipqli).
ipqli=[]
# ping3 is not used, to avoid system permission problems
class Ping:
    """Probe one (source IP, target IP) pair by spawning the system ``ping``.

    Each probe sends a single echo request.  The parsed round-trip time (or 0
    for a lost probe) and a 0/1 loss flag are pushed onto two per-instance
    deques; ping_value() pops the newest samples and reduces them to an
    average latency and a loss percentage.
    """

    def __init__(self, ip, flag, inver=1, count=20):
        """Create a prober.

        ip    -- (source_ip, target_ip); a falsy source_ip means that no
                 ``-I`` option is passed to ping.
        flag  -- value compared with len(ipqli) after every probe in ping();
                 the loop stops as soon as the two differ.
        inver -- seconds between two probes, also handed to ``ping -i``.
        count -- probes per measurement, at least 1.

        Raises ValueError when count is smaller than 1.
        """
        if count < 1:
            raise ValueError('count must be at least 1')
        ip = tuple(ip)
        self.sip, self.tip = ip
        self.inver = inver
        self.count = count
        self.flag = flag
        suffix = ''.join(part or '' for part in ip).replace('.', '')
        # Plain attributes instead of writing through locals(): on Python
        # 3.13+ locals() returns a fresh snapshot on every call, so reading
        # back a key stored through it raises KeyError.  The buffers are
        # sized so that one full measurement always fits.
        self.restime_deque = deque(maxlen=max(60, count))
        self.pkloss_deque = deque(maxlen=max(60, count))
        # Result deques are looked up in the module globals under a name
        # derived from the IP pair.  When none is registered, fall back to
        # empty private ones, which makes ping_value() use its threaded path.
        self.ret_restime_deque = globals().get('restime_deque' + suffix,
                                               deque(maxlen=10))
        self.ret_pkloss_deque = globals().get('pkloss_deque' + suffix,
                                              deque(maxlen=10))
        # Matches the number in "time=<number> ms", with or without decimals.
        self.compile = r'(?<=time=)\d+(?:\.\d+)?(?= ms)'

    def _command(self):
        """Build the argument list for one single-packet ping."""
        cmd = ['ping', '-i', str(self.inver), '-c', '1', '-W', '1']
        if self.sip:
            cmd += ['-I', str(self.sip)]
        cmd.append(str(self.tip))
        return cmd

    def _parse_rtt(self, output):
        """Return the round-trip time text found in ping output, or None."""
        found = re.search(self.compile, output)
        return found.group(0) if found else None

    def _probe(self):
        """Send one probe, record the sample and return True on a reply."""
        try:
            # An argument list (no shell) keeps the addresses from being
            # interpreted as shell syntax.
            proc = subprocess.Popen(self._command(),
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            output = proc.communicate()[0].decode('utf8', 'replace')
        except OSError:
            # ping could not be started: count the probe as lost.
            output = ''
        value = self._parse_rtt(output)
        if value is None:
            self.pkloss_deque.append(1)
            self.restime_deque.append(0)
            return False
        self.restime_deque.append(value)
        self.pkloss_deque.append(0)
        return True

    def fastping(self):
        """Send a single probe; used as the thread target of the fast path."""
        self._probe()

    def ping(self):
        """Send up to ``count`` probes one after another.

        Stops early when len(ipqli) no longer equals ``flag``.
        Returns (probes_sent, probes_answered).
        """
        index = 0
        res_count = 0
        while index < self.count:
            start_time = time.time()
            if self._probe():
                res_count += 1
            index += 1
            if self.flag != len(ipqli):
                break
            usetime = time.time() - start_time
            # Sleep only for what is left of the interval; a probe that
            # already overran it is followed immediately by the next one.
            time.sleep(max(self.inver - usetime, 0))
        return index, res_count

    def _summarise(self, count):
        """Pop the newest ``count`` samples and return (avg_rtt, loss_percent).

        The average is taken over answered probes only, so lost probes
        (recorded as 0) do not pull it down; with no answer at all it is 0.
        """
        rtts = [self.restime_deque.pop() for _ in range(count)]
        lost = sum(self.pkloss_deque.pop() for _ in range(count))
        answered = count - lost
        total = round(sum(float(rtt) for rtt in rtts), 2)
        restime = total / (answered or count)
        pkloss = lost / count * 100
        return restime, pkloss

    def ping_value(self):
        """Run one measurement and return (restime, pkloss, use_time).

        While fewer than two results are available in the result deques all
        probes are fired at once from threads; afterwards they are sent
        sequentially by ping().  use_time is the wall-clock duration in
        seconds, rounded to four decimals.
        """
        start_time = time.time()
        if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
            workers = [threading.Thread(target=self.fastping)
                       for _ in range(self.count)]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
            count = self.count
        else:
            count, _ = self.ping()
        use_time = round(time.time() - start_time, 4)
        restime, pkloss = self._summarise(count)
        return (restime, pkloss, use_time)
       
# Server-side code
class Server():
    """Background service that probes registered IP pairs and answers queries.

    One thread accepts ``[[source_ip, target_ip], item]`` JSON requests on the
    listening socket and replies with the newest measurement; a second thread
    keeps one Ping worker per registered pair running.  Measurements travel
    through deques stored in the module globals under names derived from the
    IP pair.
    """

    # Pause between polls of shared state, so that waiting does not spin a
    # CPU core.
    POLL_INTERVAL = 0.05

    def __init__(self, sock):
        """Wrap an already listening socket; shares the module-level ipqli."""
        self.ipqli = ipqli
        self.thli = []
        self.sock = sock
        self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))

    # Entry point of the server program
    @classmethod
    def start(cls):
        """Bind 127.0.0.1:6589, start both service threads and wait for them."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Allow an immediate restart while old connections are in TIME_WAIT.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('127.0.0.1', 6589))
        s.listen(100)
        obj = cls(s)
        for target in (obj.server, obj.create):
            worker = threading.Thread(target=target)
            worker.start()
            obj.thli.append(worker)
        for worker in obj.thli:
            worker.join()

    @staticmethod
    def _deque_name(prefix, ip):
        """Return the module-global name of the deque for an IP pair.

        NOTE: the dots are dropped, so different pairs can map to the same
        name (e.g. targets 1.2.3.45 and 12.3.4.5 with the same source).
        """
        return prefix + ''.join(part or '' for part in ip).replace('.', '')

    @staticmethod
    def _parse_request(raw):
        """Decode one request into (ip, item); raise ValueError if malformed.

        ip is returned as the two-element list [source_ip, target_ip], where
        source_ip may be None.
        """
        data = json.loads(raw.decode('utf-8'))
        if not (isinstance(data, list) and len(data) == 2):
            raise ValueError('request must be [ip, item]')
        ip, item = data
        if not (isinstance(ip, list) and len(ip) == 2):
            raise ValueError('ip must be [source_ip, target_ip]')
        sip, tip = ip
        if not isinstance(tip, str) or not (sip is None or isinstance(sip, str)):
            raise ValueError('addresses must be strings')
        return ip, item

    def server(self):
        """Accept connections forever, serving one request per connection."""
        while True:
            conn, addr = self.sock.accept()
            try:
                ip, item = self._parse_request(conn.recv(1024))
                if ip not in self.ipqli:
                    # First request for this pair: create its result deques
                    # before registering it, so workers always find them.
                    globals()[self._deque_name('restime_deque', ip)] = deque(maxlen=10)
                    globals()[self._deque_name('pkloss_deque', ip)] = deque(maxlen=10)
                    self.ipqli.append(ip)
                self.sendvalue(conn, ip, item)
            except (ValueError, OSError):
                # A malformed request or a dropped connection only concerns
                # this client; the accept loop has to stay alive.
                pass
            finally:
                conn.close()

    def create(self):
        """Keep one makevalue() worker per registered pair running.

        The workers return as soon as the number of registered pairs changes;
        the loop then starts a fresh set for the new list.
        """
        while True:
            pairs = list(self.ipqli)
            if not pairs:
                time.sleep(self.POLL_INTERVAL)
                continue
            leng = len(pairs)
            workers = [threading.Thread(target=self.makevalue, args=(ip, leng))
                       for ip in pairs]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()

    def makevalue(self, ip, leng):
        """Measure one pair until the number of registered pairs differs from leng."""
        restime_ipq = globals()[self._deque_name('restime_deque', ip)]
        pkloss_ipq = globals()[self._deque_name('pkloss_deque', ip)]
        obj = Ping(ip, leng)
        while leng == len(self.ipqli):
            time.sleep(0.1)
            restime, pkloss, use_time = obj.ping_value()
            restime_ipq.append((restime, use_time))
            pkloss_ipq.append((pkloss, use_time))

    def _wait_for_samples(self, samples):
        """Block until the deque holds at least two measurements."""
        while len(samples) < 2:
            time.sleep(self.POLL_INTERVAL)

    def _run_mtr(self, tip):
        """Start mtr.sh for the target in the background, logging to mtr_log/."""
        log_file = (self.basedir + '/mtr_log/' + tip + '-'
                    + time.strftime('%Y-%m-%d', time.localtime()) + '.log')
        try:
            # Argument list instead of a shell string: the target address
            # arrives over the socket and must not be parsed by a shell.
            subprocess.Popen([self.basedir + '/mtr.sh', tip, log_file],
                             stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL)
        except OSError:
            # The trace is best-effort diagnostics; the metric is still sent.
            pass

    def sendvalue(self, conn, ip, item):
        """Send the newest measurement of ``item`` for ``ip`` over ``conn``.

        Waits until two measurements are available.  For 'restime' an mtr
        trace is started when the newest latency exceeds the previous one by
        more than 20; for 'pkloss' when the loss is above 20 and below 100.
        Any other item is ignored and nothing is sent.
        """
        _, tip = ip
        if item == 'restime':
            samples = globals()[self._deque_name('restime_deque', ip)]
            self._wait_for_samples(samples)
            ret, use_time = samples.pop()
            hisret, _ = samples[-1]
            alarm = ret - hisret > 20
        elif item == 'pkloss':
            samples = globals()[self._deque_name('pkloss_deque', ip)]
            self._wait_for_samples(samples)
            ret, use_time = samples.pop()
            alarm = 100 > ret > 20
        else:
            return
        if alarm:
            self._run_mtr(tip)
        conn.sendall(str(ret).encode())


# IP format checking module
class Ipcheck():
    """Validate the arguments of one query: source IP, target IP and item."""

    # Four dot-separated groups of one to three ASCII digits; the numeric
    # range of each group is checked separately by checkiplength().
    _DOTTED_QUAD = re.compile(r'\d{1,3}(?:\.\d{1,3}){3}', re.ASCII)

    def __init__(self, sip, tip, item):
        """Store the source IP (may be empty), the target IP and the item."""
        self.sip = sip
        self.tip = tip
        self.item = item

    def check(self):
        """Return True when item is 'restime' or 'pkloss' and the IPs are valid."""
        return self.item in ('restime', 'pkloss') and self.checkipformat()

    def check_fun(self, ip):
        """Return True when one address group is below 256."""
        return int(ip) < 256

    def _is_dotted_quad(self, value):
        """Return True when value is a string of the form d.d.d.d."""
        return (isinstance(value, str)
                and self._DOTTED_QUAD.fullmatch(value) is not None)

    def checkipformat(self):
        """Return True when the target IP, and the source IP if given, are valid.

        An empty or missing source IP is accepted; the target IP is required.
        """
        if not self._is_dotted_quad(self.tip):
            return False
        groups = self.tip.split('.')
        if self.sip:
            if not self._is_dotted_quad(self.sip):
                return False
            groups += self.sip.split('.')
        return self.checkiplength(groups)

    def checkiplength(self, check_ipli):
        """Return True when every group in check_ipli is below 256."""
        return all(self.check_fun(group) for group in check_ipli)

# Start the server side in the background
def run():
    """Spawn ``python3 <script dir>/newpingd.py -S server`` without waiting for it."""
    base_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
    # Argument list instead of a shell string, so a script directory that
    # contains spaces or shell metacharacters still works.
    cmd = ['python3', '%s/newpingd.py' % base_dir, '-S', 'server']
    try:
        # The caller never reads the child's output, so discard it instead of
        # leaving the child attached to pipes that nobody drains.
        subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError:
        # Starting the server is best effort: when nothing ends up listening,
        # the client's connection attempt fails and reports it.
        pass
# Client-side code
def socket_client(ip,item):
    """Send one ``[ip, item]`` query to the local server and print the reply.

    ip   -- (source_ip, target_ip) pair.
    item -- name of the requested metric.

    When the server cannot be reached or the exchange fails, prints
    'server will start' and exits with status 0.
    """
    request = json.dumps([ip, item]).encode()
    try:
        # The with-block closes the socket on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect(('127.0.0.1', 6589))
            s.sendall(request)
            reply = s.recv(1024).decode()
    except (OSError, UnicodeDecodeError):
        print('server will start')
        sys.exit(0)
    else:
        print(reply)
if __name__ == '__main__':
    # Command-line options: -S selects server mode, -s/-t/-I describe one query.
    parser = argparse.ArgumentParser(description='icmp for monitor')
    for option, dest in (('-S', 'server'), ('-t', 'tip'), ('-s', 'sip'), ('-I', 'item')):
        parser.add_argument(option, action='store', dest=dest)
    args = parser.parse_args()

    # Look for a running server process; empty output means none was found,
    # in which case one is started in the background.
    server_status_cmd = "ps -ef | grep 'newpingd.py -S server' | grep -v grep | cut -c 9-16"
    finder = subprocess.Popen(
        server_status_cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    server_status = finder.communicate()[0]
    if not server_status:
        run()

    # Server mode: run the server, then exit.
    if args.server:
        Server.start()
        sys.exit(0)

    # Client mode: validate the arguments, then query the server.
    sip, tip, item = args.sip, args.tip, args.item
    if Ipcheck(sip, tip, item).check():
        socket_client((sip, tip), item)
    else:
        for line in (
            'ip or item error',
            '-------example-----------',
            '------- darkping -s 10.0.0.2 -t 10.0.0.1 -I restime -----------',
        ):
            print(line)
        sys.exit(0)

 

 

 

 

mtr.sh

#!/usr/bin/env bash
# Append one mtr report for a target address to a log file.
# Usage: mtr.sh <target-ip> <log-file>
if [ "$#" -ne 2 ]; then
    echo "usage: $0 <target-ip> <log-file>" >&2
    exit 1
fi
IP=$1
dir=$2
# Create the log directory on first use so the redirection below does not
# fail on a missing path.
mkdir -p "$(dirname "$dir")"
# Report mode, no DNS lookups, 30 cycles, wide output, show IP addresses.
# Expansions are quoted so the arguments are never word-split or globbed.
mtr -r -n -c 30 -w -b "$IP" >> "$dir"

 

效果

zabbix 线路质量监控自定义python ping 模块(socket+deque版),批量监控线路质量并自定义阈值联动mtr保存线路故障日志

 

MySQL 版

https://www.cnblogs.com/darkchen/p/15524856.html

上一篇:深入理解Ember-Data特性(上)


下一篇:Mac中brew的安装