"""Detect SNMP community strings by dictionary lookup, then brute force."""
import argparse
import itertools
import json
import logging
import string
import threading
import time
from decimal import Decimal

try:
    from pysnmp.entity.rfc3413.oneliner import cmdgen
except ImportError:
    # Keep the pure helpers (get_host, key generation) importable without
    # pysnmp; SnmpDetect.__init__ reports the missing dependency.
    cmdgen = None

logger = logging.getLogger(__name__)


class SnmpDetect:
    """Probe one SNMP agent for a working community string.

    Attributes:
        host: Target address (e.g. ``192.168.100.115``).
        port: Target UDP port.
        timeout: Seconds to sleep after every probe (an inter-probe delay).
        OID: OID requested by every probe.
        new_key: Community string that worked, or ``None``.
        bind_list: Values returned by the agent for the successful probe.
        key_file: Path of the JSON file holding common community strings.
        key_list: Community strings loaded from ``key_file``.
        max_key: Maximum length of brute-forced community strings.
        max_threads_number: Maximum number of concurrent probe threads.
        max_threads: Semaphore sized to ``max_threads_number``.
    """

    OID_url = 'https://blog.csdn.net/qq_28657577/article/details/82834442'

    def __init__(self, host, port, key_file='snmp.json', timeout=0):
        """Store the target and load the community-string dictionary.

        :param host: target address
        :param port: target UDP port
        :param key_file: JSON file containing a list of community strings
        :param timeout: seconds to sleep after each probe
        :raises ImportError: if pysnmp is not installed
        """
        if cmdgen is None:
            raise ImportError(
                'pysnmp (pysnmp.entity.rfc3413.oneliner.cmdgen) is required '
                'to probe SNMP agents'
            )
        self.host = host
        self.port = port
        self.timeout = timeout
        self.OID = '.1.3.6.1.2.1.1.1.0'
        self.new_key = None
        self.bind_list = []
        self.key_file = key_file
        self.key_list = self._get_key_list()
        self.max_key = 10
        self.max_threads_number = 1000
        # Caps the number of probes running at the same time.
        self.max_threads = threading.Semaphore(self.max_threads_number)

    def _get_key_list(self):
        """Return the community strings stored in ``self.key_file`` (JSON)."""
        with open(self.key_file, 'r', encoding='utf-8') as fh:
            return json.load(fh)

    def __get_key(self, charsetString, length, key_list=None):
        """Return every key in ``key_list`` extended by ``length`` characters.

        With an empty/omitted ``key_list`` this yields all strings of exactly
        ``length`` characters drawn from ``charsetString``.
        """
        if key_list is None:
            key_list = []
        if length == 0:
            return key_list
        prefixes = key_list if key_list else ['']
        return [
            prefix + ''.join(chars)
            for prefix in prefixes
            for chars in itertools.product(charsetString, repeat=length)
        ]

    def iter_keys(self):
        """Lazily yield candidate keys of length 1..``self.max_key``.

        Keys are drawn from digits and ASCII letters, shortest first.
        """
        charset = string.digits + string.ascii_letters
        for size in range(1, self.max_key + 1):
            for chars in itertools.product(charset, repeat=size):
                yield ''.join(chars)

    def get_key(self):
        """Return all brute-force candidates as a list.

        The list grows as 62**max_key; prefer :meth:`iter_keys` for anything
        but a small ``max_key``.
        """
        return list(self.iter_keys())

    def __connect(self, key):
        """Send one SNMP v1 GET using ``key``; return True if the agent answered."""
        crack = False
        try:
            error_indication, _status, _index, var_binds = cmdgen.CommandGenerator().getCmd(
                # Third argument selects the version: 0 = v1, 1 = v2c.
                cmdgen.CommunityData('my-agent', key, 0),
                cmdgen.UdpTransportTarget((self.host, self.port)),
                self.OID,
            )
            # A truthy errorIndication means the engine failed (e.g. timed
            # out), so no agent accepted this community string.
            if not error_indication and var_binds:
                # Publish the values before the key so a reader that sees
                # new_key also sees the matching bind_list.
                self.bind_list = [str(bind).split(' = ')[-1] for bind in var_binds]
                self.new_key = key
                crack = True
        except Exception:
            # Best effort: any probe failure just means "this key did not work".
            logger.debug('SNMP probe of %s:%s failed', self.host, self.port, exc_info=True)
        time.sleep(self.timeout)
        return crack

    def connect(self, key, thread):
        """Probe with ``key``; when ``thread`` is True, honour the thread cap."""
        if thread is True:
            with self.max_threads:
                return self.__connect(key)
        return self.__connect(key)

    def blow_connect(self):
        """Brute-force the community string with a bounded pool of threads.

        Candidates are generated lazily and shared between at most
        ``max_threads_number`` workers; every worker stops as soon as a key
        has been found or the candidates are exhausted.

        :return: ``(new_key, bind_list)``; ``new_key`` is None if nothing worked
        """
        keys = self.iter_keys()
        keys_lock = threading.Lock()

        def worker():
            while not self.new_key:
                with keys_lock:  # generators are not safe to share unguarded
                    key = next(keys, None)
                if key is None:
                    return
                self.connect(key, True)

        workers = [
            threading.Thread(target=worker, daemon=True)
            for _ in range(self.max_threads_number)
        ]
        for task in workers:
            task.start()
        for task in workers:
            task.join()
        return self.new_key, self.bind_list

    def normal_connect(self):
        """Try the dictionary keys in order, then fall back to brute force.

        :return: ``(key, bind_list)``
        """
        for key in self.key_list:
            print(key)
            if self.connect(key, False):
                return key, self.bind_list
        return self.blow_connect()


