A deep dive into concurrent.futures

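To set the scene: pick a few pictures from Baidu Images, download them, and save them locally. We start by downloading them one after another.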

1 Sequential, synchronous download

import random
import time

import requests

urls = [
    "https://t7.baidu.com/it/u=3676218341,3686214618&fm=193&f=GIF",
    "https://t7.baidu.com/it/u=3930750564,2979238085&fm=193&f=GIF",
    "https://pics7.baidu.com/feed/c8ea15ce36d3d5398b62865e47680d55372ab0c1.jpeg?token=43cb8aff8adfd6c74ec99218af7a3aad&s=FD36AD570CBC56949920F8E803003021",
    "https://pics4.baidu.com/feed/00e93901213fb80ea99ee55b212dcb28bb3894f6.jpeg?token=910769ca2750ca2900cb28542616f7c2",
    "https://gimg2.baidu.com/image_search/src=http%3A%2F%2Finews.gtimg.com%2Fnewsapp_match%2F0%2F11158692545%2F0.jpg&refer=http%3A%2F%2Finews.gtimg.com&app=2002&size=f9999,10000&q=a80&n=0&g=0n&fmt=jpeg?sec=1638945903&t=ab53b7ec3f91652eacf7499b1a4ff529"
]

def use_time(func):
    """Decorator that prints how long func takes to run."""
    def inner(*args, **kwargs):
        s = time.time()
        func(*args, **kwargs)
        print(f"Took {time.time()-s}s")
    return inner


def download(url):
    # Send a browser-like User-Agent header with the request.
    headers = {
        'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36"
    }
    resp = requests.get(url, headers=headers)
    return resp.content


def sava(content):
    # The file name is random, so two images can collide and overwrite each other.
    with open(f'{random.randint(0,100)}.jpg','wb') as f:
        f.write(content)


@use_time
def main():
    for url in urls:
        resp = download(url)
        sava(resp)


if __name__ == '__main__':
    main()

The sequential download took about 15 s.

2 Concurrent download with concurrent.futures

The map method

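from concurrent.futures import ThreadPoolExecutor

# test4 is assumed to be the script from section 1, saved as test4.py
from test4 import download, sava, urls, use_time

MAX_WORKER = 10


@use_time
def cmain():
    # Never start more threads than there are URLs to fetch.
    workers = min(len(urls), MAX_WORKER)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # map yields the downloaded bytes in the same order as urls
        for content in executor.map(download, urls):
            sava(content)


if __name__ == '__main__':
    cmain()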

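That is roughly 30 times faster, and this is only a small workload; the gain matters even more as the number of tasks grows. With only five URLs, part of that gap likely comes from network variance or caching between the two runs rather than from concurrency alone.
The map method works much like the built-in map. It returns an iterator (a generator in CPython) that yields the return value for each argument, in the same order as the inputs.
Internally, map creates one Future per argument. The iterator's __next__ method calls each Future's result method, so what we get back are the results of the Futures, not the Futures themselves.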
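
To make the description above concrete, here is a minimal sketch of what map roughly does: submit every call, then read the results in submission order. The names slow_square and map_by_hand are hypothetical helpers invented for this illustration, and the sleep only simulates tasks that finish out of order; this is not the library's actual implementation.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    """Hypothetical task: smaller inputs sleep longer, so they finish last."""
    time.sleep((3 - n) * 0.05)
    return n * n


def map_by_hand(executor, fn, items):
    """Rough equivalent of executor.map: submit all, then read in order."""
    futures = [executor.submit(fn, item) for item in items]
    for future in futures:
        # result() blocks until this particular future is done
        yield future.result()


with ThreadPoolExecutor(max_workers=3) as executor:
    via_map = list(executor.map(slow_square, [0, 1, 2]))
    via_submit = list(map_by_hand(executor, slow_square, [0, 1, 2]))

print(via_map)
print(via_submit)
```

Both lists come back in input order even though the task for 2 finishes first, because the results are read in submission order.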

The submit method

import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

urls = [
    "https://t7.baidu.com/it/u=3676218341,3686214618&fm=193&f=GIF",
    "https://t7.baidu.com/it/u=3930750564,2979238085&fm=193&f=GIF",
    "https://pics7.baidu.com/feed/c8ea15ce36d3d5398b62865e47680d55372ab0c1.jpeg?token=43cb8aff8adfd6c74ec99218af7a3aad&s=FD36AD570CBC56949920F8E803003021",
    "https://pics4.baidu.com/feed/00e93901213fb80ea99ee55b212dcb28bb3894f6.jpeg?token=910769ca2750ca2900cb28542616f7c2",
    "https://gimg2.baidu.com/image_search/src=http%3A%2F%2Finews.gtimg.com%2Fnewsapp_match%2F0%2F11158692545%2F0.jpg&refer=http%3A%2F%2Finews.gtimg.com&app=2002&size=f9999,10000&q=a80&n=0&g=0n&fmt=jpeg?sec=1638945903&t=ab53b7ec3f91652eacf7499b1a4ff529"
]

def use_time(func):
    """Decorator that prints how long func takes to run."""
    def inner(*args, **kwargs):
        s = time.time()
        func(*args, **kwargs)
        print(f"Took {time.time()-s}s")
    return inner


def download(url):
    # Send a browser-like User-Agent header with the request.
    headers = {
        'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36"
    }
    resp = requests.get(url, headers=headers)
    return resp.content


def sava(content):
    # The file name is random, so two images can collide and overwrite each other.
    with open(f'{random.randint(0,100)}.jpg','wb') as f:
        f.write(content)


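@use_time
def main():
    fu_list = []
    with ThreadPoolExecutor(3) as executor:
        # Schedule every download; submit returns a Future immediately.
        for url in urls:
            fu = executor.submit(download, url)
            print(f"url-{url}->fu-{fu}")
            fu_list.append(fu)
        # as_completed yields each Future as soon as it finishes.
        for fu in as_completed(fu_list):
            print(f"state:{fu}")
            resp = fu.result()  # does not block: this future is already done
            sava(resp)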


if __name__ == '__main__':
    main()

Sample output (URLs truncated):

url-https://t7.baidu.com/it/u=3676218341,368621461 ->fu-<Future at 0x21a64dc6550 state=running>
url-https://t7.baidu.com/it/u=3930750564,2979238085->fu-<Future at 0x21a64dd6d00 state=running>
url-https://pics7.baidu.com/feed/c8ea15ce36d3d5398b->fu-<Future at 0x21a64de0910 state=running>
url-https://pics4.baidu.com/feed/00e93901213fb80ea9->fu-<Future at 0x21a64ded310 state=pending>
url-https://gimg2.baidu.com/image_search/src=http%3->fu-<Future at 0x21a64dedac0 state=pending>
state:<Future at 0x21a64dc6550 state=finished returned bytes>
state:<Future at 0x21a64dd6d00 state=finished returned bytes>
state:<Future at 0x21a64de0910 state=finished returned bytes>
state:<Future at 0x21a64dedac0 state=finished returned bytes>
state:<Future at 0x21a64ded310 state=finished returned bytes>
Took 0.7982451915740967s

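The two-loop version produces the same set of results as map. The difference is ordering: map yields results in the same order as its arguments, while as_completed yields Futures in the order they finish (in the output above, the fifth Future finishes before the fourth). Because submit hands back one Future per call, you can still tie every result to its argument by recording which Future belongs to which URL.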
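
One common way to keep that correspondence is a dict keyed by Future. The sketch below assumes a hypothetical fake_download function that sleeps instead of hitting the network, and short made-up names in place of real URLs.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_download(name):
    """Hypothetical stand-in for download(url): sleeps, then returns bytes."""
    delays = {"a": 0.15, "b": 0.05, "c": 0.10}
    time.sleep(delays[name])
    return name.encode() * 3


names = ["a", "b", "c"]
results = {}
completion_order = []

with ThreadPoolExecutor(max_workers=3) as executor:
    # Remember which argument each Future was created for.
    future_to_name = {executor.submit(fake_download, n): n for n in names}
    for future in as_completed(future_to_name):
        name = future_to_name[future]
        completion_order.append(name)
        results[name] = future.result()

print(completion_order)  # completion order, which may differ from names
print(results)
```

The completion order depends on timing, but results always maps each name to its own payload.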

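The combination of executor.submit and futures.as_completed is more flexible than executor.map: submit can schedule different callables with different arguments, whereas executor.map can only run one callable over different arguments. In addition, the set of Futures passed to futures.as_completed may come from more than one Executor instance, for example some created by a ThreadPoolExecutor and others by a ProcessPoolExecutor.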
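
A small sketch of that flexibility follows. The functions add and shout are hypothetical, and both pools here are thread pools purely to keep the example short; the second one could equally be a ProcessPoolExecutor.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def add(a, b):
    return a + b


def shout(text):
    return text.upper()


with ThreadPoolExecutor(max_workers=2) as pool_a, \
        ThreadPoolExecutor(max_workers=2) as pool_b:
    # Three different callables, scheduled on two different executors.
    futures = {
        pool_a.submit(add, 1, 2): "add",
        pool_a.submit(len, [1, 2, 3, 4]): "len",
        pool_b.submit(shout, "done"): "shout",
    }
    # as_completed accepts the mixed collection as-is.
    outcomes = {futures[f]: f.result() for f in as_completed(futures)}

print(outcomes)
```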

3 The GIL, multi-core CPUs, and process pools

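All blocking I/O functions in the Python standard library release the GIL, which lets other threads run. time.sleep() releases the GIL as well. So despite the GIL, Python threads are still useful in I/O-bound applications.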
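
The effect is easy to observe with time.sleep() standing in for a blocking call. In this sketch, blocking_io is a hypothetical placeholder; five calls of 0.2 s each would take about 1 s in sequence, but overlap when run on five threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(_):
    time.sleep(0.2)  # stands in for a blocking network or disk call
    return True


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(blocking_io, range(5)))
elapsed = time.perf_counter() - start

print(f"5 x 0.2 s of sleeping took {elapsed:.2f} s")
```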

To take advantage of multiple cores, turn to multiple processes. concurrent.futures supports this too, through a process pool.

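The concurrent.futures module enables truly parallel computation, because its ProcessPoolExecutor class distributes work across multiple Python processes. For CPU-bound work, this lets you bypass the GIL and use all available CPU cores.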

from concurrent.futures import ProcessPoolExecutor

# test4 is assumed to be the script from section 1, saved as test4.py
from test4 import download, sava, urls, use_time


@use_time
def cmain():
    # Same as the thread-pool version, but each download runs in a worker process.
    with ProcessPoolExecutor() as ppe:
        for content in ppe.map(download, urls):
            sava(content)


if __name__ == '__main__':
    cmain()

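ThreadPoolExecutor and ProcessPoolExecutor both implement the same Executor interface, and max_workers is optional in both. They differ in the default: for ProcessPoolExecutor it is the number of CPUs on the machine (os.cpu_count()), while for ThreadPoolExecutor it is min(32, os.cpu_count() + 4) as of Python 3.8.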

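Using a process pool instead of a thread pool for the downloads was actually slower. My guess is that creating processes and allocating their resources costs more than creating threads.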

From this angle, process pools suit CPU-bound tasks rather than I/O-bound ones.
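
For contrast, here is a minimal sketch of a CPU-bound job where a process pool is a reasonable fit. The count_primes function and the chosen limits are hypothetical; the point is only that each call is pure computation that can run in its own process.

```python
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit):
    """Hypothetical CPU-bound task: count primes below limit by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def main():
    limits = [20_000, 30_000, 40_000, 50_000]
    # Each call runs in a separate worker process, so the GIL of one
    # process does not hold back the others.
    with ProcessPoolExecutor() as executor:
        for limit, total in zip(limits, executor.map(count_primes, limits)):
            print(f"primes below {limit}: {total}")


if __name__ == "__main__":
    main()
```

The `if __name__ == "__main__"` guard matters here: on platforms that start workers by re-importing the main module, it keeps the workers from launching pools of their own.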
