import json import os.path import queue import threading import time import pandas as pd import requests from lxml import etree import re from urllib import parse from retrying import retry """ 版本迭代: 新增从excel读取历史人物,单个数据修改, 如果家族成员没有h3 級別分類,那麽单个数据的家族人物修改为list 修改个人标签家族属性获取,例如子分类如果有多个,那么值是list,单个是str """ name_queue = queue.Queue() proxy = "127.0.0.1:1080" proxies = { 'http': 'http://' + proxy, 'https': 'https://' + proxy, } class WiKi: def __init__(self, name, dynasty): name = parse.quote(name) # name = parse.quote("刘邦") self.dynasty = dynasty self.start_url = "https://zh.wikipedia.org/wiki/" + name self.headers = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.7 Safari/537.36" } self.Person_info = {} @retry() def get_person_page(self): """页面请求""" response = requests.get(url=self.start_url, headers=self.headers, proxies=proxies) self.response = response.content.decode() def get_name(self): """抓取的首页的人物名字""" html = etree.HTML(self.response) name = html.xpath('//h1[@id="firstHeading"]/text()') self.name = name[0] # self.Person_info['姓名'] = self.name def get_label(self): """获取标签栏属性""" html = etree.HTML(self.response) label_div= html.xpath('//table[@class="infobox vcard"]/tbody/tr') if label_div == []: return label_dict = {} for label_tr in label_div: label_name = ''.join(label_tr.xpath('./th/text()')) label_value = ''.join(label_tr.xpath('./td//text()')).replace('\n','').replace('\xa0','') label_tr_td = label_tr.xpath('./td/table/tbody/tr') if label_tr_td: for tr in label_tr_td: result = etree.tostring(tr,pretty_print=True).decode('utf-8') result = re.sub("<a .*?>", '', result) result = re.sub("(</a>)", '', result) tr = etree.HTML(result) th_name = ''.join(tr.xpath('//th//text()')).replace('\n','') td_value = tr.xpath('//td//text()') # print(td_value) if len(td_value)<=1: td_value = ''.join(td_value).replace('\n','') else: td_value = td_value if th_name == '': continue # if td_value == '': # continue 
label_dict[th_name] = td_value if label_value == '': continue if label_name == '': continue label_dict[label_name] = label_value self.Person_info['详情'] = label_dict def get_person_relation(self): """获取人物简介""" try: result = re.search(r'(<div class="mw-parser-output".*?)<h2>', self.response, re.S).group(1) except: return html = etree.HTML(result) p_list = html.xpath('//p//text()') relation = ''.join(p_list) relation = re.sub("(\..*})", '', relation) rule = "可以指:" rule1 = "可指:" rule2 = "可能指下列人物:" if rule in relation or rule1 in relation or rule2 in relation or len(relation)< 15: return self.Person_info["简介"] = relation def get_h4_content(self, h4): h4_dict = {} for info in h4[1:]: info = "<h4>"+info html = etree.HTML(info) h4_title_1 = ''.join(html.xpath('//h4//text()')) h4_title = h4_title_1.replace('[编辑]', '') ul = html.xpath('//ul/li') if ul==[]: h4_content = ''.join(html.xpath('//text()')) h4_content = h4_content.replace(h4_title_1,'') h4_dict[h4_title] = h4_content else: li_list = [] for li in ul: li_content = ''.join(li.xpath('.//text()')) li_list.append(li_content) h4_dict[h4_title] = li_list return h4_dict def get_h3_content(self,h3): h3_dict = {} for info in h3[1:]: h3_content = '<h3>'+info h4_content = h3_content.split("<h4>") html = etree.HTML(h3_content) h3_title_1 = ''.join(html.xpath('//h3//text()')) h3_title = h3_title_1.replace("[编辑]", '') if len(h4_content)<2: ul = html.xpath('//ul/li') ol = html.xpath('//ol/li') if ul: li_list = [] for li in ul: li_content = ''.join(li.xpath('.//text()')) li_list.append(li_content) h3_dict[h3_title] = li_list elif ol: ol_list = [] for li in ol: li_content = ''.join(li.xpath('.//text()')) ol_list.append(li_content) h3_dict[h3_title] = ol_list else: h3_content = ''.join(html.xpath('//text()')) h3_content = h3_content.replace(h3_title_1,'') h3_dict[h3_title] = h3_content else: h4_dict = self.get_h4_content(h4_content) h3_dict[h3_title] = h4_dict return h3_dict def get_content(self): """ 获取生平详情 :return: """ # result = 
re.findall(r'(<h2>.*?)<h2>', self.response, re.S) try: result = self.response.split('<h2>')[1:-2] except: return for x in result: h2 = '<h2>'+x h3 = h2.split('<h3>') html = etree.HTML(h2) title = html.xpath('//h2//text()')[0] if len(h3) < 2: content = html.xpath('//text()') content = ''.join(content[1:]) content = re.sub("(\..*?})", '', content) content=content.replace('[编辑]','') content = re.sub('\xa0/', '', content) content = content.split('\n') content = list(set(content)) new_content = [] for cont in content: if cont == '': continue else: new_content.append(cont) self.Person_info[title] = new_content else: h3_dict = self.get_h3_content(h3) self.Person_info[title] = h3_dict def save_success(self): """ 保存已经收录的""" dir_path = './{}-json'.format(self.dynasty) if not os.path.exists(dir_path): os.makedirs(dir_path) path = dir_path+'/{}.json'.format(self.dynasty+"-" + self.name) is_file = os.path.isfile(path) if is_file: return print(self.Person_info) with open(path, 'a', encoding='utf-8')as f: f.write(json.dumps(self.Person_info, ensure_ascii=False) + '\n') def save_false(self): """保存未收录文件""" path = './未收录人物.txt' print(self.name) with open(path, 'a', encoding='utf8') as f: f.write(self.name + '\n') def run(self): self.get_person_page() # self.get_is_save() self.get_name() self.get_label() self.get_person_relation() self.get_content() if self.Person_info: self.save_success() else: self.save_false() def get_name(path): """ 初始获取名字以及朝代,从文件夹下的文本读 加入队列 :param path: :return: """ dynasty = path.split('\\')[-1].split('.')[0] with open(path, 'r', encoding='utf8') as f: file = f.read() name_list = file.split('\n') for name in name_list: try: name = name.split(',')[1] except: name = name.split(',')[0] name_dict = {} name_dict["dynasty"] = dynasty name_dict['name'] = name print(name_dict) name_queue.put(name_dict) def get_name_from_excel(path): """ 读excel 直接获取名字和朝代 :param path: :return: """ df = pd.read_excel(path) for value in df.values: name = value[1] dynasty = value[0] 
name_dict = {} name_dict["dynasty"] = dynasty name_dict['name'] = name print(name_dict) name_queue.put(name_dict) def get_name_two(path): with open(path, 'r', encoding='utf8')as f: name_list = f.read().split('\n') for info in name_list: try: dynasty=info.split('-')[0] name = info.split('-')[1] except: continue name_dict = {} name_dict["dynasty"] = dynasty name_dict['name'] = name print(name_dict) name_queue.put(name_dict) def main(): while True: if name_queue.empty(): break name_dict = name_queue.get() name = name_dict['name'] # name = name.split('-')[0] dynasty = name_dict['dynasty'] WiKi(name, dynasty).run() # break if __name__ == '__main__': # path = r"D:\New_code\WIKI\histpry_person" # for x in os.listdir(path): # new_path = os.path.join(path, x) # get_name(new_path) # path = r"D:\New_code\WIKI\二十四史人物.xlsx" # get_name_from_excel(path) path = r"D:\New_code\WIKI\1.txt" get_name_two(path) Threads = [] start_time = time.time() for _ in range(10): main_t = threading.Thread(target=main) Threads.append(main_t) for t in Threads: t.start() for t in Threads: t.join() end_time = time.time() use_time = end_time-start_time print("用时:" + str(use_time))
# 代理使用小飞机(本地代理客户端),proxies 指向本地代理端口
# 数据保存为 json 文件
# 从本地读取“人物名称+朝代”,采集古人信息
import json
import os.path
import queue
import threading
import time
import pandas as pd
import requests
from lxml import etree
import re
from urllib import parse
from retrying import retry
"""
版本迭代: 新增从excel读取历史人物,单个数据修改,
如果家族成员没有h3 級別分類,那麽单个数据的家族人物修改为list
修改个人标签家族属性获取,例如子分类如果有多个,那么值是list,单个是str
"""
# Work queue of {"dynasty": ..., "name": ...} job dicts consumed by the workers.
name_queue = queue.Queue()