def opt():
    """Parse the command-line arguments (-host, -port, -rule, -timeout)."""
    parser = argparse.ArgumentParser(description='命令行中传入命令')
    parser.add_argument('-host', required=True, help='<host(主机)>')
    parser.add_argument('-port', type=int, help='<port(端口)>', default=161)
    parser.add_argument('-rule', help='<社区名字典文件>', default='snmp.json')
    parser.add_argument('-timeout', type=int, help='<间隔时间>', default=0)
    return parser.parse_args()


def get_host(host):
    """Expand a host specification into a list of addresses.

    Supported forms:
      * ``a.b.c.x-a.b.c.y`` or ``a.b.c.x-y``: last octet from x to y inclusive;
      * ``a.b.c.P*``: last octets formed by the prefix ``P`` followed by a
        number, keeping only results that are valid octets (<= 255);
      * anything else (including hostnames containing ``-``): returned as is.

    :raises ValueError: if a wildcard prefix has more than two digits
    """
    if '-' in host:
        left, right = host.split('-')[:2]
        left_parts = left.split('.')
        first, last = left_parts[-1], right.split('.')[-1]
        if not (first.isdigit() and last.isdigit()):
            # Not a numeric range, e.g. a hostname such as "my-router".
            return [host]
        return [
            '.'.join(left_parts[:3] + [str(octet)])
            for octet in range(int(first), int(last) + 1)
        ]

    if '*' in host:
        # Suffix range to append, keyed by the length of the digits before '*'.
        suffix_ranges = {0: range(1, 256), 1: range(0, 100), 2: range(0, 10)}
        parts = host.split('.')
        prefix = parts[-1].split('*')[0]
        if len(prefix) not in suffix_ranges:
            raise ValueError('wildcard prefix too long in host: {!r}'.format(host))
        candidates = ('{}{}'.format(prefix, n) for n in suffix_ranges[len(prefix)])
        return [
            '.'.join(parts[:3] + [octet])
            for octet in candidates
            # Drop combinations that are not a valid IPv4 octet.
            if not octet.isdigit() or int(octet) <= 255
        ]

    return [host]


def get_model():
    """Return the known device models grouped by device type."""
    model = {
        'router': ['RB750Gr3', 'RB941-2nD', 'CCR1009-8G-1S'],
        'linux': ['R6100'],
    }
    return model


def main(flag=False, **kwargs):
    """Scan every requested host and report the community string found.

    :param flag: when True, read ``host``/``port``/``rule``/``timeout`` from
        ``kwargs`` instead of the command line
    :return: ``{'result': [...], 'hold_time': '<seconds>s'}``
    :raises ValueError: if ``flag`` is True and no ``host`` is given
    """
    start_time = Decimal(time.time()).quantize(Decimal("0.00"))
    if flag:
        host = kwargs.get('host')
        if not host:
            raise ValueError("main(flag=True) requires a 'host' keyword argument")
        # Fall back to the same defaults the command line uses.
        port = kwargs.get('port') or 161
        rule = kwargs.get('rule') or 'snmp.json'
        timeout = kwargs.get('timeout') or 0
        host_list = get_host(host)
    else:
        args = opt()
        host_list = get_host(args.host)
        port = args.port
        rule = args.rule
        timeout = args.timeout

    text = []
    for host in host_list:
        key, bind_list = SnmpDetect(host, port, key_file=rule, timeout=timeout).normal_connect()
        text.append({
            'host': host,
            'port': port,
            'key': key,
            'bind_list': bind_list,
        })
    end_time = Decimal(time.time()).quantize(Decimal("0.00"))
    print('结果为:\n{},\n用时:{}s'.format(text, end_time - start_time))
    return {'result': text, 'hold_time': '{}s'.format(end_time - start_time)}


if __name__ == '__main__':
    main()