Reposted from: 国庆节,企查查我来啦~ (user_from_future's blog on CSDN)
https://blog.csdn.net/user_from_future/article/details/120576842

Being the obsessive type, I registered an account just in time for National Day, and posting this right away didn't feel quite right. A girl I know needed this for work and, valuing her request over my friends (重色轻友, as the saying goes), I decided to take on Qichacha first. The qcc.com site lets you search for companies without logging in, but the catch is that you only get a handful of free queries before a login is forced. Those few free lookups aren't worth much to me, so I built a crawler that works from a logged-in session instead. As everyone knows, the most important login parameter is the Cookie. You can grab it from any request in the browser's XHR panel (XMLHttpRequest/Ajax objects); I recommend copying it with plain Ctrl+C/Ctrl+V, since right-clicking "Copy value" may pick up Chinese characters. The Chrome-cookie-reading code used here is something I found somewhere online; I merged the two original files into one and added some comments (which may or may not be correct) to the cookie-retrieval part.
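For reference, everything the script below automates boils down to attaching that cookie to your requests. A minimal sketch, assuming a hand-copied cookie string (the QCCSESSID/qcc_did values here are placeholders, not a real session):

import requests

cookie = 'QCCSESSID=your-session-id;qcc_did=your-device-id;'  # placeholder: paste your own copied value
r = requests.get('https://www.qcc.com/web/search?key=%E6%B5%8B%E8%AF%95',  # key=测试, URL-encoded
                 headers={'user-agent': 'Mozilla/5.0', 'cookie': cookie})
print(r.status_code)  # 200 if the session is accepted

Instead of pasting a fresh cookie by hand every time the session changes, the script decrypts it straight out of Chrome's local database. Below is the code I put together: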
# _*_ coding:utf-8 _*_
# FileName: get_qcc_company.py
# IDE: PyCharm
# Rookie code, no bugs ever!
# https://www.qcc.com/
import sqlite3
import urllib3
import os
import json
import sys
import base64
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import browsercookie
from urllib import parse
from bs4 import BeautifulSoup
import time
import requests
from random import uniform
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)  # suppress HTTPS certificate warnings
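# Background on Chrome cookie encryption under Windows (my understanding, for the three
# functions below): cookie values in Chrome's SQLite database are encrypted either
# directly with the Windows DPAPI (older Chrome; blobs start with b'\x01\x00\x00\x00')
# or with AES-256-GCM (Chrome 80+; blobs start with b'v10', then a 12-byte nonce,
# the ciphertext, and a 16-byte authentication tag). The AES key itself sits
# DPAPI-encrypted in the "Local State" file, prefixed with b'DPAPI' once base64-decoded.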
def dpapi_decrypt(encrypted):
    import ctypes
    import ctypes.wintypes

    class DATA_BLOB(ctypes.Structure):  # generic Win32 blob: byte count plus data pointer
        _fields_ = [('cbData', ctypes.wintypes.DWORD),          # size of the data in bytes
                    ('pbData', ctypes.POINTER(ctypes.c_char))]  # pointer to the data
    p = ctypes.create_string_buffer(encrypted, len(encrypted))  # copy the input into a C string buffer
    blobin = DATA_BLOB(ctypes.sizeof(p), p)  # DATA_BLOB wrapping the encrypted input
    blobout = DATA_BLOB()                    # DATA_BLOB that will receive the decrypted output
    # The five middle arguments: optional description of the data, pointer to an
    # extra-entropy DATA_BLOB, reserved, prompt structure (None = no UI prompt), and flags.
    retval = ctypes.windll.crypt32.CryptUnprotectData(
        ctypes.byref(blobin), None, None, None, None, 0, ctypes.byref(blobout))
    if not retval:
        raise ctypes.WinError()
    result = ctypes.string_at(blobout.pbData, blobout.cbData)  # read out the decrypted bytes
    ctypes.windll.kernel32.LocalFree(blobout.pbData)  # free the buffer allocated by the API
    return result
def aes_decrypt(encrypted_txt):
    with open(os.path.join(os.environ['LOCALAPPDATA'],
                           r"Google\Chrome\User Data\Local State"), encoding='utf-8', mode="r") as f:  # read Chrome's Local State file
        jsn = json.loads(f.read())  # parse it as JSON
    encoded_key = jsn["os_crypt"]["encrypted_key"]  # the base64-encoded, DPAPI-protected AES key
    encrypted_key = base64.b64decode(encoded_key.encode())  # decode from base64
    encrypted_key = encrypted_key[5:]  # strip the 5-byte b'DPAPI' prefix
    key = dpapi_decrypt(encrypted_key)  # recover the raw AES key via DPAPI
    nonce = encrypted_txt[3:15]  # 12-byte GCM nonce right after the b'v10' prefix
    cipher = Cipher(algorithms.AES(key), modes.GCM(nonce), backend=default_backend())  # AES-GCM cipher
    decryptor = cipher.decryptor()
    return decryptor.update(encrypted_txt[15:])  # decrypt ciphertext + tag; the caller strips the tag
def chrome_decrypt(encrypted_txt):
    if sys.platform == 'win32':  # Windows only
        try:
            # pick the decryption scheme based on the blob prefix
            if encrypted_txt[:4] == b'\x01\x00\x00\x00':
                decrypted_txt = dpapi_decrypt(encrypted_txt)  # plain DPAPI blob (pre-Chrome-80)
                return decrypted_txt.decode()
            elif encrypted_txt[:3] == b'v10':
                decrypted_txt = aes_decrypt(encrypted_txt)  # AES-GCM blob (Chrome 80+)
                return decrypted_txt[:-16].decode()  # drop the trailing 16-byte auth tag
        except WindowsError:
            return None
    else:
        raise OSError('Chrome cookie decryption is only implemented for Windows here')
def get_cookies_from_chrome(domain):
    sql = f'SELECT name, encrypted_value as value FROM cookies WHERE host_key LIKE "%{domain}%"'  # fetch this domain's cookies
    filename = os.path.join(os.environ['USERPROFILE'],
                            r'AppData\Local\Google\Chrome\User Data\Default\Cookies')  # Chrome's cookie database
    if not os.path.exists(filename):  # newer Chrome versions keep it under Default\Network instead
        filename = os.path.join(os.environ['USERPROFILE'],
                                r'AppData\Local\Google\Chrome\User Data\Default\Network\Cookies')
    con = sqlite3.connect(filename)  # open the cookie database with sqlite3
    con.row_factory = sqlite3.Row  # allow accessing columns by name
    cur = con.cursor()  # get a cursor
    cur.execute(sql)  # run the query
    cookie = ''  # accumulated cookie string
    for row in cur:
        if row['value'] is not None:
            name = row['name']  # cookie name
            value = chrome_decrypt(row['value'])  # decrypted cookie value
            if value is not None:
                cookie += name + '=' + value + ';'  # append "name=value;" to the cookie string
    return cookie
# Format a Unix timestamp as a date; 253392422400 is qcc's sentinel for "9999-09-09",
# and a falsy value means an open-ended term ("无固定期限")
str_time = lambda _: _ == 253392422400 and "9999-09-09" or _ and time.strftime("%Y-%m-%d", time.localtime(_)) or "无固定期限"
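# e.g. str_time(253392422400) -> "9999-09-09"; str_time(None) -> "无固定期限"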
# Parse request parameters from either "a=b&c=d" or per-line "key: value" format
def parse_parameters(string: str):
    parameters = {}
    string = string.strip().replace(' ', '')
    if ':' not in string and '&' in string:  # query-string style
        for _ in string.split('&'):
            try:
                parameters[_.split('=')[0]] = _.split('=')[1]
            except IndexError:
                parameters[_.split('=')[0]] = ''
    else:  # "key: value" lines, as copied from the browser's dev tools
        for _ in string.split('\n'):
            _ = _.strip()
            try:
                parameters[_.split(':')[0]] = _.split(':')[1]
            except IndexError:
                parameters[_.split(':')[0]] = ''
    return parameters
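# e.g. parse_parameters("a=1&b=2") -> {'a': '1', 'b': '2'}
#      parse_parameters("key: foo") -> {'key': 'foo'}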
# Parse a "name=value;name=value" cookie string into a dict
def parse_cookies(cookie_value: str):
    cookies_dict = {}
    for c in cookie_value.replace(' ', '').split(';'):
        try:
            cookies_dict[c.split('=')[0]] = c.split('=')[1]
        except IndexError:
            cookies_dict[c.split('=')[0]] = ''
    return cookies_dict
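# e.g. parse_cookies("QCCSESSID=abc; qcc_did=def") -> {'QCCSESSID': 'abc', 'qcc_did': 'def'}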
# Pretty-print as JSON without escaping non-ASCII characters
def dump_json(text: (str, list, tuple, dict)):
    return json.dumps(text, ensure_ascii=False, indent=4)
# Sleep for a random interval to avoid crawling too fast
def random_sleep(a=1, b=2):
    sleep_time = uniform(a, b)
    time.sleep(sleep_time)
doMain = 'qcc.com'  # Qichacha domain
search_url = "https://www." + doMain + "/web/search" + "?"  # Qichacha search base URL
headers = {
    "referer": "https://www.qcc.com/",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36"
}
input_cookie = ''  # optionally paste a cookie string here by hand
cookies = input_cookie or get_cookies_from_chrome(doMain)  # otherwise read it from the local cookie database
while not (parse_cookies(cookies).get("QCCSESSID", None) and parse_cookies(cookies).get("qcc_did", None)):  # check for the two required cookies
    input('请在浏览器登录企查查!')
    cookies = input_cookie or get_cookies_from_chrome(doMain)  # re-read after the user has logged in
# Fetch a company's details by exact name
def get_company(company_name):
    parameters = f"""
    key: {company_name}
    """
    parameters = parse_parameters(parameters)  # -> {'key': company_name}
    r = requests.get(search_url + parse.urlencode(parameters), headers=headers, cookies=parse_cookies(cookies))  # search request
    if r.ok:
        soup = BeautifulSoup(r.text, "html.parser")
        table = soup.find("table", attrs={"class": "ntable ntable-list"})  # search-result table
        if table is None:
            return f"未搜寻到公司 “{company_name}” !"
        for tr in table.find_all("tr"):
            info = tr.find_all("td")[2].find("div")
            if info.find("a").find("span") is None:  # skip rows without a company-name link
                continue
            name_ = info.find("a").find("span").text.replace('(', '(').replace(')', ')')  # normalize half-width parentheses to full-width
            url = info.find("a")["href"]
            no_kh_things = name_.replace(name_[name_.find('('): name_.rfind(')') + 1], '')  # name without the parenthesized part
            no_kh = name_.replace('(', '').replace(')', '')  # name with just the parentheses removed
            if company_name != no_kh_things and company_name != no_kh:  # accept exact name matches only
                continue
            r = requests.get(url, headers=headers, cookies=parse_cookies(cookies))  # fetch the company detail page
            if r.ok:
                r.encoding = 'utf-8'
                soup = BeautifulSoup(r.text, "html.parser")
                script = soup.find_all('script')
                for s in script:
                    if 'window.__INITIAL_STATE__' in s.text:  # the page embeds its data as a JSON blob here
                        script = s.text
                        break
                else:
                    return '请清除谷歌浏览器缓存,并重新登录企查查重新执行程序!如果多次出现此提示,请手动复制任意XHR的cookie值赋予到cookie变量!'
                detail = json.loads(script[script.find('{'): script.rfind('};') + 1])["company"]["companyDetail"]
                return {
                    "企业名称": detail["Name"],
                    "信息更新时间": str_time(detail["UpdatedDate"]),
                    "法定代表人": detail["Oper"]["Name"],
                    "登记状态": detail["Status"],
                    "统一社会信用代码": detail["CreditCode"],
                    "工商注册号": detail["No"],
                    "组织机构代码": detail["OrgNo"],
                    "纳税人识别号": detail["TaxNo"],
                    "纳税人资质": detail.get("TaxpayerType", ''),
                    "注册资本": detail["RegistCapi"],
                    "实缴资本": detail["RecCap"],
                    "登记机关": detail["BelongOrg"],
                    "成立日期": str_time(detail["TermStart"]),
                    "核准日期": str_time(detail["CheckDate"]),
                    "营业期限": str_time(detail["TermStart"]) + "至" + str_time(detail["TeamEnd"]),
                    "注册地址": detail["Address"],
                    "宗旨和业务范围": detail["Scope"],
                    "企业类型": detail["EconKind"],
                    "所属行业": detail["Industry"]["SubIndustry"],
                    "所属地区": detail["Area"]["Province"],
                    "人员规模": detail["profile"]["Info"],
                    "参保人数": next((_["Value"] for _ in detail["CommonList"] if _.get("KeyDesc", "") == "参保人数"), ''),
                    "英文名": detail["EnglishName"],
                    "曾用名": detail["OriginalName"] and [_["Name"] for _ in detail["OriginalName"]] or []
                }
            return f"获取公司 “{name_}” 详情信息失败!"
        return f"未搜寻到公司 “{company_name}” !"
    return "搜索失败!"
if __name__ == '__main__':
    print(dump_json(get_company('浙江阿瓦隆科技有限公司')))
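If you need more than one company, a small driver can replace the __main__ block above. This is just a sketch (the company names and output filename are placeholders), and it finally puts the otherwise-unused random_sleep to work:

if __name__ == '__main__':
    results = {}
    for name in ['公司甲', '公司乙']:  # placeholder names, substitute real ones
        results[name] = get_company(name)
        random_sleep(2, 4)  # space out requests to avoid hammering the site
    with open('companies.json', 'w', encoding='utf-8') as f:  # hypothetical output file
        f.write(dump_json(results))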