Python爬虫+Go WebServer+Flutter App(Python篇)


1.前言

对于一个应用来说,需要获取内容、服务端提供内容、客户端展示内容这三个部分,它们可以分别用Python、Go、Flutter来实现,结合起来就构成了一个完整的应用。

2.Python爬虫获取内容

通过selenium调用浏览器内核,获取对应网页内容,并解析需要的内容,最后通过MySQL保存到数据库。

2.1安装python

前往官网下载安装包,我选择的是Python 2
选择对应系统环境安装包,下载安装完成,设置环境变量
然后在终端输入"python --version",如果显示python版本则安装完成

zxl@zxl-7060:~$ python --version
Python 2.7.12

2.2安装pip

pip 是 Python 包管理工具,该工具提供了对Python 包的查找、下载、安装、卸载的功能。
如果未安装,则通过命令curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py下载安装脚本
再通过命令sudo python get-pip.py运行该脚本来安装pip
通过命令pip --version来判断是否安装成功
出现如下类似信息,则安装成功

zxl@zxl-7060:~$  pip --version
pip 19.1.1 from /home/mi/.local/lib/python2.7/site-packages/pip (python 2.7)

2.3安装selenium

Selenium 是一个用于Web应用程序测试的工具
通过python调用Selenium就像真正的用户在操作浏览器一样,可以很好的解决网页js加载等问题

pip install selenium

2.4选择浏览器驱动

以chrome为例,下载chrome driver,选择电脑中chrome浏览器对应版本的driver

2.4安装开发工具PyCharm

前往官网下载安装包,选择PyCharm下载
安装好开发工具,如下图所示创建好工程,准备开发

[图片:Python爬虫+Go WebServer+Flutter App(Python篇)]

2.5安装MySQL

前往官网下载mysql,并进行安装

2.6安装mysql-connector

输入命令pip install mysql-connector
这样就可以操作mysql数据库了

2.7网页请求

1.设置浏览器驱动位置
2.设置不打开浏览器进行网页请求
3.网页请求

#!/usr/bin/python
# coding=utf-8
import platform

from selenium import webdriver


class BaseRequest:
    """Base class for spiders that load pages through a headless Chrome."""

    # chromedriver binary location, keyed by the value of platform.system().
    # NOTE(review): Linux reuses the macOS path, exactly as the original
    # if/elif chain did -- presumably a placeholder; confirm before deploying.
    CHROMEDRIVER_PATHS = {
        'Darwin': "/Users/zxl/Downloads/chromedriver",
        'Windows': "D:\\my_github_workspace\\chromedriver.exe",
        'Linux': "/Users/zxl/Downloads/chromedriver",
    }
    # Used when platform.system() reports anything not listed above.
    DEFAULT_CHROMEDRIVER_PATH = "/Users/zxl/Downloads/chromedriver"

    def get_web_content(self, url):
        """Open ``url`` in a new headless Chrome and return the live driver.

        The caller owns the returned driver and must shut it down when it is
        done with the page.

        :param url: address of the page to load.
        :return: the ``webdriver.Chrome`` instance with ``url`` loaded.
        :raises: whatever ``webdriver.Chrome(...)`` or ``driver.get`` raises;
            if the page load fails the browser is shut down before re-raising.
        """
        chromedriver = self.CHROMEDRIVER_PATHS.get(
            platform.system(), self.DEFAULT_CHROMEDRIVER_PATH)

        # Build the Chrome options object.
        opt = webdriver.ChromeOptions()
        # Run Chrome without a visible window. The plain command-line switch
        # is used instead of the deprecated set_headless() helper.
        opt.add_argument("--headless")
        # Image content setting; 2 is presumably Chrome's "block" value, used
        # to avoid downloading images -- TODO confirm.
        prefs = {"profile.managed_default_content_settings.images": 2}
        opt.add_experimental_option("prefs", prefs)

        # Start the headless browser.
        driver = webdriver.Chrome(executable_path=chromedriver, options=opt)
        try:
            driver.get(url)
        except Exception:
            # The caller never receives the driver on failure, so it could not
            # close it: shut the browser down here to avoid leaking it.
            driver.quit()
            raise
        return driver

2.8网页内容解析

1.网页请求成功后,获取到该网页对象driver
2.通过xpath进行页面标签解析
3.解析完成关闭浏览器driver

