# SNMP scan (snmp扫描): community-string scanner built on pysnmp.

import argparse
import json
import string
import threading
import time
from decimal import Decimal

from pysnmp.entity.rfc3413.oneliner import cmdgen


class SnmpDetect:
    """Guess the SNMP community string of a single host.

    Candidates loaded from a JSON dictionary file are tried first
    (``normal_connect``).  When none of them is accepted, a brute-force
    search over alphanumeric strings is started (``blow_connect``).
    """

    # Reference link kept from the original author (presumably an overview
    # of common OIDs -- not used by the code).
    OID_url = 'https://blog.csdn.net/qq_28657577/article/details/82834442'

    def __init__(self, host, port, key_file='snmp.json', timeout=0):
        """
        :param host: target address (e.g. 192.168.100.115)
        :param port: target UDP port
        :param key_file: path of a JSON file holding the community names to
            try first
        :param timeout: number of seconds to sleep after every probe (it is
            passed to ``time.sleep``, not to the SNMP transport)
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        # OID requested by every probe.
        self.OID = '.1.3.6.1.2.1.1.1.0'
        # Community name that was accepted, or None while nothing matched.
        self.new_key = None
        # Values returned by the probe that succeeded.
        self.bind_list = []
        self.key_file = key_file
        # Dictionary of frequently used community names.
        self.key_list = self._get_key_list()
        # Longest community name generated by the brute-force search.
        self.max_key = 10
        # Upper bound on simultaneously running probe threads.
        self.max_threads_number = 1000
        self.max_threads = threading.Semaphore(self.max_threads_number)

    def _get_key_list(self):
        """Load the dictionary of community names from ``self.key_file``."""
        with open(self.key_file, 'r', encoding='utf-8') as r:
            return json.load(r)

    def __get_key(self, charsetString, length, key_list=None):
        """Append ``length`` characters of ``charsetString`` to every prefix.

        :param charsetString: characters to combine
        :param length: number of characters to append
        :param key_list: prefixes to extend; ``None`` or an empty list means
            "start from the bare characters"
        :return: list of all resulting strings, in charset order
        """
        keys = [] if key_list is None else key_list
        for _ in range(length):
            if keys:
                keys = [prefix + char for prefix in keys for char in charsetString]
            else:
                keys = list(charsetString)
        return keys

    def __walk_keys(self, charset, remaining, prefix):
        """Lazily yield ``prefix`` extended by ``remaining`` charset characters."""
        if remaining == 0:
            yield prefix
            return
        for char in charset:
            yield from self.__walk_keys(charset, remaining - 1, prefix + char)

    def _iter_keys(self):
        """Lazily yield every candidate of length 1..``self.max_key``.

        The order matches ``get_key``: shorter strings first, each length in
        charset order (digits, lowercase, uppercase).
        """
        charset = string.digits + string.ascii_letters
        for size in range(1, self.max_key + 1):
            yield from self.__walk_keys(charset, size, '')

    def get_key(self):
        """Return the list of all brute-force candidates.

        The list grows as 62 ** ``max_key``; ``blow_connect`` therefore uses
        the lazy ``_iter_keys`` instead of this method.
        """
        return list(self._iter_keys())

    def __connect(self, key):
        """Send one SNMP GET for ``self.OID`` with ``key`` as community name.

        On success the returned values are stored in ``self.bind_list`` and
        the key in ``self.new_key``.  Any exception raised while probing is
        treated as a failed attempt.

        :return: True when the request returned variable bindings
        """
        crack = False
        try:
            _indication, _status, _index, var_binds = cmdgen.CommandGenerator().getCmd(
                # Community data; the last argument selects the protocol
                # version: 0 is SNMP v1, 1 is SNMP v2c.
                cmdgen.CommunityData('my-agent', key, 0),
                # Transport channel to the target (SNMP normally uses UDP 161).
                cmdgen.UdpTransportTarget((self.host, self.port)),
                self.OID,
            )
            values = [str(bind).split(' = ')[-1] for bind in var_binds]
            if var_binds:
                crack = True
                # Publish the values before the key: blow_connect() polls
                # new_key and must never see it paired with stale values.
                self.bind_list = values
                self.new_key = key
        except Exception:
            # Best effort: a failing probe only means "this key did not
            # work".  KeyboardInterrupt/SystemExit are no longer swallowed.
            pass
        time.sleep(self.timeout)
        return crack

    def connect(self, key, thread):
        """Probe ``key``; with ``thread is True`` hold a thread-limit slot.

        :return: True when the key was accepted
        """
        if thread is True:
            with self.max_threads:
                return self.__connect(key)
        return self.__connect(key)

    def __probe_and_release(self, key):
        """Thread body for blow_connect(): probe, then free the slot."""
        try:
            self.__connect(key)
        finally:
            self.max_threads.release()

    def blow_connect(self):
        """Brute-force the community name with worker threads.

        Candidates are generated lazily and a slot of ``self.max_threads``
        is taken *before* each thread is started, so neither the candidate
        list nor the number of live threads grows without bound.

        :return: ``(new_key, bind_list)``; ``new_key`` is None when nothing
            matched
        """
        workers = []
        for key in self._iter_keys():
            if self.new_key:
                break
            # Blocks while max_threads_number probes are still running.
            self.max_threads.acquire()
            worker = threading.Thread(target=self.__probe_and_release, args=(key,))
            worker.start()
            workers.append(worker)
            # Forget finished threads now and then to keep the list small.
            if len(workers) >= 2 * self.max_threads_number:
                workers = [w for w in workers if w.is_alive()]
        for worker in workers:
            if self.new_key:
                break
            worker.join()
        return self.new_key, self.bind_list

    def normal_connect(self):
        """Try the dictionary first and fall back to brute force.

        :return: ``(key, bind_list)`` of the accepted community name, or the
            result of ``blow_connect`` when no dictionary entry worked
        """
        for key in self.key_list:
            print(key)
            if self.connect(key, False):
                return key, self.bind_list
        return self.blow_connect()


def opt():
    """Parse the scanner's command-line options from ``sys.argv``.

    Options: ``-host`` (required), ``-port`` (int, default 161), ``-rule``
    (community-name dictionary file, default ``snmp.json``) and ``-timeout``
    (int, default 0).

    :return: the ``argparse.Namespace`` holding the parsed values
    """
    cli = argparse.ArgumentParser(description='命令行中传入命令')
    option_specs = (
        ('-host', {'required': True, 'help': '<host(主机)>'}),
        ('-port', {'type': int, 'help': '<port(端口)>', 'default': 161}),
        ('-rule', {'help': '<社区名字典文件>', 'default': 'snmp.json'}),
        ('-timeout', {'type': int, 'help': '<间隔时间>', 'default': 0}),
    )
    for flag_name, settings in option_specs:
        cli.add_argument(flag_name, **settings)
    return cli.parse_args()


def get_host(host):
    """Expand a host specification into a list of host strings.

    Supported forms:

    * ``a.b.c.X-a.b.c.Y`` or ``a.b.c.X-Y``: every last octet from X to Y
      (inclusive) on the first three octets of the left-hand address.  A
      name that merely contains a dash (``my-router.local``) is returned
      unchanged.
    * ``a.b.c.P*``: every last octet in 1..255 that starts with the digits
      ``P`` followed by at least one more digit (``*`` alone gives 1..255).
      Anything after the ``*`` is ignored.
    * anything else: the host itself.

    :param host: host specification as described above
    :return: list of host strings
    """
    if '-' in host:
        left, right = host.split('-')[:2]
        left_parts = left.split('.')
        try:
            first, last = int(left_parts[-1]), int(right.split('.')[-1])
        except ValueError:
            # Not a numeric range, e.g. a DNS name containing a dash.
            return [host]
        return ['.'.join(left_parts[:3] + [str(octet)]) for octet in range(first, last + 1)]
    if '*' in host:
        parts = host.split('.')
        stem = parts[-1].split('*')[0]
        # Match against the real octet range instead of appending counters
        # to the stem: that skipped values such as 100-109 for "1*" and
        # produced octets above 255.
        return [
            '.'.join(parts[:3] + [str(octet)])
            for octet in range(1, 256)
            if len(str(octet)) > len(stem) and str(octet).startswith(stem)
        ]
    return [host]


def get_model():
    """Return the table of known identifiers grouped by category.

    The keys are ``'router'`` and ``'linux'``; the values are presumably
    device model names (not verified here).  The table was previously built
    and then discarded, so the function always returned ``None``.

    :return: dict mapping a category name to a list of identifiers
    """
    model = {
        'router': ['RB750Gr3', 'RB941-2nD', 'CCR1009-8G-1S'],
        'linux': ['R6100'],
    }
    return model


def main(flag=False, **kwargs):
    """Scan every host of a host specification and report the results.

    :param flag: when true the settings are taken from ``kwargs``; otherwise
        they are parsed from the command line by ``opt()``
    :param kwargs: ``host`` (host specification understood by ``get_host``),
        ``port``, ``rule`` (community-name dictionary file) and ``timeout``.
        Missing ``port``/``rule``/``timeout`` fall back to the command-line
        defaults 161 / ``'snmp.json'`` / 0.
    :return: dict with ``result`` (one dict per host holding ``host``,
        ``port``, ``key`` and ``bind_list``) and ``hold_time`` (elapsed
        seconds as a string ending in ``s``)
    """
    start_time = Decimal(time.time()).quantize(Decimal("0.00"))
    if flag:
        host_list = get_host(kwargs.get('host'))
        port = kwargs.get('port')
        if port is None:
            port = 161
        rule = kwargs.get('rule')
        if rule is None:
            rule = 'snmp.json'
        timeout = kwargs.get('timeout')
        if not timeout:
            timeout = 0
    else:
        args = opt()
        host_list = get_host(args.host)
        port = args.port
        rule = args.rule
        timeout = args.timeout
    text = []
    for host in host_list:
        key, bind_list = SnmpDetect(host, port, key_file=rule, timeout=timeout).normal_connect()
        text.append({
            'host': host,
            'port': port,
            'key': key,
            'bind_list': bind_list,
        })
    end_time = Decimal(time.time()).quantize(Decimal("0.00"))
    print('结果为:\n{},\n用时:{}s'.format(text, end_time - start_time))
    return {'result': text, 'hold_time': '{}s'.format(end_time - start_time)}


if __name__ == '__main__':
    # Script entry point: main() is called without arguments, so the
    # settings are parsed from the command line.
    main()

 

# Previous post (上一篇): net-snmp私有mib动态加载到snmpd


# Next post (下一篇): 【网络硬件】3.网络管理 - SNMP