Python的一个BOX

#!/usr/local/bin/python3.8
# -*- coding:utf-8 -*-
# Author: 上善
# Time: 2019-10-25 13:37  
#
import os
import sys
import re
import time
import hashlib
import requests
import datetime
import subprocess
import dns.resolver

# Project root: the directory two levels above this file.  It is appended to
# sys.path so that sibling packages can be imported when this file is run
# directly as a script.
BASE_DIR = os.path.normpath(
    os.path.join(os.path.abspath(__file__), os.pardir, os.pardir)
)
sys.path.append(BASE_DIR)


class box(object):
    """Operations toolbox: a collection of ad-hoc helpers for CDN/ops work.

    Most helpers are interactive (they prompt with ``input`` and report with
    ``print``) and read their work lists from files inside ``MODULE_DIR``.
    External commands (``curl``, ``openssl``, ``ab``) are started with an
    argument list instead of a shell string, so text read from files or typed
    by the user is never interpreted by a shell.
    """

    # Directory holding the input lists (node.list, domain_list,
    # uri_information, url).  Override on a subclass or instance to relocate.
    MODULE_DIR = '/Users/sean/Desktop/Module/cdnetwork/tools_box/module'
    # Hosts file that ``hosts`` appends to.
    HOSTS_FILE = '/etc/hosts'
    # Seconds to wait for an HTTP response before giving up.
    HTTP_TIMEOUT = 30
    # Format of the ``Date`` request header.
    GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
    # Browser User-Agent shared by every HTTP helper.
    USER_AGENT = ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/77.0.3865.120 Safari/537.36')

    def _module_path(self, name):
        """Return the full path of the input file *name* in ``MODULE_DIR``."""
        return os.path.join(self.MODULE_DIR, name)

    @classmethod
    def _gmt_now(cls):
        """Return the current UTC time formatted with ``GMT_FORMAT``."""
        return datetime.datetime.now(datetime.timezone.utc).strftime(cls.GMT_FORMAT)

    @staticmethod
    def _run(args, data=None, merge_stderr=False):
        """Run a command without a shell and return its decoded stdout.

        :param args: command and arguments as a list
        :param data: bytes fed to the command's stdin; ``None`` gives it an
                     empty stdin (the equivalent of ``< /dev/null``)
        :param merge_stderr: fold stderr into the returned text (``2>&1``)
        :return: decoded output, or ``''`` when the command cannot be started
        """
        stdin_kwargs = {'stdin': subprocess.DEVNULL} if data is None else {'input': data}
        try:
            # run() drains both pipes concurrently, so a chatty stderr can no
            # longer block the child while only stdout is being read.
            proc = subprocess.run(
                args,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT if merge_stderr else subprocess.PIPE,
                **stdin_kwargs
            )
        except OSError as exc:
            # e.g. the executable is not installed
            print(exc, file=sys.stderr)
            return ''
        return proc.stdout.decode('utf-8', errors='replace')

    @staticmethod
    def _grep_context(text, needle, context=2):
        """Emulate ``grep -C<context> needle`` on *text*.

        :param text: multi-line text to search
        :param needle: plain substring to look for
        :param context: number of lines kept before and after each match
        :return: the selected lines joined with newlines; non-adjacent groups
                 are separated by a ``--`` line, as grep does
        """
        lines = text.splitlines()
        keep = set()
        for idx, line in enumerate(lines):
            if needle in line:
                first = max(0, idx - context)
                last = min(len(lines), idx + context + 1)
                keep.update(range(first, last))
        selected = []
        previous = None
        for idx in sorted(keep):
            if previous is not None and idx != previous + 1:
                selected.append('--')
            selected.append(lines[idx])
            previous = idx
        return '\n'.join(selected)

    def check_ip(self):
        """Print the ipip.net lookup result for every IP in ``node.list``.

        Each non-blank line of the file is treated as one IP address and is
        queried with ``curl`` against ``http://freeapi.ipip.net/``.  The
        reply has its first and last character dropped and its quotes and
        commas flattened before being printed after the IP.

        :return: None
        """
        with open(self._module_path('node.list'), encoding="utf-8") as node_file:
            for line in node_file:
                ip = line.strip()
                if not ip:
                    continue
                res = self._run(['curl', '-s', 'http://freeapi.ipip.net/' + ip]).strip()
                ip_address = ip + " " + res[1:-1].replace("\"", "").replace(",", " ")
                print(ip_address)

    def beian(self):
        """Interactively query ICP filing (beian) numbers for domains.

        Prompts for a domain until the user enters ``q``/``Q``.  A reply whose
        ``status`` is 2 or 3 is printed with its ``icp`` value; any other
        status, a network failure or a malformed reply prints the generic
        failure message and the loop continues.

        :return: None
        """
        domain_pattern = re.compile(r'(([\w-]{1,62})?(\.[\w-]{1,62})+)')
        while True:
            try:
                domain = input("请输入二级域名(eg baidu.com 按q or Q退出) :")
                if domain.upper() == 'Q':
                    break
                if not domain_pattern.match(domain):
                    print('域名输入有误;')
                    continue
                headers = {
                    'Date': self._gmt_now(),
                    'Accept': 'application/json',
                    'Content-type': 'application/json',
                    'User-Agent': self.USER_AGENT,
                }
                try:
                    res = requests.get("http://icp.fleacloud.com/api/v1/icp",
                                       params={'domain': domain},
                                       headers=headers,
                                       timeout=self.HTTP_TIMEOUT)
                    result = res.json()  # parse the body once
                    if result['status'] in (2, 3):
                        print('域名 : ' + domain + " 备案号 : " + result['icp'])
                    else:
                        print('查询失败或未知')
                except (requests.RequestException, ValueError, KeyError, TypeError):
                    # network failure, non-JSON body or unexpected payload
                    print('查询失败或未知')
            except AttributeError:
                # kept from the original implementation
                sys.exit('系统错误')

    def oscp(self):
        """Check OCSP stapling for every domain listed in ``domain_list``.

        Each non-blank line must hold two whitespace-separated fields,
        ``<ip> <domain>``; only the domain is used.  ``openssl s_client`` is
        run with ``-status`` and the output lines mentioning
        "OCSP response" (case-insensitive) are printed under the domain name.

        :return: None
        :raises ValueError: if a non-blank line does not have exactly two fields
        """
        with open(self._module_path('domain_list'), 'r') as list_file:
            for line in list_file:
                fields = line.split()
                if not fields:
                    continue
                (_ip, domain) = fields
                output = self._run(['openssl', 's_client',
                                    '-connect', domain + ':443',
                                    '-servername', domain,
                                    '-status', '-tlsextdebug'],
                                   merge_stderr=True)
                matches = [row for row in output.splitlines()
                           if 'ocsp response' in row.lower()]
                print(domain + ":\n" + "".join(row + "\n" for row in matches))

    def datetime(self):
        """Print the current local time and the MD5 digest of that string.

        The digest covers the timestamp plus a trailing newline, which is
        what ``echo <timestamp> | md5`` would hash.

        :return: None
        """
        date_time = datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')
        print(date_time)
        print(hashlib.md5((date_time + '\n').encode('utf-8')).hexdigest())

    def openssl(self):
        """Show the Subject Alternative Name section of a served certificate.

        Prompts for the host to connect to (``domain_cname``) and the SNI
        name to present (``domain_pad``), fetches the certificate with
        ``openssl s_client``, decodes it with ``openssl x509 -text`` and
        prints the lines around "Altern" (two lines of context).

        :return: None
        """
        taps = '''
        domain_cname: images-cn.ssl-images-amazon.com.wtxcdn.com
        domain_pad:   images-cn.ssl-images-amazon.com
        '''
        print(taps)
        c_domain = input('domain_cname: ').strip()
        s_domain = input('domain_pad: ').strip()

        handshake = self._run(['openssl', 's_client',
                               '-connect', c_domain + ':443',
                               '-servername', s_domain])
        certificate = self._run(['openssl', 'x509', '-text'],
                                data=handshake.encode('utf-8'))
        print(self._grep_context(certificate, 'Altern', 2))

    def ab_test(self):
        """Run an ApacheBench (``ab``) load test through a proxy.

        Prompts for the target URL, proxy host and proxy port, echoes the
        equivalent command line, runs it and prints a completion message when
        ``ab`` produced output.

        :return: None
        """
        url = input('Http_Url: ')
        proxy = input('Proxy: ')
        port = input('Port: ')
        total, concurrency = '100', '50'
        print('ab -X ' + proxy + ':' + port + ' -n ' + total + ' -c ' + concurrency + ' ' + url)
        output = self._run(['ab', '-X', proxy + ':' + port,
                            '-n', total, '-c', concurrency, url])
        if output:
            # the message reports the concurrency that was actually used
            print('命令执行完成' + total + '链接' + concurrency + '并发')

    def timestamp_datetime(self, value):
        """Convert a Unix timestamp to a local-time string.

        :param value: seconds since the epoch
        :return: local time formatted as ``YYYY-mm-dd HH:MM:SS``
        """
        fmt = '%Y-%m-%d %H:%M:%S'
        return time.strftime(fmt, time.localtime(value))

    def datetime_timestamp(self, dt):
        """Convert a local-time string to a Unix timestamp and print it.

        :param dt: time string formatted as ``YYYY-mm-dd HH:MM:SS``
        :return: whole seconds since the epoch (10 digits)
        :raises ValueError: if *dt* does not match the expected format
        """
        s = int(time.mktime(time.strptime(dt, '%Y-%m-%d %H:%M:%S')))
        print('unix时间戳(10位秒级):' + str(s))
        return s

    def md5sum_encryption(self, unix_time, uri, key, encryption):
        """Print and return the MD5 digest of a URL-signing plaintext.

        The plaintext layout is selected by *encryption*:

        * ``key+unixtime+wstime`` -> key + unix_time + "600"
        * ``uri+key+unixtime``    -> uri + key + unix_time
        * ``key+time+uri``        -> key + unix_time + uri
        * ``time+key+uri``        -> unix_time + key + uri + "3600"
          (this mode also prints *unix_time* first)

        :param unix_time: timestamp placed in the plaintext (``str()`` is applied)
        :param uri: URI part of the plaintext
        :param key: shared secret
        :param encryption: name of the layout, one of the four above
        :return: hex digest, or ``None`` for an unsupported layout
        """
        wstime = str(600)
        stamp = str(unix_time)
        layouts = {
            'key+unixtime+wstime': key + stamp + wstime,
            'uri+key+unixtime': uri + key + stamp,
            'key+time+uri': key + stamp + uri,
            'time+key+uri': stamp + key + uri + str(3600),
        }
        plaintext = layouts.get(encryption)
        if plaintext is None:
            print('暂不支持的加密方式,请检查或修改python')
            return None

        md5encryption = hashlib.md5(plaintext.encode('utf-8')).hexdigest()
        if encryption == 'time+key+uri':
            print(unix_time)
        print('加密方式方式:' + encryption)
        print('md5: ' + md5encryption)
        return md5encryption

    def unix_timestamp_create(self):
        """Sign every entry of ``uri_information`` with the current time.

        Each non-blank line must hold three whitespace-separated fields,
        ``<uri> <key> <layout>``, which are passed to ``md5sum_encryption``
        together with the current Unix timestamp.  When the file is missing
        a one-line template is written and the process exits.

        :return: None
        :raises ValueError: if a non-blank line does not have exactly three fields
        :raises SystemExit: after creating the template file
        """
        info_path = self._module_path('uri_information')
        if not os.path.exists(info_path):
            with open(info_path, "w+") as f:
                f.write('origin/111  SEd0kljv5M key+unixtime+wstime ')
            sys.exit('uri_information文件不存在,将在当前目录生成,请退出后')

        unix_time = self.datetime_timestamp(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        with open(info_path, "r") as f:
            for uri_line in f:
                fields = uri_line.split()
                if not fields:
                    continue
                (uri, key, encryption) = fields
                self.md5sum_encryption(unix_time, uri, key, encryption)

    def http_requests(self):
        """Fetch every URL listed in the ``url`` file through its proxy.

        Each non-blank line must hold two whitespace-separated fields,
        ``<url> <proxy-host>``.  The URL is requested through that proxy
        (port 80 for http, 443 for https) and the status code plus a few
        response headers are printed.  A failed request is reported and the
        next line is processed.

        :return: None
        :raises ValueError: if a non-blank line does not have exactly two fields
        """
        print(self._gmt_now())

        with open(self._module_path('url'), 'r') as url_file:
            for line in url_file:
                fields = line.split()
                if not fields:
                    continue
                (http_url, proxy) = fields
                print(http_url)

                proxies = {
                    'http': proxy + ':80',
                    'https': proxy + ':443'
                }
                headers = {
                    'Date': self._gmt_now(),
                    'Accept': 'application/json',
                    'Content-type': 'application/json',
                    'User-Agent': self.USER_AGENT,
                    'cdn-src-ip': proxy
                }

                try:
                    # NOTE(review): certificate verification is disabled as in
                    # the original; presumably the proxy nodes do not present
                    # a matching certificate -- confirm before reusing elsewhere.
                    res = requests.get(http_url, headers=headers, proxies=proxies,
                                       verify=False, timeout=self.HTTP_TIMEOUT)
                except requests.RequestException as exc:
                    print(exc)
                    continue

                # res.headers is case-insensitive, so 'X-Px' also finds 'X-px'.
                received = res.headers
                tmp_str = "statusCode:{}\nDate:{}\nContent-Length:{}\nConnection:{}\nCache-Control:{}\nVia:{}\nX-px:{}\n".format(
                    res.status_code,
                    received.get('Date'),
                    received.get('Content-Length'),
                    received.get('Connection'),
                    received.get('Cache-Control'),
                    received.get('Via'),
                    received.get('X-Px'),
                )
                print(tmp_str)

    def hosts(self):
        """Resolve a domain's A records and append them to the hosts file.

        Prompts for a domain, resolves its A records and appends one
        ``<address> <domain>`` line per record to ``HOSTS_FILE`` with the
        ``.whecloud.com`` / ``.wtxcdn.com`` suffixes removed, then prints the
        file.  Resolution and file errors are printed, not raised.

        :return: None
        """
        domain = input("Please input an domain: ").strip()
        # dnspython 2.x renamed query() to resolve(); support both.
        resolve = getattr(dns.resolver, 'resolve', None) or dns.resolver.query
        try:
            answer = resolve(domain, 'A')
        except Exception as e:
            print(e)
            return

        entries = []
        for rrset in answer.response.answer:
            for record in rrset:
                address = getattr(record, 'address', None)
                if address is None:
                    # e.g. a CNAME record in the answer chain has no address
                    continue
                entry = address + " " + domain
                for suffix in ('.whecloud.com', '.wtxcdn.com'):
                    entry = entry.replace(suffix, '')
                entries.append(entry)

        if not entries:
            return
        try:
            with open(self.HOSTS_FILE, 'a') as hosts_file:
                hosts_file.write(''.join(entry + '\n' for entry in entries))
            with open(self.HOSTS_FILE) as hosts_file:
                print(hosts_file.read(), end='')
        except OSError as exc:
            # typically a permission error when not running as root
            print(exc)

  

上一篇:PHP-加密从iPhone到Web应用程序的请求?


下一篇:C .NET DLL与C#托管代码? (文件加密AES-128 XTS)