#!/usr/bin/python# coding=utf-8import datetimeimport hashlibimport refrom selenium.common.exceptions import NoSuchElementExceptionfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.wait import WebDriverWaitfrom selenium.webdriver.support import expected_conditionsfrom com_zxl_spider_db.JokeDB import JokeDBfrom com_zxl_spider_request.BaseRequest import *from com_zxl_spider_data.JokeBean import *class RequestQsbkTxt(BaseRequest):

    def __init__(self):
        global jokeDB
        jokeDB = JokeDB()

    def parse(self, end_url, index):
        print "parse::end_url = ", end_url, "::index = ", index

        driver = self.get_web_content("https://www.qiushibaike.com/" + end_url + str(index))

        elem1 = WebDriverWait(driver, 10).until(
            expected_conditions.presence_of_element_located((By.XPATH, '//ul[@class="pagination"]')))
        print "elem1 = ", elem1
        elem2 = WebDriverWait(driver, 10).until(
            expected_conditions.presence_of_element_located((By.XPATH, '//div[@class="article block untagged mb15"]')))
        print "elem2 = ", elem2        # page_source = driver.page_source

        isFindNextPage = False
        paginationObject = driver.find_element_by_xpath('//ul[@class="pagination"]')
        pageListObject = paginationObject.find_elements_by_xpath('.//li')
        for pageItemObject in pageListObject:
            page_index_txt = pageItemObject.text            print "pageItemObject::page_index_txt = ", page_index_txt
            itemFindResult = re.findall(".*?(\d+).*?", page_index_txt)
            print "pageItemObject::itemFindResult = ", itemFindResult            if len(itemFindResult) > 0:
                if int(itemFindResult[0]) > index:
                    index = int(itemFindResult[0])
                    isFindNextPage = True
                    break
                # if index - int(itemFindResult[0]) == 1:
                #     index = int(itemFindResult[0])
                #     isFindNextPage = True
                #     break
        print "parse::isFindNextPage = ", isFindNextPage, "::index = ", index, "::end_url = ",

        hotPicJokeItemPath = '//div[@class="article block untagged mb15"]'
        hotPicJokeItems = driver.find_elements_by_xpath(hotPicJokeItemPath)

        print 'hotPicJokeItems length = ', len(hotPicJokeItems)

        for hotPicJokeItem in hotPicJokeItems:
            jokeId = hotPicJokeItem.get_attribute('id')
            md5Object = hashlib.md5()
            md5Object.update(jokeId.encode('utf-8'))
            jokeMd5Value = md5Object.hexdigest()

            authorObject = hotPicJokeItem.find_element_by_xpath('.//div[@class="author clearfix"]')
            authorNickObject = authorObject.find_element_by_xpath('.//h2')
            authorNickName = authorNickObject.text
            authorImgObject = authorObject.find_element_by_xpath('.//img')
            authorImgUrl = authorImgObject.get_attribute('src')

            authorGender = ''
            authorAge = -1
            try:
                authorGenderObject = authorObject.find_element_by_xpath(".//div[starts-with(@class,'articleGender')]")
                authorGender = authorGenderObject.get_attribute('class')
                authorAge = authorGenderObject.text            except NoSuchElementException as e:
                print e

            contentObject = hotPicJokeItem.find_element_by_xpath('.//div[@class="content"]')
            content = contentObject.text

            thumbImgUrl = ''
            try:
                thumbObject = hotPicJokeItem.find_element_by_xpath('.//div[@class="thumb"]')
                thumbImgObject = thumbObject.find_element_by_xpath('.//img')
                thumbImgUrl = thumbImgObject.get_attribute('src')
            except NoSuchElementException as e:
                print e

            statsVoteContent = ''
            statsCommentContent = ''
            statsCommentDetailUrl = ''
            try:
                statsObject = hotPicJokeItem.find_element_by_xpath('.//div[@class="stats"]')
                try:
                    statsVoteObject = statsObject.find_element_by_xpath('.//span[@class="stats-vote"]')
                    statsVoteContent = statsVoteObject.text                except NoSuchElementException as e:
                    print e                try:
                    statsCommentObject = statsObject.find_element_by_xpath('.//span[@class="stats-comments"]')
                    statsCommentContent = statsCommentObject.find_element_by_xpath(
                        './/a[@class="qiushi_comments"]').text
                    statsCommentDetailUrl = statsCommentObject.find_element_by_xpath(
                        './/a[@class="qiushi_comments"]').get_attribute('href')
                except NoSuchElementException as e:
                    print e            except NoSuchElementException as e:
                print e            # print authorNickName
            # print authorGender
            # print authorAge
            # print authorImgUrl
            # print content
            # print thumbImgUrl
            # print statsVoteContent
            # print statsCommentContent
            # print statsCommentDetailUrl
            # print jokeId
            # print jokeMd5Value

            # print '\n'
            # print '======================================end=========================================='
            # print '\n'

            joke_bean = JokeBean()
            joke_bean = joke_bean.create_joke_bean(
                authorNickName.encode('utf-8'),
                authorGender,
                authorAge,
                authorImgUrl,
                content.encode('utf-8'),
                thumbImgUrl,
                statsVoteContent,
                statsCommentContent,
                statsCommentDetailUrl,
                jokeMd5Value)

            isExistJokeItem = jokeDB.query_by_md5(jokeMd5Value)
            print isExistJokeItem            if isExistJokeItem is None:
                print "not ExistJokeItem"
                jokeDB.insert_joke(joke_bean)
            else:
                print "ExistJokeItem"
                driver.close()
                return

        print "==============end================="
        print "\n"

        driver.close()
        if not isFindNextPage:
            return
        else:
            self.parse(end_url, index)

    def clas_db(self):
        if jokeDB is not None:
            jokeDB.close_db()

    def start_task(self):
        print "start_task::", 'Now Time::', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.parse("pic/page/", 1)
        self.clas_db()if __name__ == "__main__":
    request = RequestQsbkTxt()
    # request.parse("pic/page/", 1)
    request.parse("pic/page/", 1)

    request.clas_db()

