Python爬取企查查网页中潜在的信息字典

转载自: 

国庆节,企查查我来啦~_user_from_future的博客-CSDN博客强迫症的我凑个国庆节注册了账号,直接这么发好似不太好。受到某位女生的工作需求,加上重色轻友的心,所以先拿企查查开刀吧。首先企查查这个网站不登陆也能查公司,不过坑人的就是只能查那么几次,然后就必须要登录了。我想想为了那几次不值得,就搞个登录的爬虫程序吧。众所周知,登录最重要的参数是Cookie,这个一般在浏览器的XHR(XMLHttpRequest对象/Ajax对象等)里复制任意一个元素的Cookie就可以了,建议使用CV大法复制,右击Copy value可能会复制到中文,在此先献上不知道从哪搜到的读Python爬取企查查网页中潜在的信息字典https://blog.csdn.net/user_from_future/article/details/120576842我在此将两个文件组合成了一个文件,并对获取cookie部分附写了一些不知道算不算正确的注释,下面是我整理的代码:

# _*_ coding:utf-8 _*_
# FileName: get_qcc_company.py
# IDE: PyCharm
# 菜菜代码,永无BUG!

# https://www.qcc.com/

import sqlite3
import urllib3
import os
import json

import sys
import base64
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

import browsercookie

from urllib import parse
from bs4 import BeautifulSoup

import json
import time
import requests
from random import uniform


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)  # 取消HTTPS安全警告


def dpapi_decrypt(encrypted):
    import ctypes
    import ctypes.wintypes

    class DATA_BLOB(ctypes.Structure):  # ctypes结构体通用格式化输出
        _fields_ = [('cbData', ctypes.wintypes.DWORD),  # 定义double word(4字节)大小的值
                    ('pbData', ctypes.POINTER(ctypes.c_char))]  # 定义内存指针

    p = ctypes.create_string_buffer(encrypted, len(encrypted))  # 生成C类型字符串组
    blobin = DATA_BLOB(ctypes.sizeof(p), p)  # 一个指向DATA_BLOB结构体的指针
    blobout = DATA_BLOB()  # 一个指向解密后的数据的DATA_BLOB
    # 中间五个参数:描述该加密数据的信息、一个指向含有密钥DATA_BLOB的指针、保留参数、不需要弹出风险提升提示设置为None、安全相关的标志
    retval = ctypes.windll.crypt32.CryptUnprotectData(
        ctypes.byref(blobin), None, None, None, None, 0, ctypes.byref(blobout))
    if not retval:
        raise ctypes.WinError()
    result = ctypes.string_at(blobout.pbData, blobout.cbData)  # 获取解密结果
    ctypes.windll.kernel32.LocalFree(blobout.pbData)  # 释放pbData指向的内存
    return result


def aes_decrypt(encrypted_txt):
    with open(os.path.join(os.environ['LOCALAPPDATA'],
                           r"Google\Chrome\User Data\Local State"), encoding='utf-8', mode="r") as f:  # 读取本地状态
        jsn = json.loads(str(f.readline()))  # 读取为json类型
    encoded_key = jsn["os_crypt"]["encrypted_key"]  # 获取加密键值
    encrypted_key = base64.b64decode(encoded_key.encode())  # 解密关键键值
    encrypted_key = encrypted_key[5:]  # 获取关键键值的关键部位
    key = dpapi_decrypt(encrypted_key)  # 解密关键键值
    nonce = encrypted_txt[3:15]  # 获取关键键值的关键部位
    cipher = Cipher(algorithms.AES(key), None, backend=default_backend())  # 创建一个空的AES加密对象
    cipher.mode = modes.GCM(nonce)  # 采用GCM加密模式,初始化向量采用关键值的关键部位
    decryptor = cipher.decryptor()  # 解密AES
    return decryptor.update(encrypted_txt[15:])  # 更新解密对象


def chrome_decrypt(encrypted_txt):
    if sys.platform == 'win32':  # 判断系统为Windows
        try:
            # 依据字符串开头判断解密方案
            if encrypted_txt[:4] == b'x01x00x00x00':
                decrypted_txt = dpapi_decrypt(encrypted_txt)  # 采用dpapi解密
                return decrypted_txt.decode()
            elif encrypted_txt[:3] == b'v10':
                decrypted_txt = aes_decrypt(encrypted_txt)  # 采用aes解密
                return decrypted_txt[:-16].decode()
        except WindowsError:
            return None
    else:
        raise WindowsError


def get_cookies_from_chrome(domain):
    sql = f'SELECT name, encrypted_value as value FROM cookies where host_key like "%{domain}%"'  # 获取cookie的sql语句
    filename = os.path.join(os.environ['USERPROFILE'], r'AppData\Local\Google\Chrome\User Data\default\Cookies')  # 本地cookies文件路径拼接
    con = sqlite3.connect(filename)  # 使用sqlite3连接cookies数据库
    con.row_factory = sqlite3.Row  # 需要允许其他人写权限
    cur = con.cursor()  # 获取游标
    cur.execute(sql)  # 执行sql语句
    cookie = ''  # 初始化cookie
    for row in cur:
        if row['value'] is not None:
            name = row['name']  # cookie的键
            value = chrome_decrypt(row['value'])  # cookie的值
            if value is not None:
                cookie += name + '=' + value + ';'  # 拼接cookie
    return cookie


