高性能异步爬虫

高性能异步爬虫

目的:在爬虫中使用异步实现高性能的数据爬取操作。

 

异步爬虫的方式:

一、多线程/多进程(不建议使用)

好处:在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

弊端:开启多进程或都线程的方式,我们是无法无限制地开启多进程或多线程的:在遇到要同时处理成百上千个的连接请求时,则无论多线程还是多进程都会严重占据系统资源,

降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

 

二、线程池/进程池(适当使用)

好处:很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。可以很好的降低系统开销。

弊端:“线程池”和“连接池”技术也只是在一定程度上缓解了频繁创建和销毁线程带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

 

案例:对比同步和使用线程池的执行效率

import time
# 使用单线程串行方式模拟执行下载
def get_page(str):
    print("正在下载 :", str)
    time.sleep(2)
    print('下载成功:', str)
​
​
name_list = ['xiaozi', 'aa', 'bb', 'cc']
​
start_time = time.time()
​
for i in range(len(name_list)):
    get_page(name_list[i])
​
end_time = time.time() 
print('%d second' % (end_time - start_time))  # 输出结果:8 second
import time
from concurrent.futures import ThreadPoolExecutor
​
# 使用线程池方式执行
start_time = time.time()
​
​
def get_page(str):
    print("正在下载 :", str)
    time.sleep(2)
    print('下载成功:', str)
​
​
name_list = ['xiaozi', 'aa', 'bb', 'cc']
# 实例化一个线程池对象
executor = ThreadPoolExecutor(4)
# for each in name_list:
#     executor.submit(get_page, each)
executor.map(get_page, name_list)
executor.shutdown(True)
end_time = time.time()
print(end_time - start_time)  # 2.0088231563568115

三、单线程+异步协程

无论哪种解决方案其实没有解决一个性能相关的问题:IO阻塞,无论是多进程还是多线程,在遇到IO阻塞时都会被操作系统强行剥夺走CPU的执行权限,程序的执行效率因此就降低了下来。

解决这一问题的关键在于,我们自己从应用程序级别检测IO阻塞然后切换到我们自己程序的其他任务执行,这样把我们程序的IO降到最低,我们的程序处于就绪态就会增多,以此来迷惑操作系统,操作系统便以为我们的程序是IO比较少的程序,从而会尽可能多的分配CPU给我们,这样也就达到了提升程序执行效率的目的。

在python3.4之后新增了asyncio模块,可以帮我们检测IO阻塞,然后实现异步IO。注意:asyncio只能发tcp级别的请求,不能发http协议。

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上, 当满足某些条件的时候,函数就会被循环执行。

  • coroutine:协程对象,我们可以将协程对象注册到事件循环中,它会被事件循环调用。 我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回 一个协程对象。

  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。

  • future:代表将来执行或还没有执行的任务,实际上和 task 没有本质区别。

  • async 定义一个协程.

  • await 用来挂起阻塞方法的执行。

协程的使用

import asyncio
​
async def request(url):
    print('正在请求的url是', url)
    print('请求成功')
    return url
​
​
# async修饰的函数,调用之后返回的一个协程对象
c = request("www.google.com")
​
 # 创建一个事件循环对象
 loop = asyncio.get_event_loop()
 # 将协程对象注册到loop中,然后启动loop
 loop.run_until_complete(c)
​
# task的使用
 loop = asyncio.get_event_loop()
# 基于loop创建了一个task任务对象
 task = loop.create_task(c)
 print(task)
​
 loop.run_until_complete(task)
 print(task)
​
# future的使用
 loop = asyncio.get_event_loop()
 task = asyncio.ensure_future(c)
 print(task)
 loop.run_until_complete(task)
 print(task)
​
def callback_func(task):
    # result返回的就是任务对象中封装的协程对象的对应函数的返回值
    print(task.result())
​
​
# 绑定回调
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)
# 将回调函数绑定到任务对象中
task.add_done_callback(callback_func)
loop.run_until_complete(task)

多任务协程:

如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asynciowait() 方法即可执行。

注:在异步协程中 如果出现了与同步模块相关的代码 则无法实现异步

import asyncio
import time
​
async def request(url):
    print('正在下载', url)
    # 在异步协程中如果出现了同步模块相关的代码,那么就无法实现异步
    # time.sleep(2)  # 这是同步模块代码
    # 当在asyncio遇到阻塞操作必须进行手动挂起
    await asyncio.sleep(2)
    print('下载完成', url)