2.9数据集合

1.根据解析内容构造数据集合

#!/usr/bin/python
# coding=utf-8


class JokeBean:
    """Builder for the plain ``dict`` record that describes one joke."""

    def create_joke_bean(self,
                         author_nick_name,
                         author_gender,
                         author_age,
                         author_img_url,
                         content,
                         thumb_img_url,
                         stats_vote_content,
                         stats_comment_content,
                         stats_comment_detail_url,
                         md5):
        """Return a dict holding the given joke fields.

        Each argument is stored unchanged under the key of the same name.
        No instance state is read or written.

        :return: dict with exactly these ten keys: ``author_nick_name``,
            ``author_gender``, ``author_age``, ``author_img_url``,
            ``content``, ``thumb_img_url``, ``stats_vote_content``,
            ``stats_comment_content``, ``stats_comment_detail_url``, ``md5``.
        """
        return {
            'author_nick_name': author_nick_name,
            'author_gender': author_gender,
            'author_age': author_age,
            'author_img_url': author_img_url,
            'content': content,
            'thumb_img_url': thumb_img_url,
            'stats_vote_content': stats_vote_content,
            'stats_comment_content': stats_comment_content,
            'stats_comment_detail_url': stats_comment_detail_url,
            'md5': md5,
        }

2.10保存数据库

1.设置数据库ip地址,端口号
2.设置连接数据库的用户名、密码
3.设置连接的数据库名称
4.如果数据库不存在,则创建数据库,并创建相应的表
5.设置增删改查操作

#!/usr/bin/python# coding=utf-8import mysql.connectorfrom mysql.connector import errorcodeclass BaseDB:
    host = 'zxltest.zicp.vip'
    port = '42278'
    urser_name = "***"
    pass_word = "***"
    db_name = 'joke'

    CREATE_TABLE_SQL = ("")

    def __init__(self):
        global cnx        global cursor        try:
            cnx = mysql.connector.connect(user=self.urser_name, password=self.pass_word, host=self.host, port=self.port, database=self.db_name)
            cursor = cnx.cursor()
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
                exit(1)
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
                cnx = mysql.connector.connect(user=self.urser_name, password=self.pass_word, host=self.host, port=self.port)
                cursor = cnx.cursor()
                self.__create_database()
                self.__create_table()
            else:
                print(err)
                exit(1)
        else:
            self.__create_table()
            print("DBUtil init finish")

    def __create_database(self):
        try:
            cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.db_name))
            cnx.database = self.db_name            print("Create database finish")
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            exit(1)

    def __create_table(self):
        # for name, ddl in CityDB.TABLES.iteritems():
        print "create table::", self.CREATE_TABLE_SQL        try:
            print("Creating table {}: ".format(self.CREATE_TABLE_SQL),)
            cursor.execute(self.CREATE_TABLE_SQL)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                print("already exists.")
            else:
                print(err.msg)
                exit(1)
        else:
            print("OK")

    def query(self, sql_str):
        print "query::", sql_str
        cursor.execute(sql_str)
        return cursor    def insert(self, sql_str, param):
        cursor.execute(sql_str, param)
        cnx.commit()

    def update(self, sql_str):
        cursor.execute(sql_str)
        cnx.commit    def delete(self, sql_str):
        cursor.execute(sql_str)
        cnx.commit()

    def close_db(self):
        cursor.close()
        cnx.close()

#!/usr/bin/python
# coding=utf-8
import mysql
from mysql.connector import errorcode

from com_zxl_spider_data.JokeBean import JokeBean
from com_zxl_spider_db.BaseDB import BaseDB


