Wikipedia Crawler

import json
import os.path
import queue
import threading
import time
import pandas as pd
import requests
from lxml import etree
import re
from urllib import parse
from retrying import retry

"""
版本迭代: 新增从excel读取历史人物,单个数据修改,
如果家族成员没有h3 級別分類,那麽单个数据的家族人物修改为list
修改个人标签家族属性获取,例如子分类如果有多个,那么值是list,单个是str
"""


name_queue = queue.Queue()
proxy = "127.0.0.1:1080"
proxies = {
    'http': 'http://' + proxy,
    'https': 'https://' + proxy,
}
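# Note (assumption about the local client): port 1080 is typically a SOCKS5 listener.
# If the local client only exposes SOCKS, requests needs the socks extra
# (pip install requests[socks]) and the URLs above would be 'socks5h://127.0.0.1:1080'.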


class WiKi:
    def __init__(self, name, dynasty):
        name = parse.quote(name)
        # name = parse.quote("刘邦")
        self.dynasty = dynasty
        self.start_url = "https://zh.wikipedia.org/wiki/" + name
        self.headers = {
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.7 Safari/537.36"
        }
        self.Person_info = {}

    @retry()  # note: retry() with no arguments retries indefinitely on any exception
    def get_person_page(self):
        """Fetch the person's Wikipedia page through the local proxy."""
        response = requests.get(url=self.start_url, headers=self.headers, proxies=proxies)
        self.response = response.content.decode()

    def get_name(self):
        """Extract the person's name from the page's <h1 id="firstHeading">."""
        html = etree.HTML(self.response)
        name = html.xpath('//h1[@id="firstHeading"]/text()')
        self.name = name[0]
        # self.Person_info['姓名'] = self.name

    def get_label(self):
        """Parse the infobox ("infobox vcard") rows into a dict of attributes."""
        html = etree.HTML(self.response)
        label_div = html.xpath('//table[@class="infobox vcard"]/tbody/tr')
        if not label_div:
            return
        label_dict = {}
        for label_tr in label_div:
            label_name = ''.join(label_tr.xpath('./th/text()'))
            label_value = ''.join(label_tr.xpath('./td//text()')).replace('\n','').replace('\xa0','')
            label_tr_td = label_tr.xpath('./td/table/tbody/tr')
            if label_tr_td:
                for tr in label_tr_td:
                    result = etree.tostring(tr,pretty_print=True).decode('utf-8')
                    result = re.sub("<a .*?>", '', result)
                    result = re.sub("(</a>)", '', result)
                    tr = etree.HTML(result)
                    th_name = ''.join(tr.xpath('//th//text()')).replace('\n','')
                    td_value = tr.xpath('//td//text()')
                    # print(td_value)
                    # a single text node collapses to a str; multiple nodes stay a list
                    if len(td_value) <= 1:
                        td_value = ''.join(td_value).replace('\n', '')
                    if th_name == '':
                        continue
                    # if td_value == '':
                    #     continue
                    label_dict[th_name] = td_value
            if label_value == '':
                continue
            if label_name == '':
                continue
            label_dict[label_name] = label_value
        self.Person_info['详情'] = label_dict

    def get_person_relation(self):
        """Extract the lead-section summary (the text before the first <h2>)."""
        try:
            result = re.search(r'(<div class="mw-parser-output".*?)<h2>', self.response, re.S).group(1)
        except AttributeError:  # no match: the page has no lead section in the expected form
            return
        html = etree.HTML(result)
        p_list = html.xpath('//p//text()')
        relation = ''.join(p_list)
        relation = re.sub(r"(\..*})", '', relation)  # strip CSS leaked from templates
        # skip disambiguation pages ("可以指:" etc.) and very short stubs
        rule = "可以指:"
        rule1 = "可指:"
        rule2 = "可能指下列人物:"
        if rule in relation or rule1 in relation or rule2 in relation or len(relation) < 15:
            return
        self.Person_info["简介"] = relation

    def get_h4_content(self, h4):
        """Parse <h4>-level sub-sections into {title: text or list of <li> texts}."""
        h4_dict = {}
        for info in h4[1:]:
            info = "<h4>"+info
            html = etree.HTML(info)
            h4_title_1 = ''.join(html.xpath('//h4//text()'))
            h4_title = h4_title_1.replace('[编辑]', '')
            ul = html.xpath('//ul/li')
            if not ul:
                h4_content = ''.join(html.xpath('//text()'))
                h4_content = h4_content.replace(h4_title_1,'')
                h4_dict[h4_title] = h4_content
            else:
                li_list = []
                for li in ul:
                    li_content = ''.join(li.xpath('.//text()'))
                    li_list.append(li_content)
                h4_dict[h4_title] = li_list
        return h4_dict

    def get_h3_content(self, h3):
        """Parse <h3>-level sections; delegate to get_h4_content when <h4> headings exist."""
        h3_dict = {}
        for info in h3[1:]:
            h3_content = '<h3>'+info
            h4_content = h3_content.split("<h4>")
            html = etree.HTML(h3_content)
            h3_title_1 = ''.join(html.xpath('//h3//text()'))
            h3_title = h3_title_1.replace("[编辑]", '')
            if len(h4_content)<2:
                ul = html.xpath('//ul/li')
                ol = html.xpath('//ol/li')
                if ul:
                    li_list = []
                    for li in ul:
                        li_content = ''.join(li.xpath('.//text()'))
                        li_list.append(li_content)
                    h3_dict[h3_title] = li_list
                elif ol:
                    ol_list = []
                    for li in ol:
                        li_content = ''.join(li.xpath('.//text()'))
                        ol_list.append(li_content)
                    h3_dict[h3_title] = ol_list
                else:
                    h3_content = ''.join(html.xpath('//text()'))
                    h3_content = h3_content.replace(h3_title_1,'')
                    h3_dict[h3_title] = h3_content
            else:
                h4_dict = self.get_h4_content(h4_content)
                h3_dict[h3_title] = h4_dict
        return h3_dict

    def get_content(self):
        """
        Extract the biography sections, keyed by their <h2> titles.
        The last two <h2> blocks (usually references / external links) are dropped.
        :return:
        """
        # result = re.findall(r'(<h2>.*?)<h2>', self.response, re.S)
        # str.split never raises, so no try/except is needed here
        result = self.response.split('<h2>')[1:-2]
        for x in result:
            h2 = '<h2>'+x
            h3 = h2.split('<h3>')
            html = etree.HTML(h2)
            title = html.xpath('//h2//text()')[0]
            if len(h3) < 2:
                content = html.xpath('//text()')
                content = ''.join(content[1:])
                content = re.sub(r"(\..*?})", '', content)  # strip leaked CSS/template text
                content = content.replace('[编辑]', '')
                content = re.sub('\xa0/', '', content)
                content = content.split('\n')
                # dedupe while preserving paragraph order (set() would scramble it)
                content = list(dict.fromkeys(content))
                new_content = [cont for cont in content if cont != '']
                self.Person_info[title] = new_content
            else:
                h3_dict = self.get_h3_content(h3)
                self.Person_info[title] = h3_dict

    def save_success(self):
        """Save a successfully scraped person as ./<dynasty>-json/<dynasty>-<name>.json."""
        dir_path = './{}-json'.format(self.dynasty)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        path = dir_path + '/{}.json'.format(self.dynasty + "-" + self.name)
        if os.path.isfile(path):  # already saved on a previous run, skip
            return
        print(self.Person_info)
        with open(path, 'a', encoding='utf-8') as f:
            f.write(json.dumps(self.Person_info, ensure_ascii=False) + '\n')

    def save_false(self):
        """Record names whose pages yielded no usable data."""
        path = './未收录人物.txt'
        print(self.name)
        with open(path, 'a', encoding='utf8') as f:
            f.write(self.name + '\n')

    def run(self):
        self.get_person_page()
        # self.get_is_save()
        self.get_name()
        self.get_label()
        self.get_person_relation()
        self.get_content()
        if self.Person_info:
            self.save_success()
        else:
            self.save_false()


def get_name(path):
    """
    Read names from a per-dynasty text file and put them on the queue;
    the dynasty comes from the file name.
    :param path:
    :return:
    """
    dynasty = path.split('\\')[-1].split('.')[0]
    with open(path, 'r', encoding='utf8') as f:
        file = f.read()
    name_list = file.split('\n')
    for name in name_list:
        try:
            name = name.split(',')[1]
        except IndexError:
            name = name.split(',')[0]
        name_dict = {}
        name_dict["dynasty"] = dynasty
        name_dict['name'] = name
        print(name_dict)
        name_queue.put(name_dict)


def get_name_from_excel(path):
    """
    Read name and dynasty directly from an Excel sheet
    (dynasty in the first column, name in the second).
    :param path:
    :return:
    """
    df = pd.read_excel(path)
    for value in df.values:
        name = value[1]
        dynasty = value[0]
        name_dict = {}
        name_dict["dynasty"] = dynasty
        name_dict['name'] = name
        print(name_dict)
        name_queue.put(name_dict)


def get_name_two(path):
    """Read "朝代-人名" (dynasty-name) lines from a text file and enqueue them."""
    with open(path, 'r', encoding='utf8') as f:
        name_list = f.read().split('\n')
        for info in name_list:
            try:
                dynasty = info.split('-')[0]
                name = info.split('-')[1]
            except IndexError:
                continue
            name_dict = {}
            name_dict["dynasty"] = dynasty
            name_dict['name'] = name
            print(name_dict)
            name_queue.put(name_dict)

def main():
    """Worker thread target: drain the name queue and scrape each person."""
    while True:
        try:
            # get_nowait avoids the empty()/get() race between worker threads
            name_dict = name_queue.get_nowait()
        except queue.Empty:
            break
        name = name_dict['name']
        # name = name.split('-')[0]
        dynasty = name_dict['dynasty']
        WiKi(name, dynasty).run()
        # break


if __name__ == '__main__':
    # path = r"D:\New_code\WIKI\histpry_person"
    # for x in os.listdir(path):
    #     new_path = os.path.join(path, x)
    #     get_name(new_path)
    # path = r"D:\New_code\WIKI\二十四史人物.xlsx"
    # get_name_from_excel(path)
    path = r"D:\New_code\WIKI\1.txt"
    get_name_two(path)
    Threads = []
    start_time = time.time()
    for _ in range(10):
        main_t = threading.Thread(target=main)
        Threads.append(main_t)
    for t in Threads:
        t.start()
    for t in Threads:
        t.join()
    end_time = time.time()
    use_time = end_time-start_time
    print("用时:" + str(use_time))

The proxy is a local Shadowsocks ("小飞机") client; the proxies dict points requests at the local listener on 127.0.0.1:1080.
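
Before launching the worker threads it helps to confirm the local proxy is reachable. A minimal sketch, assuming the same 127.0.0.1:1080 listener as in the script (the probe URL is just an example):

import requests

proxy = "127.0.0.1:1080"
proxies = {
    'http': 'http://' + proxy,
    'https': 'http://' + proxy,   # a plain HTTP proxy URL also covers https targets via CONNECT
}

try:
    r = requests.get("https://zh.wikipedia.org/wiki/Wikipedia",
                     headers={"user-agent": "Mozilla/5.0"},
                     proxies=proxies, timeout=10)
    print("proxy OK:", r.status_code)      # 200 means the tunnel works
except requests.RequestException as e:
    print("proxy unreachable:", e)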

Scraped data is saved as JSON, one file per person under ./<dynasty>-json/.
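
Each saved file holds a single JSON object on one line. A minimal sketch for loading all results back, assuming the directory layout produced by save_success:

import glob
import json

records = []
for path in glob.glob('./*-json/*.json'):
    with open(path, encoding='utf-8') as f:
        for line in f:                      # one JSON object per line
            records.append(json.loads(line))

print(len(records), "people loaded")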

Person names and dynasties are read from local files to drive the collection of data on historical figures.
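
get_name_two expects one "朝代-人名" (dynasty-name) entry per line, split on "-". A small standalone sketch of the expected 1.txt format and how each line is parsed (the names here are only illustrative):

sample = "汉朝-刘邦\n汉朝-萧何\n唐朝-李白"   # hypothetical contents of 1.txt

for info in sample.split('\n'):
    try:
        dynasty, name = info.split('-')[0], info.split('-')[1]
    except IndexError:   # a line without a "-" separator is skipped
        continue
    print({'dynasty': dynasty, 'name': name})
# -> {'dynasty': '汉朝', 'name': '刘邦'}  and so on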

 