​
​
start_time = time.time()
urls = [
    'www.baidu,com',
    'www.apple,com',
    'www.google,com',
]
​
# 任务列表:存放多个任务对象
task_list = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    task_list.append(task)
​
loop = asyncio.get_event_loop()
# 需要将任务列表封装到wait中
loop.run_until_complete(asyncio.wait(task_list))
​
print(time.time() - start_time)
​

 

 

四、aiohttp模块

接下来,咱们就可以尝试的将多任务异步协程应用到爬虫中进行试验,看是否能够实现多任务异步爬虫?为了表现出协程的优势,我们需要先创建一个合适的实验环境,最好的方法就是模拟一个需要等待一定时间才可以获取返回结果的网页,在本地模拟一个慢速服务器,这里我们选用 Flask。

 

服务器代码:

from flask import Flask
import time
app = Flask(__name__)
@app.route('/bobo')
def index_bobo():
    time.sleep(2)
    return 'Hello bobo'
@app.route('/jay')
def index_jay():
    time.sleep(2)
    return 'Hello jay'
@app.route('/tom')
def index_tom():
    time.sleep(2)
    return 'Hello tom'
if __name__ == '__main__':
    app.run(threaded=True)

接下来,将多任务异步协程操作应用在爬虫上:

import requests
import asyncio
import time
start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom'
]
async def get_page(url):
    print('正在下载',url)
    response = requests.get(url=url)
    print('下载完毕:',response.text)
tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('总耗时:',end-start)
​
# ########### 输出结果 ###############
# 正在下载 http://127.0.0.1:5000/bobo
# 下载完毕: Hello bobo
# 正在下载 http://127.0.0.1:5000/jay
# 下载完毕: Hello jay
# 正在下载 http://127.0.0.1:5000/tom
# 下载完毕: Hello tom
# 总耗时: 6.045619249343872

可以发现和正常的请求并没有什么两样,依然还是顺次执行的,耗时 6 秒,平均一个请求耗时 2 秒,说好的异步处理呢?

原因在于requests模块是非异步模块,要想实现真正的异步必须使用基于异步的网络请求模块所以这里就需要 aiohttp 派上用场了

 

aiohttp使用

发起请求:

params = {'key': 'value', 'page': 10} # 添加请求参数
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.baidu.com') as resposne:
            print(await resposne.text())
loop = asyncio.get_event_loop()
tasks = [fetch(),]
loop.run_until_complete(asyncio.wait(tasks))

 

UA伪装:

url = 'http://httpbin.org/user-agent'
headers = {'User-Agent': 'test_user_agent'}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(url,headers=headers) as resposne:
            print(await resposne.text())
loop = asyncio.get_event_loop()
tasks = [fetch(),]
loop.run_until_complete(asyncio.wait(tasks))

 

自定义cookies:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_name': 'test_cookies'}
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(url,cookies=cookies) as resposne:
            print(await resposne.text())
loop = asyncio.get_event_loop()
tasks = [fetch(),]
loop.run_until_complete(asyncio.wait(tasks))

 

设置代理:

url = "http://python.org"
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get(url, proxy="http://some.proxy.com") as resposne:
        print(resposne.status)
loop = asyncio.get_event_loop()
tasks = [fetch(), ]
loop.run_until_complete(asyncio.wait(tasks))

 

 

异步IO处理:

在这里将请求由 requests 改成了 aiohttp,通过 aiohttp 的 ClientSession 类的 get() 方法进行请求,结果如下:

#环境安装:pip install aiohttp
#使用该模块中的ClientSession
import requests
import asyncio
import time
import aiohttp
start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
]
async def get_page(url):
    async with aiohttp.ClientSession() as session:
        #get()、post():
        #headers,params/data,proxy='http://ip:port'
        async with await session.get(url) as response:
            #text()返回字符串形式的响应数据
            #read()返回的二进制形式的响应数据
            #json()返回的就是json对象
            #注意:获取响应数据操作之前一定要使用await进行手动挂起
            page_text = await response.text()
            print(page_text)
tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('总耗时:',end-start)
​
# ########### 输出结果 ###############
# Hello tom
# Hello jay
# Hello bobo
# Hello bobo
# Hello jay
# Hello bobo
# Hello tom
# Hello jay
# Hello jay
# Hello tom
# Hello tom
# Hello bobo
# 总耗时: 2.037203073501587
上一篇:activit 表结构 flowable也大体适用


下一篇:洛谷P1719 最大加权矩形