# Local proxy endpoint (e.g. shadowsocks client); all wiki requests go through it.
proxy = "127.0.0.1:1080"
proxies = {scheme: scheme + '://' + proxy for scheme in ('http', 'https')}
class WiKi:
    """Scrape one zh.wikipedia.org person page and persist the parsed data.

    The page HTML is split on <h2>/<h3>/<h4> headings; each section is
    flattened into the ``Person_info`` dict, which ``run()`` writes as one
    JSON line under ``./<dynasty>-json/``.
    """

    def __init__(self, name, dynasty):
        # URL-encode the (usually Chinese) name for the wiki URL path.
        name = parse.quote(name)
        self.dynasty = dynasty
        self.start_url = "https://zh.wikipedia.org/wiki/" + name
        self.headers = {
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.7 Safari/537.36"
        }
        self.Person_info = {}

    # Bounded retry: the previous bare @retry() retried forever on any
    # exception, hanging a worker thread on a persistently failing request.
    @retry(stop_max_attempt_number=5)
    def get_person_page(self):
        """Fetch the page and keep the decoded HTML in ``self.response``."""
        response = requests.get(url=self.start_url, headers=self.headers, proxies=proxies)
        self.response = response.content.decode()

    def get_name(self):
        """Read the page's displayed title (the canonical person name)."""
        html = etree.HTML(self.response)
        name = html.xpath('//h1[@id="firstHeading"]/text()')
        self.name = name[0]

    def get_label(self):
        """Parse the infobox ("vcard") table into ``Person_info['详情']``."""
        html = etree.HTML(self.response)
        label_div = html.xpath('//table[@class="infobox vcard"]/tbody/tr')
        if not label_div:
            return
        label_dict = {}
        for label_tr in label_div:
            label_name = ''.join(label_tr.xpath('./th/text()'))
            label_value = ''.join(label_tr.xpath('./td//text()')).replace('\n', '').replace('\xa0', '')
            # A nested table means a grouped attribute (e.g. family members)
            # whose rows each carry their own th/td pair.
            label_tr_td = label_tr.xpath('./td/table/tbody/tr')
            if label_tr_td:
                for tr in label_tr_td:
                    # Strip <a> tags so link text merges with the plain text.
                    result = etree.tostring(tr, pretty_print=True).decode('utf-8')
                    result = re.sub(r"<a .*?>", '', result)
                    result = re.sub(r"</a>", '', result)
                    tr = etree.HTML(result)
                    th_name = ''.join(tr.xpath('//th//text()')).replace('\n', '')
                    td_value = tr.xpath('//td//text()')
                    # Single value -> str, several values -> keep the list.
                    if len(td_value) <= 1:
                        td_value = ''.join(td_value).replace('\n', '')
                    if th_name == '':
                        continue
                    label_dict[th_name] = td_value
            if label_value == '':
                continue
            if label_name == '':
                continue
            label_dict[label_name] = label_value
        self.Person_info['详情'] = label_dict

    def get_person_relation(self):
        """Extract the lead paragraphs (before the first <h2>) as the summary."""
        match = re.search(r'(<div class="mw-parser-output".*?)<h2>', self.response, re.S)
        if match is None:
            # No lead section found; leave the summary out.
            return
        html = etree.HTML(match.group(1))
        relation = ''.join(html.xpath('//p//text()'))
        # Drop inline CSS residue such as ".mw-parser-output{...}".
        relation = re.sub(r"(\..*})", '', relation)
        # Skip disambiguation pages and stub-length leads.
        disambig_markers = ("可以指:", "可指:", "可能指下列人物:")
        if any(marker in relation for marker in disambig_markers) or len(relation) < 15:
            return
        self.Person_info["简介"] = relation

    def get_h4_content(self, h4):
        """Parse '<h4>'-split fragments into ``{title: text-or-list}``.

        ``h4`` is an <h3> section split on '<h4>'; the leading element
        (text before the first <h4> heading) is skipped.
        """
        h4_dict = {}
        for info in h4[1:]:
            html = etree.HTML("<h4>" + info)
            h4_title_raw = ''.join(html.xpath('//h4//text()'))
            h4_title = h4_title_raw.replace('[编辑]', '')
            ul = html.xpath('//ul/li')
            if not ul:
                # Plain paragraph content: everything except the heading text.
                h4_content = ''.join(html.xpath('//text()')).replace(h4_title_raw, '')
                h4_dict[h4_title] = h4_content
            else:
                # Bulleted subsection -> list of item texts.
                h4_dict[h4_title] = [''.join(li.xpath('.//text()')) for li in ul]
        return h4_dict

    def get_h3_content(self, h3):
        """Parse '<h3>'-split fragments; recurse into <h4> subsections."""
        h3_dict = {}
        for info in h3[1:]:
            h3_content = '<h3>' + info
            h4_parts = h3_content.split("<h4>")
            html = etree.HTML(h3_content)
            h3_title_raw = ''.join(html.xpath('//h3//text()'))
            h3_title = h3_title_raw.replace("[编辑]", '')
            if len(h4_parts) < 2:
                # No <h4> subsections: value is a list (ul/ol) or plain text.
                ul = html.xpath('//ul/li')
                ol = html.xpath('//ol/li')
                if ul:
                    h3_dict[h3_title] = [''.join(li.xpath('.//text()')) for li in ul]
                elif ol:
                    h3_dict[h3_title] = [''.join(li.xpath('.//text()')) for li in ol]
                else:
                    h3_dict[h3_title] = ''.join(html.xpath('//text()')).replace(h3_title_raw, '')
            else:
                h3_dict[h3_title] = self.get_h4_content(h4_parts)
        return h3_dict

    def get_content(self):
        """Split the page body on <h2> headings and store each section.

        The lead (before the first <h2>) and the last two <h2> sections
        (references / external links) are dropped.
        """
        result = self.response.split('<h2>')[1:-2]
        for section in result:
            h2 = '<h2>' + section
            h3 = h2.split('<h3>')
            html = etree.HTML(h2)
            title = html.xpath('//h2//text()')[0]
            if len(h3) < 2:
                # Flat section: join all text nodes after the heading itself.
                content = ''.join(html.xpath('//text()')[1:])
                content = re.sub(r"(\..*?})", '', content)
                content = content.replace('[编辑]', '')
                content = re.sub('\xa0/', '', content)
                # De-duplicate lines; NOTE: set() does not preserve order.
                lines = set(content.split('\n'))
                lines.discard('')
                self.Person_info[title] = list(lines)
            else:
                self.Person_info[title] = self.get_h3_content(h3)

    def save_success(self):
        """Write ``Person_info`` as one JSON line; skip if the file exists."""
        dir_path = './{}-json'.format(self.dynasty)
        # exist_ok avoids the exists()/makedirs() race between worker threads.
        os.makedirs(dir_path, exist_ok=True)
        path = dir_path + '/{}.json'.format(self.dynasty + "-" + self.name)
        if os.path.isfile(path):
            return
        print(self.Person_info)
        with open(path, 'a', encoding='utf-8') as f:
            f.write(json.dumps(self.Person_info, ensure_ascii=False) + '\n')

    def save_false(self):
        """Append the name to the not-found list (page had no usable data)."""
        path = './未收录人物.txt'
        print(self.name)
        with open(path, 'a', encoding='utf8') as f:
            f.write(self.name + '\n')

    def run(self):
        """Fetch the page, parse every section, then persist the result."""
        self.get_person_page()
        self.get_name()
        self.get_label()
        self.get_person_relation()
        self.get_content()
        if self.Person_info:
            self.save_success()
        else:
            self.save_false()
