#!/usr/local/bin/python3.8 # -*- coding:utf-8 -*- # Author: 上善 # Time: 2019-10-25 13:37 # import os import sys import re import time import hashlib import requests import datetime import subprocess import dns.resolver BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) class box (object): """运维工具""" def check_ip(self): iplist = open("/Users/sean/Desktop/Module/cdnetwork/tools_box/module/node.list", encoding="utf-8") ip_list = iplist.readlines() for ip in ip_list: res = os.popen('curl -s http://freeapi.ipip.net/' + ip).read() ip_address = ip.replace("\n", "") + " " + "" + (res[1:-1]).replace("\"", "").replace(",", " ") print(ip_address) def beian(self): ''' 备案查询 :return: ''' while True: try: domain = input("请输入二级域名(eg baidu.com 按q or Q退出) :") if re.match('(([\\w-]{1,62})?(\\.[\\w-]{1,62})+)', domain): # 新网api接口 http_uri = "http://icp.fleacloud.com/api/v1/icp?domain=" http_url = "http://icp.fleacloud.com/api/v1/icp?domain=" + domain gmt_format = '%a, %d %b %Y %H:%M:%S GMT' date_gmt = datetime.datetime.utcnow().strftime(gmt_format) headers = { 'Date': date_gmt, 'Accept': 'application/json', 'Content-type': 'application/json', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleW ' 'ebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36' } res = requests.get(http_url, headers=headers) if res.json()['status'] == 3: print('域名 : ' + domain + " 备案号 : " + res.json()['icp']) elif res.json()['status'] == 2: print('域名 : ' + domain + " 备案号 : " + res.json()['icp']) else: print('查询失败或未知') elif domain.upper() == 'Q': break else: print('域名输入有误;') except AttributeError: sys.exit('系统错误') def oscp(self): ''' OCSP Stapling 测试 :return: ''' with open('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/domain_list', 'r') as domain_list: for domain_list in domain_list.readlines(): (ip, domain) = domain_list.strip("\n").split() print(domain + ":\n" + subprocess.Popen(('openssl s_client -connect ' + domain + ':443 -servername ' + domain + ' -status -tlsextdebug < /dev/null 2>&1 | grep -i ' '"OCSP response"'), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.read().decode()) def datetime(self): ''' 日期和md5加密 md5加密 :return: ''' date_time = datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S') print(date_time) print(subprocess.Popen(('echo'+date_time+'|md5'), shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE). stdout.read().decode()) def openssl(self): ''' 证书校验 :return: ''' taps = ''' domain_cname: images-cn.ssl-images-amazon.com.wtxcdn.com domain_pad: images-cn.ssl-images-amazon.com ''' print(taps) c_domain = input('domain_cname: ') s_domain = input('domain_pad: ') print(os.popen('openssl s_client -connect c_domain:443 ' '-servername s_domain< /dev/null 2> /dev/null | openssl x509 -text | grep -C2 Altern')) def ab_test(self): ''' apachc ab 压力测试 :return: ''' url = input('Http_Url: ') proxy = input('Proxy: ') port = input('Port: ') print('ab -X ' + proxy + ':' + port + ' -n 100 -c 50 ' + url) if (subprocess.Popen(('ab -X ' + proxy + ':' + port + ' -n 100 -c 50 ' + url), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.read().decode()): print('命令执行完成100链接10并发') def timestamp_datetime(self, value): ''' 时间戳转时间 :param value: :return: ''' format = '%Y-%m-%d %H:%M:%S' value = time.localtime(value) dt = time.strftime(format, value) return dt def datetime_timestamp(self, dt): ''' 时间转时间戳 :param dt: :return: ''' # dt为字符串 # 中间过程,一般都需要将字符串转化为时间数组 time.strptime(dt, '%Y-%m-%d %H:%M:%S') # 10位 (秒) s = int(time.mktime(time.strptime(dt, '%Y-%m-%d %H:%M:%S'))) # 13位(毫秒) # s = (time.mktime(time.strptime(dt, '%Y-%m-%d %H:%M:%S')))*1000 print('unix时间戳(10位秒级):' + str(s)) return int(s) def md5sum_encryption(self, unix_time, uri, key, encryption): ''' 秘文组合方式需要在变量上修改 :return: ''' wstime = str(600) if encryption == 'key+unixtime+wstime': uri_encryption = bytes((key + str(unix_time) + wstime).encode('utf-8')) md5encryption = hashlib.md5(uri_encryption).hexdigest() print('加密方式方式:' + encryption) print('md5: ' + md5encryption) elif encryption == 'uri+key+unixtime': uri_encryption = bytes((uri + key + str(unix_time)).encode('utf-8')) md5encryption = hashlib.md5(uri_encryption).hexdigest() print('加密方式方式:' + encryption) print('md5: ' + md5encryption) elif encryption == 'key+time+uri': uri_encryption = bytes((key + str(unix_time)+ uri).encode('utf-8')) md5encryption = hashlib.md5(uri_encryption).hexdigest() print('加密方式方式:' + encryption) print('md5: ' + md5encryption) elif encryption == 'time+key+uri': uri_encryption = bytes((str(unix_time) +key+ uri +str(3600)).encode('utf-8')) md5encryption = hashlib.md5(uri_encryption).hexdigest() print(unix_time) print('加密方式方式:' + encryption) print('md5: ' + md5encryption) else: print('暂不支持的加密方式,请检查或修改python') def unix_timestamp_create(self): ''' 变量获取并用md5 加密 :return: ''' if os.path.exists('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/uri_information'): unix_time = box.datetime_timestamp(self, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) with open('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/uri_information', "r") as f: for uri_line in f.readlines(): uri_information = uri_line (uri, key, encryption) = uri_information.strip('\n').split() box.md5sum_encryption(self,unix_time, uri, key, encryption) else: with open('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/uri_information', "w+") as f: f.write('origin/111 SEd0kljv5M key+unixtime+wstime ') sys.exit('uri_information文件不存在,将在当前目录生成,请退出后') def http_requests(self): ''' request 测试 :return: ''' print(datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')) s = requests.session() s.keep_alive = False # 关闭多余连接 with open('/Users/sean/Desktop/Module/cdnetwork/tools_box/module/url', 'r') as f: for i in f.readlines(): (http_url, proxy) = i.strip().split() print(http_url) proxies = { 'http': proxy + ':80', 'https': proxy + ':443' } headers = { 'Date': datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'), 'Accept': 'application/json', 'Content-type': 'application/json', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) ' 'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36', 'cdn-src-ip': proxy } res = requests.get(http_url, headers=headers, proxies=proxies, verify=False) headers_post = dict(res.headers) tmp_str = "statusCode:{}\nDate:{}\nContent-Length:{}\nConnection:{}\nCache-Control:{}\nVia:{}\nX-px:{}\n".format( res.status_code, headers_post.get('Date'), headers_post.get('Content-Length'), headers_post.get('Connection'), headers_post.get('Cache-Control'), headers_post.get('Via'), headers_post.get('X-Px'), ) print(tmp_str) def hosts(self): domain = input("Please input an domain: ").strip() # 输入一个域名 try: A = dns.resolver.query(domain, 'A') # 指定查看类型为A记录 except Exception as e: print(e) else: for i in A.response.answer: # 通过response.answer方法获取查询回应信息 for j in i.items: # 遍历回应信息 hosts = (j.address + " " + domain) os.system("echo "+hosts+"|sed 's/.whecloud.com//g' | sed 's/.wtxcdn.com//g'>> /etc/hosts") os.system('cat /etc/hosts')