1.前言
对于一个应用来说,需要获取内容、服务端提供内容、客户端展示内容,这三部分可以分别通过python、go、flutter来实现,结合起来就构成一个完整的应用。
2.Python爬虫获取内容
通过selenium调用浏览器内核,获取对应网页内容,并解析需要的内容,最后通过MySQL保存到数据库。
2.1安装python
前往官网下载安装包,我选择的是python2
选择对应系统环境安装包,下载安装完成,设置环境变量
然后在终端输入"python --version",如果显示python版本则安装完成
zxl@zxl-7060:~$ python --version Python 2.7.12
2.2安装pip
pip 是 Python 包管理工具,该工具提供了对Python 包的查找、下载、安装、卸载的功能。
如果未安装,则通过命令curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py下载安装脚本
再通过命令sudo python get-pip.py运行该脚本来安装pip
通过命令pip --version来判断是否安装成功
出现如下类似信息,则安装成功
zxl@zxl-7060:~$ pip --version pip 19.1.1 from /home/mi/.local/lib/python2.7/site-packages/pip (python 2.7)
2.3安装selenium
Selenium 是一个用于Web应用程序测试的工具
通过python调用Selenium就像真正的用户在操作浏览器一样,可以很好的解决网页js加载等问题
pip install selenium
2.4选择浏览器驱动
以chrome为例,下载chrome driver,选择电脑中chrome浏览器对应版本的driver
2.4安装开发工具PyCharm
前往官网下载安装包,选择PyCharm下载
安装好开发工具,如下图所示创建好工程,准备开发
2.5安装MySQL
前往官网下载mysql,并进行安装
2.6安装mysql-connector
输入命令pip install mysql-connector
这样就可以操作mysql数据库了
2.7网页请求
1.设置浏览器驱动位置
2.设置无界面(headless)模式,不打开浏览器窗口即可进行网页请求
3.网页请求
#!/usr/bin/python
# coding=utf-8
import platform

from selenium import webdriver


class BaseRequest:
    """Base class for spiders: loads a URL in a headless Chrome session."""

    # chromedriver binary used when platform.system() has no entry below.
    _DEFAULT_CHROMEDRIVER = "/Users/zxl/Downloads/chromedriver"

    # chromedriver binary location keyed by the value of platform.system().
    _CHROMEDRIVER_BY_SYSTEM = {
        'Darwin': "/Users/zxl/Downloads/chromedriver",
        'Windows': "D:\\my_github_workspace\\chromedriver.exe",
        'Linux': "/Users/zxl/Downloads/chromedriver",
    }

    def get_web_content(self, url):
        """Open ``url`` in headless Chrome and return the live driver.

        The caller owns the returned driver and is responsible for shutting
        it down once the page has been parsed.

        :param url: address of the page to load.
        :return: the ``webdriver.Chrome`` instance positioned on ``url``.
        :raises Exception: whatever ``webdriver.Chrome`` or ``driver.get``
            raise; if navigation fails the browser is shut down first.
        """
        chromedriver = self._CHROMEDRIVER_BY_SYSTEM.get(
            platform.system(), self._DEFAULT_CHROMEDRIVER)

        # Build the Chrome options object.
        opt = webdriver.ChromeOptions()
        # Run Chrome without a visible window. The plain command-line switch
        # is used instead of the deprecated Options.set_headless() helper.
        opt.add_argument('--headless')
        # Content-setting value 2 blocks image loading, which speeds up
        # fetching pages whose text is all we need.
        prefs = {"profile.managed_default_content_settings.images": 2}
        opt.add_experimental_option("prefs", prefs)

        # Start the headless browser.
        driver = webdriver.Chrome(executable_path=chromedriver, options=opt)
        try:
            driver.get(url)
        except Exception:
            # Do not leave an orphaned browser behind when navigation fails.
            driver.quit()
            raise
        return driver
