Python 爬虫入门学习 3:批量爬取小说并生成文件

#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""
@author:Evolve Hsu
@file:thread_book.py
@time:2021/03/26
"""
import re
import urllib
import threading
from urllib import request, error  # 制定URL 获取网页数据

from bs4 import BeautifulSoup  # 网页解析 获取数据
import sqlite3  # sqlite3 数据库操作
import time
from book import NewBook

# HTTP headers attached to every request this module sends; the User-Agent
# value imitates a desktop Chrome/Edge browser.
headers = {
    'User-Agent': ' Mozilla/5.0 (Windows NT 10.0 Win64 x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36 Edg/89.0.774.54'
}
# Scheme and host that are prepended to the relative links taken from
# scraped pages (index page paths and chapter paths).
parent_url = 'http://www.xbiquge.me'


# Worker thread that runs a chapter-processing callable.
class MyThread(threading.Thread):
    """Thread that calls ``func(book, book_list)`` when it runs.

    The three constructor arguments are stored unchanged on the instance:
    ``func`` is the callable to execute, while ``book`` and ``book_list``
    are forwarded to it as its two positional arguments.
    """

    def __init__(self, func, book, book_list):
        super().__init__()
        self.func = func
        self.book = book
        self.book_list = book_list

    def run(self):
        """Invoke the stored callable with the stored book and list."""
        target = self.func
        target(self.book, self.book_list)


# Fetch the HTML of a novel's index page.
def get_index_html(url):
    """Download ``url`` and return its body decoded as UTF-8.

    The request is sent with the module-level ``headers``. When
    ``urllib.error.URLError`` is raised, the error and the URL are printed
    and the download is retried after a 5 second pause. There is no retry
    limit, so the function only returns once a download succeeds.

    Args:
        url: Absolute URL of the page to fetch.

    Returns:
        The response body as a ``str``.
    """
    # The request object is identical for every attempt, so build it once.
    req = urllib.request.Request(url=url, headers=headers)
    while True:
        try:
            # The context manager closes the response even if read() or
            # decode() raises, so the underlying connection is not leaked.
            with urllib.request.urlopen(req) as resp:
                return resp.read().decode("utf-8")
        except urllib.error.URLError as e:
            print(e)
            print("异常链接: " + url)
            time.sleep(5)


# Collect the chapter links of one novel.
def getElementList(url):
    """Return the chapter links listed on a novel's index page.

    The page at ``parent_url + url`` is downloaded with ``get_index_html``
    and every ``<a>`` inside ``<div id="list">`` is inspected.

    Args:
        url: Path of the novel's index page, relative to ``parent_url``.

    Returns:
        A list of ``href`` values in page order. Duplicates, anchors without
        an ``href`` and the ``chapter.html`` placeholder link are left out.
    """
    html = get_index_html(parent_url + url)
    bs = BeautifulSoup(html, "lxml")
    anchors = bs.find('div', id="list").find_all('a')
    hrefs = (anchor.get('href') for anchor in anchors)
    # dict.fromkeys drops repeated links while keeping first-seen order; the
    # original called set(link_list) but discarded the result, so duplicate
    # links were never actually removed.
    # Anchors with a missing/empty href are skipped because callers
    # concatenate each entry onto parent_url.
    return list(dict.fromkeys(
        href for href in hrefs if href and href != 'chapter.html'
    ))


# Download and parse a single chapter.
def resolve_element(book, book_list):
    """Download one chapter, fill in ``book`` and append it to ``book_list``.

    The chapter URL is ``parent_url + book.element``. On success the
    following attributes are set on ``book``: ``title`` (text of the chapter
    heading link), ``link`` (the chapter URL), ``number`` (the integer found
    between the first and second ``_`` of the URL, with ``.html`` removed)
    and ``text`` (all ``<p>`` texts inside ``<div id="content">`` joined
    together, with the site's advertisement string removed).

    Any exception during download or parsing is printed and the whole
    attempt is repeated after 5 seconds, without a retry limit.

    Args:
        book: Object exposing an ``element`` attribute with the chapter path;
            it receives the parsed attributes described above.
        book_list: List that the completed ``book`` is appended to.
    """
    url = parent_url + book.element
    # The request object is identical for every attempt, so build it once.
    req = urllib.request.Request(url=url, headers=headers)
    while True:
        print("准备解析 html: " + url)
        try:
            # Close the response as soon as the body has been read.
            with urllib.request.urlopen(req) as resp:
                html = resp.read().decode("utf-8")
            bs = BeautifulSoup(html, "lxml")
            # Built fresh on every attempt: a list shared across retries
            # would keep paragraphs from a failed attempt and duplicate them.
            paragraphs = [
                item.text.replace('xbiquge/最快更新!无广告!', '')
                for item in bs.find('div', id="content").find_all('p')
            ]
            title = bs.select('body > div.content_read > div > div.bookname > h1 > a')[0].text
            number = int(url.split('_')[1].replace('.html', ''))
            # Only touch ``book`` once every piece parsed successfully.
            book.title = title
            book.link = url
            book.number = number
            book.text = ''.join(paragraphs)
            book_list.append(book)
            break
        except Exception as e:
            # Deliberately broad: any failure triggers another attempt.
            print(e)
            print('发生异常 休息5秒: ' + url)
            time.sleep(5)
    print("本线程任务完成: " + url)


# Bulk-insert the scraped chapters into the database.
def save_new_book(book_list):
    """Insert every book in ``book_list`` into the ``new_book`` table.

    Rows are written to the SQLite file ``newbook.db`` in the current
    working directory; the ``new_book`` table must already exist there.
    An empty ``book_list`` inserts nothing.

    Args:
        book_list: Iterable-with-length of objects exposing ``book_name``,
            ``author``, ``number``, ``title``, ``link`` and ``text``; the
            ``text`` attribute is stored in the ``data`` column.

    Raises:
        sqlite3.Error: If the database cannot be opened or written.
    """
    print('准备保存数据 数量: %d' % len(book_list))
    db_path = "newbook.db"
    # init_db(db_path)  # create the table on first use
    # Placeholders let SQLite handle quoting, so quotes inside titles or
    # chapter text can neither break the statement nor inject SQL.
    sql = ("insert into new_book (book_name, author, number, title, link, data) "
           "values (?, ?, ?, ?, ?, ?)")
    rows = [
        (book.book_name, book.author, book.number, book.title, book.link, book.text)
        for book in book_list
    ]
    conn = sqlite3.connect(db_path)
    try:
        conn.executemany(sql, rows)
        conn.commit()
    finally:
        # Always release the database file, even when the insert fails.
        conn.close()


# Create the database table.
def init_db(savePath):
    """Create the ``new_book`` table in the SQLite database at ``savePath``.

    The table has an auto-incrementing ``id`` plus the columns
    ``book_name``, ``author``, ``number``, ``title``, ``link`` and ``data``.
    Calling the function again on a database that already has the table
    leaves the existing table and its rows untouched.

    Args:
        savePath: Path of the SQLite database file; SQLite creates the file
            if it does not exist yet.

    Raises:
        sqlite3.Error: If the database cannot be opened or written.
    """
    # "if not exists" makes initialisation safe to repeat on every start-up.
    sql = '''
            create table if not exists new_book
            (
                id integer primary key autoincrement,
                book_name varchar,
                author varchar,
                number integer,
                title varchar,
                link varchar,
                data text
            );
        '''
    conn = sqlite3.connect(savePath)
    try:
        conn.execute(sql)
        conn.commit()
    finally:
        # Always release the database file, even when the statement fails.
        conn.close()
    print("init_db success")


# Search the site for a book by name.
def search_book_name(book_name):
    """Return the HTML of the site's search-result page for ``book_name``.

    The name is percent-encoded and appended to the search URL as the
    ``searchkey`` query parameter. The request is sent with the module-level
    ``headers``. When ``urllib.error.URLError`` is raised, the error is
    printed and the request is retried after a 1 second pause, without a
    retry limit.

    Args:
        book_name: Title to search for.

    Returns:
        The response body decoded as UTF-8.
    """
    search_url = 'https://www.xbiquge.me/search/result.html?searchkey=' + urllib.parse.quote(book_name)
    # The request object is identical for every attempt, so build it once.
    req = urllib.request.Request(url=search_url, headers=headers)
    while True:
        try:
            # The context manager closes the response even if read() or
            # decode() raises, so the underlying connection is not leaked.
            with urllib.request.urlopen(req) as resp:
                return resp.read().decode("utf-8")
        except urllib.error.URLError as e:
            print(e)
            time.sleep(1)


# Extract names, links and authors from a search-result page.
def resolve_book_base(book_search_html):
    """Parse a search-result page into three parallel lists.

    ``<span>`` elements whose class matches ``s2`` supply the book name
    (span text) and link (``href`` of the span's ``<a>``); spans whose class
    matches ``s4`` supply the author. The two span sequences are paired up
    positionally and the first pair is ignored.

    Args:
        book_search_html: HTML text of the search-result page.

    Returns:
        A tuple ``(book_name_list, book_link_list, book_author_list)``.
    """
    soup = BeautifulSoup(book_search_html, "lxml")
    name_spans = soup.find_all('span', class_=re.compile('s2'))
    author_spans = soup.find_all('span', class_=re.compile('s4'))
    # Drop the first pair -- presumably the table header row; verify against
    # the live page markup.
    pairs = list(zip(name_spans, author_spans))[1:]
    names = [name_span.text for name_span, _ in pairs]
    links = [name_span.a.get('href') for name_span, _ in pairs]
    authors = [author_span.text for _, author_span in pairs]
    return names, links, authors


# Write the chapters of one novel to a text file.
def generate_file(book_list, file_name, author, file_path):
    """Append every chapter in ``book_list`` to ``<file_name>-<author>.txt``.

    The target path is built by plain concatenation
    (``file_path + file_name + '-' + author + '.txt'``), so ``file_path``
    must already end with a path separator. The file is opened in append
    mode; existing content is kept.

    Args:
        book_list: Iterable of objects exposing ``title`` and ``text``.
        file_name: Base name of the output file.
        author: Author name, used as the second part of the file name.
        file_path: Directory prefix of the output file.
    """
    print('准备写入数据: ' + file_name)
    target = file_path + file_name + '-' + author + '.txt'
    # The context manager closes the file even if a write or encode fails.
    with open(target, "ab+") as fo:
        for book in book_list:
            # The file is binary, so text is encoded to UTF-8 explicitly to
            # avoid mojibake.
            fo.write(('\r' + book.title + '\r\n').encode('UTF-8'))
            fo.write(book.text.encode('UTF-8'))


if __name__ == '__main__':
    # Script entry point: search for a title, then download every search
    # result chapter-by-chapter and write each result to its own text file.
    book_name = '斗破苍穹'
    file_path = 'C://Users//EvolveHsu//Desktop//爬虫小说下载//'
    # Threads are started in batches with a pause after each full batch.
    batch_size = 10
    sleep_time = 5
    book_search_html = search_book_name(book_name)
    book_name_list, book_link_list, book_author_list = resolve_book_base(book_search_html)
    book_list = []
    # NOTE(review): `name` (the title shown in the search result) is unused;
    # the search keyword `book_name` is used for NewBook and the file name
    # instead -- confirm this is intended.
    for baseUrl, name, author in zip(book_link_list, book_name_list, book_author_list):
        elementList = getElementList(baseUrl)
        # Single-threaded variant, for testing:
        # resolve_element(NewBook(elementList[0], book_name, author, None, None, None, None))

        # Multi-threaded: one thread per chapter, each running resolve_element.
        threadList = [MyThread(resolve_element, NewBook(element, book_name, author, None, None, None, None), book_list)
                      for element in elementList]
        for started, t in enumerate(threadList, start=1):
            # Thread.setDaemon() is deprecated since Python 3.10; setting the
            # attribute is the supported equivalent.
            t.daemon = True
            t.start()
            if started % batch_size == 0:
                print('启动线程达到 %d 条休息 %d 秒' % (batch_size, sleep_time))
                time.sleep(sleep_time)
        for t in threadList:
            t.join()
        # Order the chapters; this relies on NewBook instances being
        # comparable (presumably by chapter number -- verify in book.py).
        book_list.sort()
        # Write this result's chapters to its file.
        generate_file(book_list, book_name, author, file_path)
        # Start the next search result with a fresh list.
        book_list = []
    # Saving to the database is currently disabled:
    # print("爬取数据完成 准备保存数据")
    # save_new_book(book_list)

上一篇:Python爬虫——爬取知网论文数据(二)


下一篇:pyqt5实战之真爱游戏(2048改版)