The Scheduler's scheduling rules

The scheduler manages the scheduling of the spider's requests.

It accepts requests and schedules them for download, and it has a built-in request deduplication mechanism.

Prioritization and queueing policy are not hard-coded in the scheduler itself. The user sets a priority field on each Request object, and the priority queue uses these values to dequeue requests in the desired order.
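For example, a spider can influence the dequeue order just by setting priority on the requests it yields. A minimal sketch (the spider name, URLs and callbacks below are made up for illustration):

import scrapy

class ExampleSpider(scrapy.Spider):
    name = 'example'                       # hypothetical spider
    start_urls = ['https://example.com/']  # hypothetical URL

    def parse(self, response):
        # higher priority values are dequeued earlier by the priority queue
        yield scrapy.Request('https://example.com/important',
                             callback=self.parse_important, priority=10)
        # dont_filter=True makes enqueue_request() skip the dedup check
        yield scrapy.Request('https://example.com/poll',
                             callback=self.parse_poll,
                             dont_filter=True)

    def parse_important(self, response):
        pass

    def parse_poll(self, response):
        pass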

The scheduler uses two priority queue instances, configured to work in memory and on disk. When the disk queue is present it is used by default, and the memory queue serves as a fallback for requests the disk queue cannot handle (e.g. requests that cannot be serialized).

The disk queue and memory queue classes are configurable, which determines how requests are stored in each queue.
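For reference, the queue classes (and the priority queue wrapping them) are picked via settings. A sketch of a settings.py with what I believe are the usual defaults (verify against your Scrapy version):

# settings.py -- values shown are, to my knowledge, the usual defaults
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.ScrapyPriorityQueue'
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'

# the disk queue only kicks in when a job directory is configured
JOBDIR = 'crawls/spider-1'   # hypothetical path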

In short, the scheduler is an object that holds priority queues with fallback logic, and that applies the deduplication policy.

def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,
             logunser=False, stats=None, pqclass=None, crawler=None):
    self.df = dupefilter              # deduplication policy (dupefilter)
    self.dqdir = self._dqdir(jobdir)  # working directory for the disk queue
    self.pqclass = pqclass            # priority queue class
    self.dqclass = dqclass            # disk queue class
    self.mqclass = mqclass            # memory queue class
    # logging flag, stats collector and crawler references
    self.logunser = logunser
    self.stats = stats
    self.crawler = crawler




@classmethod
def from_crawler(cls, crawler):
    settings = crawler.settings  # the crawler's settings
    # the dedup (dupefilter) class configured in settings
    dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
    dupefilter = create_instance(dupefilter_cls, settings, crawler)
    # priority queue class
    pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])
    # warn when the deprecated queuelib.PriorityQueue is configured
    if pqclass is PriorityQueue:
        warnings.warn("SCHEDULER_PRIORITY_QUEUE='queuelib.PriorityQueue'"
                      " is no longer supported because of API changes; "
                      "please use 'scrapy.pqueues.ScrapyPriorityQueue'",
                      ScrapyDeprecationWarning)
        from scrapy.pqueues import ScrapyPriorityQueue
        pqclass = ScrapyPriorityQueue
    # disk queue class
    dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])
    # memory queue class
    mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
    # whether to log requests that cannot be serialized
    logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS',
                                settings.getbool('SCHEDULER_DEBUG'))
    # build and return the scheduler instance
    return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
               stats=crawler.stats, pqclass=pqclass, dqclass=dqclass,
               mqclass=mqclass, crawler=crawler)
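load_object() used above simply imports an object from its dotted path, which is how the string settings turn into classes. A quick illustration (assuming a standard Scrapy install):

from scrapy.utils.misc import load_object

pqclass = load_object('scrapy.pqueues.ScrapyPriorityQueue')
print(pqclass)   # -> <class 'scrapy.pqueues.ScrapyPriorityQueue'>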



# called when a spider is opened; starts the scheduler
def open(self, spider):
    self.spider = spider
    self.mqs = self._mq()  # memory priority queue
    self.dqs = self._dq() if self.dqdir else None  # disk priority queue, only if a job dir is set
    return self.df.open()
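_mq() and _dq() are not shown in this excerpt. A rough sketch of what they look like in this version of the source (the exact create_instance arguments vary between releases; logger is the module-level logger of scheduler.py):

def _mq(self):
    # sketch: build a priority queue backed by the in-memory queue class
    return create_instance(self.pqclass, None, self.crawler,
                           self.mqclass, serialize=False)

def _dq(self):
    # sketch: restore the state saved by close(), then build a priority
    # queue backed by the on-disk queue class
    state = self._read_dqs_state(self.dqdir)
    q = create_instance(self.pqclass, None, self.crawler,
                        self.dqclass, state, serialize=True)
    if q:
        logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                    {'queuesize': len(q)}, extra={'spider': self.spider})
    return q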


# called when the spider closes; if a disk queue exists, persist its state
# to the job directory, then close the dupefilter
def close(self, reason):
    if self.dqs:
        state = self.dqs.close()
        self._write_dqs_state(self.dqdir, state)
    return self.df.close(reason)
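_write_dqs_state() and _read_dqs_state() persist the priority-queue state between runs. A sketch of how they work, assuming the state is stored as JSON in an active.json file inside the job directory (the file name is my recollection of the source, not guaranteed):

import json
from os.path import exists, join

def _read_dqs_state(self, dqdir):
    # sketch: load the queue state left behind by a previous run, if any
    path = join(dqdir, 'active.json')
    if not exists(path):
        return ()
    with open(path) as f:
        return json.load(f)

def _write_dqs_state(self, dqdir, state):
    # sketch: persist the queue state so the crawl can be resumed later
    with open(join(dqdir, 'active.json'), 'w') as f:
        json.dump(state, f)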


# if the request is a duplicate that has already been seen (and is not
# marked dont_filter), log it and return False; otherwise push it to the
# disk queue, falling back to the memory queue, and return True
def enqueue_request(self, request):
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    dqok = self._dqpush(request)
    if dqok:
        self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
    else:
        self._mqpush(request)
        self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
    self.stats.inc_value('scheduler/enqueued', spider=self.spider)
    return True
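The disk-first, memory-fallback behavior hinges on _dqpush() returning a falsy value when the request cannot go to disk. A simplified sketch of the two push helpers (details may differ by Scrapy version; logger is again the module-level logger):

def _dqpush(self, request):
    # sketch: returns None (falsy) when there is no disk queue or the
    # request cannot be serialized, so enqueue_request() falls back
    if self.dqs is None:
        return
    try:
        self.dqs.push(request)
    except ValueError as e:  # raised for non-serializable requests
        if self.logunser:
            logger.warning("Unable to serialize request: %(request)s"
                           " - reason: %(reason)s",
                           {'request': request, 'reason': e},
                           extra={'spider': self.spider})
        self.stats.inc_value('scheduler/unserializable',
                             spider=self.spider)
        return
    else:
        return True

def _mqpush(self, request):
    # the in-memory queue accepts any request object
    self.mqs.push(request)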


# pop a request from the memory queue first; if it is empty, fall back to
# the disk queue; record the matching dequeue stats and return the request
# (or None when both queues are empty)
def next_request(self):
    request = self.mqs.pop()
    if request:
        self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
    else:
        request = self._dqpop()
        if request:
            self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
    if request:
        self.stats.inc_value('scheduler/dequeued', spider=self.spider)
    return request
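The matching pop-side helper is tiny; a sketch:

def _dqpop(self):
    # sketch: pop from the disk queue only when one actually exists
    if self.dqs:
        return self.dqs.pop()

Note the asymmetry: the enqueue path prefers the disk queue, while the dequeue path drains the memory queue first, so requests that could not be serialized are still served promptly.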