2.8网页内容解析
1.网页请求成功后,获取到该网页对象driver
2.通过xpath进行页面标签解析
3.解析完成关闭浏览器driver
#!/usr/bin/python# coding=utf-8import datetimeimport hashlibimport refrom selenium.common.exceptions import NoSuchElementExceptionfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.wait import WebDriverWaitfrom selenium.webdriver.support import expected_conditionsfrom com_zxl_spider_db.JokeDB import JokeDBfrom com_zxl_spider_request.BaseRequest import *from com_zxl_spider_data.JokeBean import *class RequestQsbkTxt(BaseRequest): def __init__(self): global jokeDB jokeDB = JokeDB() def parse(self, end_url, index): print "parse::end_url = ", end_url, "::index = ", index driver = self.get_web_content("https://www.qiushibaike.com/" + end_url + str(index)) elem1 = WebDriverWait(driver, 10).until( expected_conditions.presence_of_element_located((By.XPATH, '//ul[@class="pagination"]'))) print "elem1 = ", elem1 elem2 = WebDriverWait(driver, 10).until( expected_conditions.presence_of_element_located((By.XPATH, '//div[@class="article block untagged mb15"]'))) print "elem2 = ", elem2 # page_source = driver.page_source isFindNextPage = False paginationObject = driver.find_element_by_xpath('//ul[@class="pagination"]') pageListObject = paginationObject.find_elements_by_xpath('.//li') for pageItemObject in pageListObject: page_index_txt = pageItemObject.text print "pageItemObject::page_index_txt = ", page_index_txt itemFindResult = re.findall(".*?(\d+).*?", page_index_txt) print "pageItemObject::itemFindResult = ", itemFindResult if len(itemFindResult) > 0: if int(itemFindResult[0]) > index: index = int(itemFindResult[0]) isFindNextPage = True break # if index - int(itemFindResult[0]) == 1: # index = int(itemFindResult[0]) # isFindNextPage = True # break print "parse::isFindNextPage = ", isFindNextPage, "::index = ", index, "::end_url = ", hotPicJokeItemPath = '//div[@class="article block untagged mb15"]' hotPicJokeItems = driver.find_elements_by_xpath(hotPicJokeItemPath) print 'hotPicJokeItems length = ', len(hotPicJokeItems) for hotPicJokeItem in 
hotPicJokeItems: jokeId = hotPicJokeItem.get_attribute('id') md5Object = hashlib.md5() md5Object.update(jokeId.encode('utf-8')) jokeMd5Value = md5Object.hexdigest() authorObject = hotPicJokeItem.find_element_by_xpath('.//div[@class="author clearfix"]') authorNickObject = authorObject.find_element_by_xpath('.//h2') authorNickName = authorNickObject.text authorImgObject = authorObject.find_element_by_xpath('.//img') authorImgUrl = authorImgObject.get_attribute('src') authorGender = '' authorAge = -1 try: authorGenderObject = authorObject.find_element_by_xpath(".//div[starts-with(@class,'articleGender')]") authorGender = authorGenderObject.get_attribute('class') authorAge = authorGenderObject.text except NoSuchElementException as e: print e contentObject = hotPicJokeItem.find_element_by_xpath('.//div[@class="content"]') content = contentObject.text thumbImgUrl = '' try: thumbObject = hotPicJokeItem.find_element_by_xpath('.//div[@class="thumb"]') thumbImgObject = thumbObject.find_element_by_xpath('.//img') thumbImgUrl = thumbImgObject.get_attribute('src') except NoSuchElementException as e: print e statsVoteContent = '' statsCommentContent = '' statsCommentDetailUrl = '' try: statsObject = hotPicJokeItem.find_element_by_xpath('.//div[@class="stats"]') try: statsVoteObject = statsObject.find_element_by_xpath('.//span[@class="stats-vote"]') statsVoteContent = statsVoteObject.text except NoSuchElementException as e: print e try: statsCommentObject = statsObject.find_element_by_xpath('.//span[@class="stats-comments"]') statsCommentContent = statsCommentObject.find_element_by_xpath( './/a[@class="qiushi_comments"]').text statsCommentDetailUrl = statsCommentObject.find_element_by_xpath( './/a[@class="qiushi_comments"]').get_attribute('href') except NoSuchElementException as e: print e except NoSuchElementException as e: print e # print authorNickName # print authorGender # print authorAge # print authorImgUrl # print content # print thumbImgUrl # print statsVoteContent # 
print statsCommentContent # print statsCommentDetailUrl # print jokeId # print jokeMd5Value # print '\n' # print '======================================end==========================================' # print '\n' joke_bean = JokeBean() joke_bean = joke_bean.create_joke_bean( authorNickName.encode('utf-8'), authorGender, authorAge, authorImgUrl, content.encode('utf-8'), thumbImgUrl, statsVoteContent, statsCommentContent, statsCommentDetailUrl, jokeMd5Value) isExistJokeItem = jokeDB.query_by_md5(jokeMd5Value) print isExistJokeItem if isExistJokeItem is None: print "not ExistJokeItem" jokeDB.insert_joke(joke_bean) else: print "ExistJokeItem" driver.close() return print "==============end=================" print "\n" driver.close() if not isFindNextPage: return else: self.parse(end_url, index) def clas_db(self): if jokeDB is not None: jokeDB.close_db() def start_task(self): print "start_task::", 'Now Time::', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') self.parse("pic/page/", 1) self.clas_db()if __name__ == "__main__": request = RequestQsbkTxt() # request.parse("pic/page/", 1) request.parse("pic/page/", 1) request.clas_db()
2.9数据集合
1.根据解析内容构造数据集合
#!/usr/bin/python
# coding=utf-8


