Scrapy实战篇(七)之爬取爱基金网站基金业绩数据

  本篇我们以scrapy+selelum的方式来爬取爱基金网站(http://fund.10jqka.com.cn/datacenter/jz/)的基金业绩数据.

  思路:我们以http://fund.10jqka.com.cn/datacenter/jz/网站作为起始,首先抓取页面中基金的详细页面地址,类似于http://fund.10jqka.com.cn/004551/的链接,在组装成http://fund.10jqka.com.cn/004551/historynet.html#historynet形式的地址,抓取其页面下的净值列表数据。

1、首先我们定义需要抓取的字段,如下:

class fundajjItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
#产品编号,日期,单位净值(元),累计净值(元),fqnet,增长值(元),增长率
product_id = Field()
date = Field()
net = Field()
totalnet = Field()
fqnet = Field()
inc = Field()
rate = Field()

2、从初始地址http://fund.10jqka.com.cn/datacenter/jz/抓取基金详细地址;因为初始页面无法通过直接请求得到网页源代码,所以通过selenium直接的防止直接返回网页源代码,而基金业绩数据的页面可以通过直接请求网址得到,所以不用selenium抓取,直接使用下载器下载数据;所以在下载器中间件中我们使用网址的不同形式判断是否使用selenium返回网页源代码;

class SeleniumMiddleware():
#通过类方法from_crawler获取的参数必须放在__init__()方法的第一个参数位置上,除self;否则报错
def __init__(self, MP,timeout=30 ):
self.timeout = timeout
self.browser = webdriver.Chrome()
self.browser.maximize_window()
self.browser.set_page_load_timeout(self.timeout)
self.wait = WebDriverWait(self.browser,self.timeout)
self.MAX_PAGE = MP @classmethod
def from_crawler(cls, crawler):
return cls(MP = crawler.settings.get('MAX_PAGE')) def process_request(self, request, spider):
'''
1、在下载器中间件中对接使用selenium,输出源代码之后,构造htmlresponse对象,直接返回给spider解析页面,提取数据,并且也不在执行下载器下载页面动作 2、通过下载器执行器下载数据,不在通过selenium 3、当网址是http://fund.10jqka.com.cn/datacenter/jz/时,因无法直接获取网页源代码,使用selenim直接回去源代码返回的方式处理,
当网址是http://fund.10jqka.com.cn/000074/historynet.html#historynet类型时,使用框架的下载器下载数据,返回给spider文件处理
通过获取spider文件的url,根据url判断是否使用selenium下载数据
'''
url = request.url
if url.endswith("historynet.html#historynet"):
return None
else:
self.wait = WebDriverWait(self.browser, self.timeout)
try: time.sleep(5)
self.browser.get(request.url) #MAX_PAGE暂时先写死,在settings中配置 可以写成动态变量
#思路:观察目标网站,不断下拉滚动条加载新数据,每页80条数据,可以第一次获取页面总共有多少只基金产品总数据,
#除以80即为需要下拉的次数
for i in range(1, self.MAX_PAGE):
#执行js代码,将滚动条下拉到最下面
self.browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
time.sleep(2) time.sleep(5)
response = self.browser.page_source
return HtmlResponse(url=request.url, body=response, request=request, encoding='utf-8',status=200)
except TimeoutException:
return HtmlResponse(url=request.url, status=500, request=request)
finally:
self.browser.close()

另外我们在下载器中间件中设置随机获取user-agent,伪装成各种各种的浏览器,MY_USER_AGENT在settings中以列表的形式配置。

#在下载器中间件中修改User-Agent的值,伪装成不同的浏览器
class RandomUserAgentMiddleware():
def __init__(self,UA):
self.user_agents = UA @classmethod
def from_crawler(cls, crawler):
return cls(UA = crawler.settings.get('MY_USER_AGENT')) def process_request(self,request,spider):
request.headers['User-Agent'] = random.choice(self.user_agents) def process_response(self,request, response, spider):
return response

3、编写spider文件;parse()方法我们从初始页面获取到基金详细页面的地址,并在后面拼接 historynet.html#historynet,以获取净值列表页面的地址;然后在解析历史数据页面,获取基金业绩历史数据。返回给pipeline。

#解析由http://fund.10jqka.com.cn/datacenter/jz/ 防止返回的response,返回的网址为http://fund.10jqka.com.cn/000074/historynet.html#historynet格式
def parse(self, response):
soup = BeautifulSoup(response.text, 'lxml')
funds = soup.find_all(name='tbody', id='containerMain')
for fund in funds:
for f in fund.find_all(name='tr', rel='tpl'):
for h in f.find_all(name='a', field='name'):
url = h['href'] + 'historynet.html#historynet'
yield Request(url = url , callback= self.parse_fund_detail) def parse_fund_detail(self,response):
url = response.url
prod_id = url.split('/')[-2]
soup = BeautifulSoup(response.text, 'lxml')
datas = soup.find_all(name='script', type='text/javascript')
for data in datas:
if data.text == '':
continue
else:
item = fundajjItem() #使用我们之前定义的item
data = data.text
data_list = data.split('=')
d_list = data_list[1].replace(';', '')
dj_list = json.loads(d_list)
for dj in dj_list:
dj['fundid'] = prod_id
item['product_id'] = dj.get('fundid')
item['date'] = dj.get('date')
item['net'] = dj.get('net')
item['totalnet'] = dj.get('totalnet')
item['fqnet'] = dj.get('fqnet')
item['inc'] = dj.get('inc')
item['rate'] = dj.get('rate')
yield item

4、修改pipeline,将获取到的数据存储到mysql和mongodb数据库中。

class MongoPipeline(object):

    def __init__(self,mongo_url,mongo_db,collection):
self.mongo_url = mongo_url
self.mongo_db = mongo_db
self.collection = collection @classmethod
def from_crawler(cls,crawler):
return cls(
mongo_url=crawler.settings.get('MONGO_URL'),
mongo_db = crawler.settings.get('MONGO_DB'),
collection = crawler.settings.get('COLLECTION')
) def open_spider(self,spider):
self.client = pymongo.MongoClient(self.mongo_url)
self.db = self.client[self.mongo_db] def process_item(self,item, spider):
name = self.collection
self.db[name].insert(dict(item))
return item def close_spider(self,spider):
self.client.close() class PymysqlPipeline(object):
def __init__(self,mysql_host,mysql_port,mysql_user,mysql_passwd,mysql_db):
self.host= mysql_host
self.port=mysql_port
self.user=mysql_user
self.passwd = mysql_passwd
self.db=mysql_db @classmethod
def from_crawler(cls,crawler):
return cls(
mysql_host = crawler.settings.get('MYSQL_HOST'),
mysql_port = crawler.settings.get('MYSQL_PORT'),
mysql_user=crawler.settings.get('MYSQL_USER'),
mysql_passwd = crawler.settings.get('MYSQL_PASSWD'),
mysql_db = crawler.settings.get('MYSQL_DB')
) def open_spider(self,spider):
self.dbconn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, port=self.port, db=self.db)
self.dbcur = self.dbconn.cursor() def process_item(self,item, spider):
items = dict(item)
fund_list = []
fund_list.append(items.get('product_id'))
fund_list.append(items.get('date'))
fund_list.append(float(items.get('net')))
fund_list.append(float(items.get('totalnet')))
fund_list.append(float(items.get('fqnet')))
fund_list.append(float(items.get('inc')))
fund_list.append(float(items.get('rate')))
self.dyn_insert_sql('Fund_date',tuple(fund_list),self.dbconn,self.dbcur) def close_spider(self,spider):
self.dbconn.close() def dyn_insert_sql(self,tablename, data, dbconn, cursor):
tablename = tablename
sql = "select GROUP_CONCAT(COLUMN_name,'') from information_schema.COLUMNS where table_name = %s ORDER BY ordinal_position "
cursor.execute(sql, tablename)
tup = cursor.fetchone()
# 动态构造sql语句
sql = 'INSERT INTO {table}({keys}) VALUES {values}'.format(table=tablename, keys=tup[0], values=data)
# 使用try-except语句块控制事务的原子性
try:
if cursor.execute(sql):
dbconn.commit()
except:
dbconn.rollback()

