Python并发请求之requests_future模块使用

Python并发请求之requests_future模块使用

requests_future是requests模块和concurrent.futures模块的综合,使用这个模块可以轻松实现异步并发。目前在github已经有1.5K的star了。

安装

pip3 install requests_future

使用例子

# -*- coding: utf-8 -*-
# @Time : 2020-12-09 10:00
# @Author : lhf
# @File : demo.py
# @Software: PyCharm
from requests_futures.sessions import FuturesSession
from concurrent.futures import as_completed
from lxml import html
import time

url = ["http://www.baidu.com", "http://www.163.com", "http://www.google.com", "http://www.cnblogs.com/c-x-a"]


def get_node(source, x=".//head/title//text()"):
    root = html.fromstring(source)
    node = root.xpath(x)
    return node


def response_hook(resp, *args, **kwargs):
    start = time.time()
    resp.encoding = resp.apparent_encoding
    resp.data = resp.text
    resp.code = resp.status_code
    resp.headers = resp.headers
    resp.elapsed = time.time() - start


def get_req():
    with FuturesSession(max_workers=4) as session:
        futures = [session.get(i, hooks={"response": response_hook}) for i in url]
        for future in as_completed(futures):
            resp = future.result()
            print("状态码", resp.code)
            print("标题", get_node(resp.data)[0])
            print("耗时", resp.elapsed)
            print("="*30)

if __name__ == '__main__':
    get_req()

首先通过导入FuturesSession,然后通过 with FuturesSession(max_workers=4) as session:创建一个上下文, 保证在并发请求之后关闭资源,然后就像使用session.get传入url等参数,除此之处还可以可以一个hooks字典。
其值是一个函数。这个函数会在请求结束后执行一系列操作,具体看我的response_hook函数就好了。
接下来我们就获取一个future对象列表,futures。可以使用concurrent.futures的as_completed,as_completed()可以把多个并发的协程一起给它,但它把返回的结果变成一个生成器,每次返回一个协程的结果,与函数wait()一样,执行协程是乱序的,不会等所有协程执行完成才返回。

我们还可以继承FuturesSession,然后重写上面的过程,代码如下

from requests_futures.sessions import FuturesSession
from concurrent.futures import as_completed
from lxml import html
import time

url = ["http://www.baidu.com", "http://www.163.com", "http://www.google.com", "http://www.cnblogs.com/c-x-a"]


class MySession(FuturesSession):

    def request(self, method, url, hooks=None, *args, **kwargs):
        start = time.time()
        if hooks is None:
            hooks = {}

        def response_hook(resp, *args, **kwargs):
            resp.encoding = resp.apparent_encoding
            resp.data = resp.text
            resp.code = resp.status_code
            resp.headers = resp.headers
            resp.elapsed = time.time() - start

        try:
            if isinstance(hooks['response'], (list, tuple)):
                hooks['response'].insert(0, response_hook)
            else:
                hooks['response'] = [response_hook, hooks['response']]
        except KeyError:
            hooks['response'] = response_hook

        return super(MySession, self).request(method, url, hooks=hooks, *args, **kwargs)


def get_node(source, x=".//head/title//text()"):
    root = html.fromstring(source)
    node = root.xpath(x)
    return node


def get_req():
    headers = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'User-Agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36",
    }
    with MySession(max_workers=4) as session:
        futures = [session.get(i, headers=headers) for i in url]
        for future in as_completed(futures):
            resp = future.result()
            print("状态码", resp.code)
            print("标题", get_node(resp.data)[0])
            print("耗时", resp.elapsed)
            print("=" * 30)



if __name__ == '__main__':
    get_req()

这就是关于requests_futures的模块的简单使用案例,因为创建出session之后,剩下的和requests模块的session没多少差别,就不仔细说了,这样没啥难度,就可以轻松实现并发了,也可以使用之前我说的tomorrow模块进行并发,不过tomorrow之有线程池的支持,requests_futures好处是还支持进程池,所以我打算抛弃tomorrow模块了。

参考资料

https://github.com/ross/requests-futures

上一篇:dubbo之网络通讯


下一篇:Dubbo源码解析-远程调用