class JokeBean:
    """Factory for the plain-dict record that describes one scraped joke."""

    @staticmethod
    def create_joke_bean(author_nick_name, author_gender, author_age, author_img_url, content,
                         thumb_img_url, stats_vote_content, stats_comment_content,
                         stats_comment_detail_url, md5):
        """Return a dict holding the given joke fields under same-named keys.

        The method uses no instance state, so it is a static method: it can
        be called on the class directly, and calls on an instance
        (``JokeBean().create_joke_bean(...)``) keep working.

        :return: dict with keys ``author_nick_name``, ``author_gender``,
            ``author_age``, ``author_img_url``, ``content``,
            ``thumb_img_url``, ``stats_vote_content``,
            ``stats_comment_content``, ``stats_comment_detail_url``, ``md5``.
        """
        return {
            'author_nick_name': author_nick_name,
            'author_gender': author_gender,
            'author_age': author_age,
            'author_img_url': author_img_url,
            'content': content,
            'thumb_img_url': thumb_img_url,
            'stats_vote_content': stats_vote_content,
            'stats_comment_content': stats_comment_content,
            'stats_comment_detail_url': stats_comment_detail_url,
            'md5': md5,
        }
2.10保存数据库
1.设置数据库ip地址,端口号
2.设置连接数据库的用户名、密码
3.设置连接的数据库名称
4.如果数据库不存在,则创建数据库,并创建相应的表
5.设置增删改查操作
#!/usr/bin/python
# coding=utf-8
from __future__ import print_function

import mysql.connector
from mysql.connector import errorcode


class BaseDB:
    """Thin MySQL helper: connects, creates the schema and runs statements.

    Subclasses set ``CREATE_TABLE_SQL`` to the DDL of their table.

    NOTE: the connection and cursor live in the module-level globals ``cnx``
    and ``cursor``, so every instance created later rebinds them and all
    instances share one connection.
    """

    host = 'zxltest.zicp.vip'
    port = '42278'
    urser_name = "***"
    pass_word = "***"
    db_name = 'joke'
    CREATE_TABLE_SQL = ("")

    def __init__(self):
        """Connect to ``db_name``, creating the database and table if missing.

        :raises SystemExit: with status 1 on access-denied or any connection
            error other than "database does not exist".
        """
        global cnx
        global cursor
        try:
            cnx = mysql.connector.connect(user=self.urser_name, password=self.pass_word,
                                          host=self.host, port=self.port,
                                          database=self.db_name)
            cursor = cnx.cursor()
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
                raise SystemExit(1)
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
                # Reconnect without selecting a database so it can be created.
                cnx = mysql.connector.connect(user=self.urser_name, password=self.pass_word,
                                              host=self.host, port=self.port)
                cursor = cnx.cursor()
                self.__create_database()
                self.__create_table()
            else:
                print(err)
                raise SystemExit(1)
        else:
            self.__create_table()
        print("DBUtil init finish")

    def __create_database(self):
        """Create ``db_name`` with utf8 charset and select it on the connection."""
        try:
            cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(self.db_name))
            cnx.database = self.db_name
            print("Create database finish")
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            raise SystemExit(1)

    def __create_table(self):
        """Execute ``CREATE_TABLE_SQL``; an already existing table is not an error."""
        print("create table::", self.CREATE_TABLE_SQL)
        try:
            # end='' keeps the outcome ("OK" / "already exists.") on the same
            # line; the former trailing comma printed a tuple on Python 2.
            print("Creating table {}: ".format(self.CREATE_TABLE_SQL), end='')
            cursor.execute(self.CREATE_TABLE_SQL)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                print("already exists.")
            else:
                print(err.msg)
                raise SystemExit(1)
        else:
            print("OK")

    def query(self, sql_str, param=None):
        """Execute a SELECT and return the shared cursor for row retrieval.

        :param sql_str: statement text, optionally with ``%s`` placeholders.
        :param param: optional values bound to the placeholders by the driver.
        """
        print("query::", sql_str)
        cursor.execute(sql_str, param)
        return cursor

    def insert(self, sql_str, param):
        """Execute an INSERT with bound ``param`` values and commit."""
        cursor.execute(sql_str, param)
        cnx.commit()

    def update(self, sql_str, param=None):
        """Execute an UPDATE (optionally with bound ``param``) and commit."""
        cursor.execute(sql_str, param)
        # commit must be *called*; a bare ``cnx.commit`` never committed.
        cnx.commit()

    def delete(self, sql_str, param=None):
        """Execute a DELETE (optionally with bound ``param``) and commit."""
        cursor.execute(sql_str, param)
        cnx.commit()

    def close_db(self):
        """Close the shared cursor and connection."""
        cursor.close()
        cnx.close()