5、设置settings文件

配置mysql和mongo的链接信息

MONGO_URL='localhost'
MONGO_DB='test'
COLLECTION='Funddate' MYSQL_HOST='localhost'
MYSQL_PORT=3306
MYSQL_USER='root'
MYSQL_PASSWD=''
MYSQL_DB='test'

设置item_pipeline,激活mongo和mysql的pipeline组件

ITEM_PIPELINES = {
'scrapyfundajj.pipelines.MongoPipeline': 300,
'scrapyfundajj.pipelines.PymysqlPipeline': 310,
}

设置DOWNLOADER_MIDDLEWARES,激活自动以的下载器中间件

DOWNLOADER_MIDDLEWARES = {
#禁用掉框架内置的UserAgentMiddleware,使用自定义的RandomUserAgentMiddleware
'scrapy.downloadermiddleware.useragent.UserAgentMiddleware': None,
'scrapyfundajj.middlewares.RandomUserAgentMiddleware': 543,
'scrapyfundajj.middlewares.SeleniumMiddleware': 544,
'scrapyfundajj.middlewares.ScrapyfundajjDownloaderMiddleware':545,
}

其他设置

设置不遵循爬取协议
ROBOTSTXT_OBEY = False
#下载器在下载同一个网站下一个页面前需要等待的时间。该选项可以用来限制爬取速度, 减轻服务器压力。同时也支持小数
DOWNLOAD_DELAY = 3

至此,所有需要的工作已经全部处理完成,运行项目,获取数据即可。

完整代码路径:https://gitee.com/liangxinbin/Scrpay/tree/master/scrapyfundajj

 
上一篇:《一头扎进》系列之Python+Selenium框架实战篇7 - 年底升职加薪,年终奖全靠它!Merry Christmas


下一篇:jquery-三级联动