自动封ssh爆破
import re
import pandas as pd
import subprocess
def split_lines(lines):
"""分割行内容函数"""
return re.split(r'\s+',lines)
def data_structuring(read_file):
"""数据结构化函数"""
IP_last_table = []
for line in read_file:
split_line = split_lines(line)
if len(split_line) ==11 :
IP_last_table.append(split_line[:-1])
IP_filter_table = pd.DataFrame(IP_last_table)
return IP_filter_table
def my_renames(IP_filter_table):
"""数据帧重命名函数"""
columns_renames = {
0:"用户名",
1:"是否ssh连接成功",
2:"登陆的IP地址",
3:"星期",
4:"月份",
5:"几号",
6:"连接时间",
7:"分割符",
8:"断开时间",
9:"连接持续时间"
}
IP_filter_table.rename(columns=columns_renames,inplace=True)
return IP_filter_table
def get_blacklisting(IP_filter_table,whilelisting=[]):
blacklisting = set(IP_filter_table["登陆的IP地址"])
# print(len(blacklisting))
if len(whilelisting) > 0:
for while_ip in whilelisting:
blacklisting.discard(while_ip)
return blacklisting
def block_ip(ip,password):
block_ip_cmd = f"echo %s | sudo -S iptables -I INPUT -s {ip} -j DROP" % f'{password}'
try:
res = subprocess.getoutput(block_ip_cmd)
print(f"ip {ip} is already blocked {res}")
except Exception as e:
print(f"ip {ip} has been blocked ,the error is :\n {e}")
return e
def unblock_ip(ip,password):
block_ip_cmd = f"echo %s | sudo -S iptables -D INPUT -s {ip} -j DROP" % f'{password}'
try:
res = subprocess.getoutput(block_ip_cmd)
print(f"ip {ip} is already unblocked {res}")
except Exception as e:
print(f"ip {ip} has been unblocked ,the error is :\n {e}")
return e
def block_ip_logic(lastb_file_path = "IP_lastb_log.log"
,whilelisting=[]
,password='kali'):
read_file = open(lastb_file_path).readlines()
IP_filter_table = data_structuring(read_file)
IP_filter_table = my_renames(IP_filter_table)
black_listings = get_blacklisting(IP_filter_table,whilelisting)
t = 0
for black_listing in black_listings:
try:
block_ip(black_listing,password)
t += 1
except Exception as e:
print(f"ERROR : {e}")
print(f"A tolat of {t} IPs have been banned this time")
def unblock_ip_logic(lastb_file_path = "IP_lastb_log.log"
,blacklisting=[]
,password='kali'):
read_file = open(lastb_file_path).readlines()
IP_filter_table = data_structuring(read_file)
IP_filter_table = my_renames(IP_filter_table)
black_listings = get_blacklisting(IP_filter_table,blacklisting)
t = 0
for black_listing in black_listings:
try:
unblock_ip(black_listing,password)
t += 1
except Exception as e:
print(f"ERROR : {e}")
print(f"A tolat of {t} IPs have been unbanned this time")
if __name__ == '__main__':
#block_ip_logic(lastb_file_path = "IP_lastb_log.log",whilelisting=[],password='kali')
unblock_ip_logic(lastb_file_path = "IP_lastb_log.log",blacklisting=[],password='kali')
# read_file = open("IP_lastb_log.log").readlines()
# IP_filter_table = data_structuring(read_file)
# IP_filter_table = my_renames(IP_filter_table)
# # print(IP_filter_table)
# black_listings = get_blacklisting(IP_filter_table,whilelisting=['221.226.183.94'])
# print(black_listings)
# block_ip('95.58.255.251','kali')
# unblock_ip('95.58.255.251','kali')