#!/usr/bin/python
# coding=utf-8
import mysql
from mysql.connector import errorcode

from com_zxl_spider_data.JokeBean import JokeBean
from com_zxl_spider_db.BaseDB import BaseDB


class JokeDB(BaseDB):
    """Persistence of joke beans in the ``joke`` table."""

    TABLE_NAME = 'joke'

    COLUME_ID = 'id'
    COLUME_AUTHOR_NICK_NAME = 'author_nick_name'
    COLUME_AUTHOR_GENDER = 'author_gender'
    COLUME_AUTHOR_AGE = 'author_age'
    COLUME_AUTHOR_IMG_URL = 'author_img_url'
    COLUME_CONTENT = 'content'
    COLUME_THUMB_IMG_URL = 'thumb_img_url'
    COLUME__STATS_VOTE_CONTENT = 'stats_vote_content'
    COLUME_STATS_COMMENT_CONTENT = 'stats_comment_content'
    COLUME_STATS_COMMENT_DETAIL_URL = 'stats_comment_detail_url'
    COLUME_MD5 = 'md5'

    # Data columns in the order shared by INSERT, SELECT and
    # JokeBean.create_joke_bean's parameters.
    _DATA_COLUMNS = (
        COLUME_AUTHOR_NICK_NAME,
        COLUME_AUTHOR_GENDER,
        COLUME_AUTHOR_AGE,
        COLUME_AUTHOR_IMG_URL,
        COLUME_CONTENT,
        COLUME_THUMB_IMG_URL,
        COLUME__STATS_VOTE_CONTENT,
        COLUME_STATS_COMMENT_CONTENT,
        COLUME_STATS_COMMENT_DETAIL_URL,
        COLUME_MD5,
    )
    _DATA_COLUMN_LIST = ",".join(_DATA_COLUMNS)

    CREATE_TABLE_SQL = (
        "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " ("
        "  " + COLUME_ID + " bigint(20) NOT NULL AUTO_INCREMENT,"
        "  " + COLUME_AUTHOR_NICK_NAME + " varchar(16),"
        "  " + COLUME_AUTHOR_GENDER + " text,"
        "  " + COLUME_AUTHOR_AGE + " text,"
        "  " + COLUME_AUTHOR_IMG_URL + " text,"
        "  " + COLUME_CONTENT + " text,"
        "  " + COLUME_THUMB_IMG_URL + " text,"
        "  " + COLUME__STATS_VOTE_CONTENT + " text,"
        "  " + COLUME_STATS_COMMENT_CONTENT + " text,"
        "  " + COLUME_STATS_COMMENT_DETAIL_URL + " text,"
        "  " + COLUME_MD5 + " text,"
        "  PRIMARY KEY (" + COLUME_ID + ")"
        ") ENGINE=InnoDB")

    INSERT_JOKE_SQL = ("INSERT INTO " + TABLE_NAME + " (" + _DATA_COLUMN_LIST + ") "
                       + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")

    QUERY_JOKE_BY_MD5 = ("SELECT " + _DATA_COLUMN_LIST
                         + " FROM " + TABLE_NAME
                         + " WHERE " + COLUME_MD5 + " = '%s'")

    def create_insert_data(self, joke_bean):
        """Return the bean's values as a tuple in ``INSERT_JOKE_SQL`` column order."""
        return (
            joke_bean['author_nick_name'],
            joke_bean['author_gender'],
            joke_bean['author_age'],
            joke_bean['author_img_url'],
            joke_bean['content'],
            joke_bean['thumb_img_url'],
            joke_bean['stats_vote_content'],
            joke_bean['stats_comment_content'],
            joke_bean['stats_comment_detail_url'],
            joke_bean['md5'],
        )

    def insert_joke(self, joke_bean):
        """Insert one joke bean as a new row."""
        self.insert(self.INSERT_JOKE_SQL, self.create_insert_data(joke_bean))

    def query_by_md5(self, md5):
        """Return the stored joke whose md5 column equals ``md5``, or None.

        :param md5: hex digest identifying the joke.
        :return: a joke bean dict built from the first matching row, or
            ``None`` when no row matches.
        """
        # NOTE(review): the value is formatted straight into the SQL text.
        # That is only safe while callers pass a hex digest; bind it as a
        # parameter if untrusted input can ever reach this method.
        cursor = self.query(self.QUERY_JOKE_BY_MD5 % (md5,))
        # Drain the result set so that no unread rows remain on the shared
        # cursor when more than one row matches; returning from inside the
        # iteration would leave them pending for the next statement.
        rows = cursor.fetchall()
        if not rows:
            return None
        # Row values follow _DATA_COLUMNS, the same order as the
        # create_joke_bean parameters.
        return JokeBean().create_joke_bean(*rows[0])