第三百四十八节,Python分布式爬虫打造搜索引擎Scrapy精讲—通过自定义中间件全局随机更换代理IP
设置代理ip只需要,自定义一个中间件,重写process_request方法,
request.meta['proxy'] = "http://185.82.203.146:1080" 设置代理IP
中间件,注意将中间件注册到配置文件里去
from adc.daili_ip.sh_yong_ip.sh_yong_ip import sui_ji_hq_ip from fake_useragent import UserAgent #导入浏览器用户代理模块 class RequestsUserAgentmiddware(object): #自定义浏览器代理中间件
#中间件随机更换Requests请求头信息的User-Agent浏览器用户代理
def __init__(self,crawler):
super(RequestsUserAgentmiddware, self).__init__() #获取上一级父类基类的,__init__方法里的对象封装值
self.ua = UserAgent() #实例化浏览器用户代理模块类
self.ua_type = crawler.settings.get('RANDOM_UA_TYPE','random') #获取settings.py配置文件里的RANDOM_UA_TYPE配置的浏览器类型,如果没有,默认random,随机获取各种浏览器类型 @classmethod #函数上面用上装饰符@classmethod,函数里有一个必写形式参数cls用来接收当前类名称
def from_crawler(cls, crawler): #重载from_crawler方法
return cls(crawler) #将crawler爬虫返回给类 def process_request(self, request, spider): #重载process_request方法
def get_ua(): #自定义函数,返回浏览器代理对象里指定类型的浏览器信息
return getattr(self.ua, self.ua_type)
sssf = get_ua()
print('启用用户代理浏览器信息:{0}'.format(sssf))
request.headers.setdefault('User-Agent', get_ua()) #将浏览器代理信息添加到Requests请求 class MyproxiesSpiderMiddleware(object):
#中间件随机更换IP def process_request(self, request, spider): #重写process_request方法
#到数据库随机获取一个IP xieyi = request._get_url() #_get_url可以获取到请求URL,来判断是什么协议请求如https
print(xieyi)
dai_ip = sui_ji_hq_ip('http') #到数据库随机获取一个代理IP
request.meta['proxy'] = "http://{0}".format(dai_ip) #字符串格式化设置代理IP #request.meta['proxy'] = "http://185.82.203.146:1080" 设置代理IP
随机数据库获取IP
#!/usr/bin/env python
# -*- coding:utf8 -*-
import time import requests from adc.daili_ip.mysq import shujuku as ORM def suiji_ip(rst):
"""
调用此函数随机到数据库获取代理IP返回IP,如果IP不可用会自动删除返回False
"""
atime = time.localtime(time.time()-240) #设置获取多少时间以内检测过的IP(单位秒)
sudu = '00:00:03' #设置获取访问速度小于等于多少的IP,单位(时分秒)默认3秒
dqatime = "{0}-{1}-{2} {3}:{4}:{5}".format(
atime.tm_year,
atime.tm_mon,
atime.tm_mday,
atime.tm_hour,
atime.tm_min,
atime.tm_sec
) # 将格式化时间日期,单独取出来拼接成一个完整日期 try:
mysq = ORM.session()
shuju = mysq.query(
ORM.daili_ip.ip,
ORM.daili_ip.port,
ORM.daili_ip.xtype,
ORM.daili_ip.seshi_ri_qi,
ORM.daili_ip.connectTimeMs
).from_statement(
"SELECT ip,port,xtype,seshi_ri_qi,connectTimeMs FROM daili_ip WHERE xtype='{0}' AND ce_shi='{1}' AND seshi_ri_qi>='{2}' AND connectTimeMs<='{3}' ORDER BY RAND() LIMIT 1".format(rst, '', dqatime, sudu)
).all()
mysq.close()
if shuju:
print('获取到IP')
else:
print('获取IP失败,请检查获取条件')
except Exception as e:
print('查询代理IP数据出错')
return True
ip = shuju[0][0]
duan_kou = shuju[0][1]
print('启用代理IP,数据库获取到IP:{0}'.format(shuju)) http_url = '{0}://image.baidu.com/'.format(rst)
proxy_url = '{0}://{1}:{2}'.format(rst, ip, duan_kou)
headers = {
'Referer': http_url,
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0',
} print('启用代理IP,测试网址:{0}'.format(http_url))
print('启用代理IP,测试头:{0}'.format(proxy_url))
try:
proxy_dict = {
'http': proxy_url
}
response = requests.get(http_url, proxies=proxy_dict, headers=headers)
except Exception as e:
print('启用代理IP,测速连接失败{0}'.format(e))
print('启用代理IP,测速连接失败,当前IP不可用,删除当前ip!')
fanhui = mysq.query(ORM.daili_ip).filter(ORM.daili_ip.ip == ip).delete() # 删除不可以数据
mysq.commit()
mysq.close()
if fanhui == 1:
print("成功删除当前IP")
else:
print('删除当前IP失败')
return False
else:
code = response.status_code # 获取状态吗
sudu = str(response.elapsed) # 获取响应时间
if code >= 200 and code < 300:
atime = time.localtime()
dqatime = "{0}-{1}-{2} {3}:{4}:{5}".format(
atime.tm_year,
atime.tm_mon,
atime.tm_mday,
atime.tm_hour,
atime.tm_min,
atime.tm_sec
) # 将格式化时间日期,单独取出来拼接成一个完整日期 print('启用代理IP,测试代理ip--{0}{1}--状态可用--状态码--{2}'.format(ip, duan_kou, code))
print('启用代理IP,当前IP可以,正在向数据库标记')
fanhui = mysq.query(ORM.daili_ip).filter(ORM.daili_ip.ip == ip).update({
"ce_shi": "",
"seshi_ri_qi": dqatime,
"connectTimeMs": sudu
})
mysq.commit()
mysq.close()
if fanhui == 1:
print('向数据库成功标记可用IP!')
else:
print('向数据库标记可用IP失败!!!')
print('向爬虫返回IP:{0}:{1}'.format(ip, duan_kou))
return ip + ':' + duan_kou
else:
print('启用代理IP,测试代理ip--{0}{1}--状态不可用--状态码--{2}'.format(ip, duan_kou, code))
print('返回状态码不可以,正在向数据库删除当前IP')
fanhui = mysq.query(ORM.daili_ip).filter(ORM.daili_ip.ip == ip).delete() # 删除不可以数据
mysq.commit()
mysq.close()
if fanhui == 1:
print('删除当前IP成功')
else:
print('删除当前IP失败')
return False def sui_ji_hq_ip(rst):
"""
正式使用:调用此函数,接收一个参数协议,如http
循环到数据库获取IP,IP如果不可用删除后继续获取,直到ip可以后返回ip
值循环获取测试30分钟内有效的IP
"""
n = True
h = None
while n:
youxiao_ip = suiji_ip(rst)
if youxiao_ip:
h = youxiao_ip
n = False
return h # print(sui_ji_hq_ip('http'))
数据库模块文件
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,text,DATETIME,TIME
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine import requests
import json
import time
import datetime #配置数据库引擎信息
ENGINE = create_engine("mysql+pymysql://root:279819@127.0.0.1:3306/cshi?charset=utf8", max_overflow=500, echo=True) Base = declarative_base() #创建一个SQLORM基类 class daili_ip(Base): #ip池设计表
__tablename__ = 'daili_ip' id = Column(Integer, primary_key=True, autoincrement=True)
ip = Column(String(300), unique=True) #IP
port = Column(String(300)) #端口
city = Column(String(300)) #城市
isp = Column(String(300)) #运营商
connectTimeMs = Column(TIME()) #速度
anonymity = Column(String(300)) #匿名方式
country = Column(String(300)) #国家
xtype = Column(String(300)) #协议
zhuang_tai_ma = Column(String(300)) #状态码
ruku_riqi = Column(DATETIME()) #入库日期
ce_shi = Column(String(300)) #测试状态
seshi_ri_qi = Column(DATETIME()) #测试日期
shi_xiao_riqi = Column(DATETIME()) # 失效日期 def init_db():
Base.metadata.create_all(ENGINE) #向数据库创建指定表 def drop_db():
Base.metadata.drop_all(ENGINE) #向数据库删除指定表 def session():
cls = sessionmaker(bind=ENGINE) #创建sessionmaker类,操作表
return cls() # drop_db() #删除表
# init_db()