def get_name(path):
    """Read "prefix,name"-style lines from a text file and enqueue the names.

    The dynasty is taken from the file name (``<dynasty>.txt``); a line
    without a comma is used whole.  One ``{"dynasty": ..., "name": ...}``
    dict is queued per non-empty line.

    :param path: Windows-style path of the text file, named after the dynasty
    :return: None (jobs are put on the module-level ``name_queue``)
    """
    dynasty = path.split('\\')[-1].split('.')[0]
    with open(path, 'r', encoding='utf8') as f:
        name_list = f.read().split('\n')
    for line in name_list:
        if not line:
            # Skip blank lines (e.g. trailing newline): previously an empty
            # name was enqueued and the scraper fetched the wiki root page.
            continue
        parts = line.split(',')
        name = parts[1] if len(parts) > 1 else parts[0]
        name_dict = {"dynasty": dynasty, "name": name}
        print(name_dict)
        name_queue.put(name_dict)
def get_name_from_excel(path):
    """Load dynasty/name pairs from an excel sheet into the work queue.

    Column 0 holds the dynasty, column 1 the person name.

    :param path: path of the .xlsx file
    :return: None (jobs are put on the module-level ``name_queue``)
    """
    frame = pd.read_excel(path)
    for row in frame.values:
        entry = {}
        entry["dynasty"] = row[0]
        entry['name'] = row[1]
        print(entry)
        name_queue.put(entry)