str_time = lambda _: _ == 253392422400 and "9999-09-09" or _ and time.strftime("%Y-%m-%d", time.localtime(_)) or "无固定期限"  # 格式化日期


# 格式化网页访问参数
def parse_parameters(string: str):
    parameters = {}
    string = string.strip().replace(' ', '')
    if ':' not in string and '&' in string:
        for _ in string.split('&'):
            try:
                parameters[_.split('=')[0]] = _.split('=')[1]
            except IndexError:
                parameters[_.split('=')[0]] = ''
    else:
        for _ in string.split('\n'):
            _ = _.strip()
            try:
                parameters[_.split(':')[0]] = _.split(':')[1]
            except IndexError:
                parameters[_.split(':')[0]] = ''
    return parameters


# 格式化cookies值
def parse_cookies(cookie_value: str):
    cookies_dict = {}
    for c in cookie_value.replace(' ', '').split(';'):
        try:
            cookies_dict[c.split('=')[0]] = c.split('=')[1]
        except IndexError:
            cookies_dict[c.split('=')[0]] = ''
    return cookies_dict


# json格式化
def dump_json(text: (str, list, tuple, dict)):
    return json.dumps(text, ensure_ascii=False, indent=4)


# 随机休眠,防止过快的爬取
def random_sleep(a=1, b=2):
    sleep_time = uniform(a, b)
    time.sleep(sleep_time)


doMain = 'qcc.com'  # 企查查域名
search_url = "https://www." + doMain + "/web/search" + "?"  # 企查查搜索根网址
headers = {
    "referer": "https://www.qcc.com/",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36"
}

input_cookie = ''  # 可选手动输入cookie值
cookies = input_cookie or get_cookies_from_chrome(doMain)  # 无手动输入则从cookie文件中读取
while not (parse_cookies(cookies).get("QCCSESSID", None) and parse_cookies(cookies).get("qcc_did", None)):  # 关键参数检测
    input('请在浏览器登录企查查!')


# 获取公司数据
def get_company(company_name):
    parameters = f"""
        key: {company_name}
    """
    parameters = parse_parameters(parameters)
    r = requests.get(search_url + parse.urlencode(parameters), headers=headers, cookies={"cookie": cookies})
    if r.ok:
        soup = BeautifulSoup(r.text, "html.parser")
        table = soup.find("table", attrs={"class": "ntable ntable-list"})
        if table is None:
            return f"未搜寻到公司 “{company_name}” !"
        for tr in table.find_all("tr"):
            info = tr.find_all("td")[2].find("div")
            if info.find("a").find("span") is None:
                continue
            name_ = info.find("a").find("span").text.replace('(', '(').replace(')', ')')
            url = info.find("a")["href"]
            no_kh_things = name_.replace(name_[name_.find('('): name_.rfind(')') + 1], '')
            no_kh = name_.replace('(', '').replace(')', '')
            if company_name != no_kh_things and company_name != no_kh:
                continue
            r = requests.get(url, headers=headers, cookies={"cookie": cookies})
            if r.ok:
                r.encoding = 'utf-8'
                soup = BeautifulSoup(r.text, "html.parser")
                script = soup.find_all('script')
                for s in script:
                    if 'window.__INITIAL_STATE__' in s.text:
                        script = s.text
                        break
                else:
                    return '请清除谷歌浏览器缓存,并重新登录企查查重新执行程序!如果多次出现此提示,请手动复制任意XHR的cookie值赋予到cookie变量!'
                detail = json.loads(script[script.find('{'): script.rfind('};') + 1])["company"]["companyDetail"]
                return {
                    "企业名称": detail["Name"],
                    "信息更新时间": str_time(detail["UpdatedDate"]),
                    "法定代表人": detail["Oper"]["Name"],
                    "登记状态": detail["Status"],
                    "统一社会信用代码": detail["CreditCode"],
                    "工商注册号": detail["No"],
                    "组织机构代码": detail["OrgNo"],
                    "纳税人识别号": detail["TaxNo"],
                    "纳税人资质": detail.get("TaxpayerType", ''),
                    "注册资本": detail["RegistCapi"],
                    "实缴资本": detail["RecCap"],
                    "登记机关": detail["BelongOrg"],
                    "成立日期": str_time(detail["TermStart"]),
                    "核准日期": str_time(detail["CheckDate"]),
                    "营业期限": str_time(detail["TermStart"]) + "至" + str_time(detail["TeamEnd"]),
                    "注册地址": detail["Address"],
                    "宗旨和业务范围": detail["Scope"],
                    "企业类型": detail["EconKind"],
                    "所属行业": detail["Industry"]["SubIndustry"],
                    "所属地区": detail["Area"]["Province"],
                    "人员规模": detail["profile"]["Info"],
                    "参保人数": [_["Value"] for _ in detail["CommonList"] if _.get("KeyDesc", "") == "参保人数"] and [_["Value"] for _ in detail["CommonList"] if _.get("KeyDesc", "") == "参保人数"][0] or '',
                    "英文名": detail["EnglishName"],
                    "曾用名": detail["OriginalName"] and [_["Name"] for _ in detail["OriginalName"]] or []
                }
            return f"获取公司 “{name_}” 详情信息失败!"
        return f"未搜寻到公司 “{company_name}” !"
    return "搜索失败!"


if __name__ == '__main__':
    print(dump_json(get_company('浙江阿瓦隆科技有限公司')))

上一篇:Aes加密


下一篇:更好的使用python的ZhihuAVPI库使用知乎