逆向爬虫21 整合Scrapy-Redis-Splash-Bloom功能
一. 场景引入
前面我们学了Scrapy框架,分布式Scrapy,Scrapy_Splash以及布隆过滤的工作原理。现在提一个需求,如果要把这些功能全部都用上该怎么办?
本节我们需要动用洪荒之力来阅读框架和模块源码,手动将这些功能融合起来,来实现一个可以具备Scrapy_redis分布式调度器,Bloom过滤器和Splash的Js渲染的爬虫功能。下图是我们这一节要实现的功能,红框内部是我们需要特殊处理的部分。
二. 原理说明
涉及到的模块:
① scrapy_redis:原本是用于分布式爬虫的调度器使用的,调度器具备过滤和调度作用,这里我们只用他的调度功能。
② scrapy_splash:用于对splash服务器发起请求。
③ scrapy_redis_bloomfilter:在scrapy_redis的基础上用上布隆过滤的功能。
由于使用这每一个模块时,都需要在settings.py文件中配置各自的DUPEFILTER_CLASS属性。
DUPEFILTER_CLASS = ‘scrapy_splash.SplashAwareDupeFilter’
DUPEFILTER_CLASS = “scrapy_redis.dupefilter.RFPDupeFilter”
DUPEFILTER_CLASS = “scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter”
这里希望三者都可以用上,总不能将三者都写上吧?因此这里我们需要自己写一个过滤器,同时将三者的功能都写进来,最后指定我们自己写的过滤器。
需求说明:
-
爬虫
请求是splash请求。 -
调度器
是scrapy_redis,可以实现分布式爬虫。 -
过滤器
是布隆过滤器,替换掉scrapy_redis中的过滤器。
三. 源码阅读
scrapy_redis过滤器
scrapy_redis_bloomfilter过滤器
scrapy_splash过滤器
源码说明:
从上面源码可知,scrapy_redis过滤器和布隆过滤器都继承了scrapy的过滤器,并且都重写了基类的函数,这次我们的目标是在分布式的基础上用上布隆过滤器,同时还要用上scrapy_splash的过滤器,因此我们可以使用如下方法。
- 自己写一个过滤器继承scrapy_redis的过滤器。
- 把scrapy_splash的过滤器中的功能添加进自己的过滤器中,仔细阅读源码发现,它重写了父类scrapy_redis的request_fingerprint函数,但它和父类的request_fingerprint兼容,因为他们都执行了scrapy过滤器中的request_fingerprint函数,只不过scrapy_splash在执行完成之后,又执行了自己独有的操作。
- 把布隆过滤器里的所有函数都复制进我们写的过滤器中,直接重写基类scrapy_redis的过滤器功能,这里不用担心冲突,因为需求就是用布隆过滤器代替scrapy_redis中的过滤器,所以直接复制进来重写父类功能即可。
最终dupefilter.py效果如下:
from scrapy_redis.dupefilter import RFPDupeFilter as BaseRFPDupeFilter
from scrapy.utils.url import canonicalize_url
from scrapy.utils.request import request_fingerprint
from scrapy_splash.utils import dict_hash
from copy import deepcopy
import logging
import time
from scrapy_redis_bloomfilter.defaults import BLOOMFILTER_HASH_NUMBER, BLOOMFILTER_BIT, DUPEFILTER_DEBUG
from scrapy_redis_bloomfilter import defaults
from scrapy_redis.connection import get_redis_from_settings
from scrapy_redis_bloomfilter.bloomfilter import BloomFilter
logger = logging.getLogger(__name__)
def splash_request_fingerprint(request, include_headers=None):
fp = request_fingerprint(request, include_headers=include_headers)
if 'splash' not in request.meta:
return fp
splash_options = deepcopy(request.meta['splash'])
args = splash_options.setdefault('args', {})
if 'url' in args:
args['url'] = canonicalize_url(args['url'], keep_fragments=True)
return dict_hash(splash_options, fp)
# 你现在这个玩意已经可以兼容redis+splash
# 继承了RedisDupeFilter.
# 把splash的东西. 复制过来了
# 把bloom的东西全部拷贝过来
class MyDupeFilter(BaseRFPDupeFilter):
logger = logger
def __init__(self, server, key, debug, bit, hash_number):
self.server = server
self.key = key
self.debug = debug
self.logdupes = True
self.bit = bit
self.hash_number = hash_number
self.bf = BloomFilter(server, self.key, bit, hash_number)
@classmethod
def from_settings(cls, settings):
server = get_redis_from_settings(settings)
key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
debug = settings.getbool('DUPEFILTER_DEBUG', DUPEFILTER_DEBUG)
bit = settings.getint('BLOOMFILTER_BIT', BLOOMFILTER_BIT)
hash_number = settings.getint('BLOOMFILTER_HASH_NUMBER', BLOOMFILTER_HASH_NUMBER)
return cls(server, key=key, debug=debug, bit=bit, hash_number=hash_number)
@classmethod
def from_crawler(cls, crawler):
instance = cls.from_settings(crawler.settings)
return instance
@classmethod
def from_spider(cls, spider):
settings = spider.settings
server = get_redis_from_settings(settings)
dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
key = dupefilter_key % {'spider': spider.name}
debug = settings.getbool('DUPEFILTER_DEBUG', DUPEFILTER_DEBUG)
bit = settings.getint('BLOOMFILTER_BIT', BLOOMFILTER_BIT)
hash_number = settings.getint('BLOOMFILTER_HASH_NUMBER', BLOOMFILTER_HASH_NUMBER)
print(key, bit, hash_number)
instance = cls(server, key=key, debug=debug, bit=bit, hash_number=hash_number)
return instance
def request_seen(self, request):
fp = self.request_fingerprint(request)
# This returns the number of values added, zero if already exists.
if self.bf.exists(fp):
return True
self.bf.insert(fp)
return False
def log(self, request, spider):
if self.debug:
msg = "Filtered duplicate request: %(request)s"
self.logger.debug(msg, {'request': request}, extra={'spider': spider})
elif self.logdupes:
msg = ("Filtered duplicate request %(request)s"
" - no more duplicates will be shown"
" (see DUPEFILTER_DEBUG to show all duplicates)")
self.logger.debug(msg, {'request': request}, extra={'spider': spider})
self.logdupes = False
spider.crawler.stats.inc_value('bloomfilter/filtered', spider=spider)
def request_fingerprint(self, request):
return splash_request_fingerprint(request)
settings.py源码:
BOT_NAME = 'news'
SPIDER_MODULES = ['news.spiders']
NEWSPIDER_MODULE = 'news.spiders'
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
LOG_LEVEL = "WARNING"
# scrapy_splash
# 渲染服务的url, 这里换成你自己的
SPLASH_URL = 'http://192.168.31.172:8050'
# 下载器中间件, 这个必须要配置
DOWNLOADER_MIDDLEWARES = {
'scrapy_splash.SplashCookiesMiddleware': 723,
'scrapy_splash.SplashMiddleware': 725,
'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 810,
}
# 这个可由可无
# SPIDER_MIDDLEWARES = {
# 'scrapy_splash.SplashDeduplicateArgsMiddleware': 100,
# }
# 更换为自己的过滤器. 同时兼容redis, splash, bloom
DUPEFILTER_CLASS = 'news.dupefilter.MyDupeFilter'
# 使用Splash的Http缓存, 这个必须要配置
HTTPCACHE_STORAGE = 'scrapy_splash.SplashAwareFSCacheStorage'
# redis
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline': 301, # 可选项
}
# redis相关配置
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_DB = 14
REDIS_PARAMS = {
"password": "123456",
}
# scrapy_redis相关配置
# scrapy-redis配置信息 # 固定的
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
SCHEDULER_PERSIST = True # 如果为真. 在关闭时自动保存请求信息, 如果为假, 则不保存请求信息
# DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # 注意这里
# bloom
# 去重类,要使用 BloomFilter 请替换 DUPEFILTER_CLASS
# DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter"
# 哈希函数的个数,默认为 6,可以自行修改
BLOOMFILTER_HASH_NUMBER = 6
# BloomFilter 的 bit 参数,默认 30,占用 128MB 空间,去重量级 1 亿
BLOOMFILTER_BIT = 30
wangyi.py文件:
wangyi.py源码:
import scrapy
from scrapy_splash.request import SplashRequest
from scrapy_redis.spiders import RedisSpider
from news.items import NewsItem
lua_source = """
function main(splash, args)
assert(splash:go(args.url))
assert(splash:wait(2))
-- 准备一个js函数. 预加载
-- jsfunc是splash预留的专门为了js代码和lua代码结合准备的
get_btn_display = splash:jsfunc([[
function(){
return document.getElementsByClassName('load_more_btn')[0].style.display;
}
]])
while(true)
do
splash:runjs("document.getElementsByClassName('load_more_btn')[0].scrollIntoView(true)")
splash:select(".load_more_btn").click()
splash:wait(1)
-- 判断load_more_btn是否是none.
display = get_btn_display()
if(display == 'none')
then
break
end
end
return splash:html() -- 直接返回页面源代码
end
"""
class WangyiSpider(RedisSpider):
name = 'wangyi'
allowed_domains = ['163.com']
# redis_key = "wangyi:news:start_urls"
start_urls = ['https://news.163.com/']
# 重写start_request, 这地方由于重写了start_request, 就不需要redis_key了
def start_requests(self):
yield SplashRequest(
url=self.start_urls[0],
callback=self.parse,
endpoint="execute", # 终端表示你要执行哪一个splash的服务
args={
"lua_source": lua_source
},
dont_filter=True # 不去重. 直接进队列
)
def parse(self, response):
divs = response.xpath("//ul[@class='newsdata_list fixed_bar_padding noloading']/li[1]/div[2]/div")
for div in divs:
a = div.xpath("./div/div/h3/a")
if not a: # 过滤掉广告
continue
a = a[0]
xw = NewsItem()
xw['url'] = a.xpath("./@href").extract_first()
xw['title'] = a.xpath("./text()").extract_first()
yield xw
print(1)
yield scrapy.Request(
url=xw['url'],
)
items.py源码:
import scrapy
class NewsItem(scrapy.Item):
# define the fields for your item here like:
title = scrapy.Field()
url = scrapy.Field()
四. 小结
本节内容难度较大,很少会遇到这样的情况,如果以后遇到类似的需求,希望可以从中借鉴整合框架模块代码的能力。本节对面向对象的要求较高,需要合理根据需求,利用面向对象的特性做到最少代码改动。