def get_name_two(path):
    """Read "dynasty-name" lines from a text file and enqueue them.

    Blank or malformed lines (no '-') are skipped silently; this replaces
    the previous bare ``except`` that hid any other error.

    :param path: path of the text file
    :return: None (jobs are put on the module-level ``name_queue``)
    """
    with open(path, 'r', encoding='utf8') as f:
        name_list = f.read().split('\n')
    for info in name_list:
        parts = info.split('-')
        if len(parts) < 2:
            # Blank or malformed line: nothing to enqueue.
            continue
        name_dict = {"dynasty": parts[0], "name": parts[1]}
        print(name_dict)
        name_queue.put(name_dict)
def main():
    """Worker loop: pull (name, dynasty) jobs until the queue is drained.

    Uses ``get_nowait()`` instead of ``empty()`` + blocking ``get()``:
    with 10 workers, another thread could drain the last item between the
    emptiness check and the get, blocking this worker forever.
    """
    while True:
        try:
            name_dict = name_queue.get_nowait()
        except queue.Empty:
            break
        WiKi(name_dict['name'], name_dict['dynasty']).run()
if __name__ == '__main__':
    # Alternate loaders exist for other input formats: get_name() for a
    # folder of per-dynasty txt files, get_name_from_excel() for .xlsx.
    path = r"D:\New_code\WIKI\1.txt"
    get_name_two(path)

    start_time = time.time()
    workers = [threading.Thread(target=main) for _ in range(10)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    use_time = time.time() - start_time
    print("用时:" + str(use_time))