class JokeDB(BaseDB):
    """Access to the ``joke`` table: schema, insert, and lookup by md5."""

    TABLE_NAME = 'joke'

    # Column names (the COLUME spelling is kept: these are public names).
    COLUME_ID = 'id'
    COLUME_AUTHOR_NICK_NAME = 'author_nick_name'
    COLUME_AUTHOR_GENDER = 'author_gender'
    COLUME_AUTHOR_AGE = 'author_age'
    COLUME_AUTHOR_IMG_URL = 'author_img_url'
    COLUME_CONTENT = 'content'
    COLUME_THUMB_IMG_URL = 'thumb_img_url'
    COLUME__STATS_VOTE_CONTENT = 'stats_vote_content'
    COLUME_STATS_COMMENT_CONTENT = 'stats_comment_content'
    COLUME_STATS_COMMENT_DETAIL_URL = 'stats_comment_detail_url'
    COLUME_MD5 = 'md5'

    # DDL for the table; overrides the empty BaseDB.CREATE_TABLE_SQL.
    CREATE_TABLE_SQL = (
        "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " ("
        "  " + COLUME_ID + " bigint(20) NOT NULL AUTO_INCREMENT,"
        "  " + COLUME_AUTHOR_NICK_NAME + "  varchar(16),"
        "  " + COLUME_AUTHOR_GENDER + " text,"
        "  " + COLUME_AUTHOR_AGE + " text,"
        "  " + COLUME_AUTHOR_IMG_URL + " text,"
        "  " + COLUME_CONTENT + " text,"
        "  " + COLUME_THUMB_IMG_URL + " text,"
        "  " + COLUME__STATS_VOTE_CONTENT + " text,"
        "  " + COLUME_STATS_COMMENT_CONTENT + " text,"
        "  " + COLUME_STATS_COMMENT_DETAIL_URL + " text,"
        "  " + COLUME_MD5 + " text,"
        "  PRIMARY KEY (" + COLUME_ID + ")"
        ") ENGINE=InnoDB")

    # Parameterized insert; values are supplied by create_insert_data().
    INSERT_JOKE_SQL = ("INSERT INTO " + TABLE_NAME + " ("
                       + COLUME_AUTHOR_NICK_NAME + ","
                       + COLUME_AUTHOR_GENDER + ","
                       + COLUME_AUTHOR_AGE + ","
                       + COLUME_AUTHOR_IMG_URL + ","
                       + COLUME_CONTENT + ","
                       + COLUME_THUMB_IMG_URL + ","
                       + COLUME__STATS_VOTE_CONTENT + ","
                       + COLUME_STATS_COMMENT_CONTENT + ","
                       + COLUME_STATS_COMMENT_DETAIL_URL + ","
                       + COLUME_MD5
                       + ") "
                       + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")

    # Lookup by md5. The '%s' placeholder is filled with Python string
    # formatting in query_by_md5(), not bound by the driver.
    QUERY_JOKE_BY_MD5 = ("SELECT "
                         + COLUME_AUTHOR_NICK_NAME + ","
                         + COLUME_AUTHOR_GENDER + ","
                         + COLUME_AUTHOR_AGE + ","
                         + COLUME_AUTHOR_IMG_URL + ","
                         + COLUME_CONTENT + ","
                         + COLUME_THUMB_IMG_URL + ","
                         + COLUME__STATS_VOTE_CONTENT + ","
                         + COLUME_STATS_COMMENT_CONTENT + ","
                         + COLUME_STATS_COMMENT_DETAIL_URL + ","
                         + COLUME_MD5
                         + " FROM " + TABLE_NAME
                         + " WHERE " + COLUME_MD5 + " = '%s'")

    def create_insert_data(self, joke_bean):
        """Return the bean's values as a tuple in INSERT_JOKE_SQL column order."""
        return (
            joke_bean['author_nick_name'],
            joke_bean['author_gender'],
            joke_bean['author_age'],
            joke_bean['author_img_url'],
            joke_bean['content'],
            joke_bean['thumb_img_url'],
            joke_bean['stats_vote_content'],
            joke_bean['stats_comment_content'],
            joke_bean['stats_comment_detail_url'],
            joke_bean['md5']
        )

    def insert_joke(self, joke_bean):
        """Insert one joke bean as a new row."""
        self.insert(self.INSERT_JOKE_SQL, self.create_insert_data(joke_bean))

    def query_by_md5(self, md5):
        """Return the first stored joke whose md5 column equals ``md5``.

        :param md5: value to match. It is interpolated into the SQL text, so
            it must be a trusted value such as a hex digest, never raw
            user input.
        :return: a joke bean built from the first matching row, or None when
            no row matches.
        """
        cursor = self.query(self.QUERY_JOKE_BY_MD5 % (md5,))

        # Read the complete result set before returning. Returning from
        # inside the cursor iteration can leave rows unread on the cursor
        # that self.query() hands back, which would break the next statement.
        rows = cursor.fetchall()
        if not rows:
            return None
        # The SELECT column order matches create_joke_bean's parameter order.
        return JokeBean().create_joke_bean(*rows[0])

               

上一篇:vscode封装PHP代码片段


下一篇:学习周记 ( jQuery)