python 快速万能同步转异步语法

pip install sync2asyncio

 

python 快速万能同步转异步语法。

 

使任意同步库快速变asyncio异步语法的方式 ,simple_run_in_executor

这是一个异步对象Asyncio的Future了,可以被await和异步任务编排。

await simple_run_in_executor(requests.get, url='http://www.baidu.com')  # 等效 await aiohttp.request('get',url)
await simple_run_in_executor(time.sleep, 5)                            # 等效 await asyncio.sleep(5)


2 代码实现和测试

from functools import partial
import threading
import asyncio

from threadpool_executor_shrink_able import ThreadPoolExecutorShrinkAble  # 没有使用内置的 concurrent.futures里面的,这个是优化4点功能的。

global_threadpool_executor = ThreadPoolExecutorShrinkAble(200)  # 这个是智能线程池,不是官方的concurrent.futures.threadpoolexecutor


async def simple_run_in_executor(f, *args, async_loop=None, threadpool_executor=None, **kwargs):
    """
    函数的目的是转化任何同步方法或函数为异步链路语法中的一环。例如你写一个功能,要调用10个包,其中有9个有对应的异步库,有一个还没有对应的异步库,
    因为一旦异步需要处处异步,不能因为某一个功能没有对应的异步库就前功尽弃。本函数就能够做到一个异步的链路里面调用同步库但不阻塞整个asyncio的loop循环。

    这个函数看起来很简单,主要是调用官方的 run_in_executor 。

    第1个特点是由官方的  方法改成了现在的函数 (方法是类里面面的,函数是模块下面的,我一般这么划分python方法和函数)

    第2个特点是直接内置了线程池,用户可以无需传参了。并且这个线程池功能比官方的线程池要好,可以设置一个很大的值,他会自适应自动扩大缩小。

    第3个特点是最重要的提高了易用性的地方。使用了整体偏函数把所有入参和函数生成一个偏函数,进而解决了官方只支持位置入参,不支持关键字入参的,
    当函数入参达到几十个时候,例如requests.get 如果你想设置timeout参数,如果不支持关键字入参,你需要把timeout参数之前的其他不重要参数全都传递一遍使用默认None来占位。
    函数入参个数比较多的情况下,不支持关键字入参就会很容易导致传参错误。


    :param f: 任意同步阻塞函数,是非 async def的函数
    :param args:  同步函数的入参
    :param async_loop: loop
    :param threadpool_executor: 在项城市里面运行。
    :param kwargs: 同步函数的入参
    :return: 同步函数的结果
    """
    loopx = async_loop or asyncio.get_event_loop()
    # print(id(loopx))
    executor = threadpool_executor or global_threadpool_executor
    result = await loopx.run_in_executor(executor, partial(f, *args, **kwargs))
    return result


if __name__ == '__main__':
    import time
    import requests  # 这是同步阻塞函数之一


    def block_fun(x):  # 这是自定义的第二个同步阻塞函数
        time.sleep(5)
        print(x)
        return x * 10


    async def enter_fun(xx):  # 入口函数,因为为一旦异步,必须处处异步。不能直接调用block_fun,否则阻塞其他任务。
        await asyncio.sleep(1)  # # 如果你这么写  time.sleep(1)  那就完了个蛋,程序运行完成需要更长的时间。
        await simple_run_in_executor(time.sleep,1)  # 如果世上没有asyncio.sleep异步函数,那么可以这么做。
        r = await  simple_run_in_executor(block_fun, xx)  # # 如果你这么写  r = block_fun(xx)   那就完了个蛋,程序运行完成需要更长的时间。
        print(r)
        resp = await  simple_run_in_executor(requests.get, url='http://www.baidu.com', timeout=5)
        # 如果你这么写  resp = requests.get( url='http://www.baidu.com')   那就完了个蛋,如果网站每次响应时间很大会发生严重影响,程序运行完成需要更长的时间。
        # 这个是调用了同步requests请求库,如果同步库请求一个网站需要10秒响应,asyncio中如果直接使用了同步库,会发生灭顶之灾,整个loop就成了废物。如果网站每次响应是1毫秒,那么异步中调用同步库还可以勉强接受的。
        # 但用 simple_run_in_executor来运行requests 即使网站响应时间很长,也不会对asyncio的loop产生严重阻塞影响了,这就是 simple_run_in_executor 要达到的目的。
        print(resp)


    loopy = asyncio.get_event_loop()
    print(id(loopy))
    tasks = []
    tasks.append(simple_run_in_executor(requests.get, url='http://www.baidu.com'))

    tasks.append(simple_run_in_executor(block_fun, 1))
    tasks.append(simple_run_in_executor(block_fun, 2))
    tasks.append(simple_run_in_executor(block_fun, 3))

    for i in range(100, 120):
        tasks.append(enter_fun(i))

    print('开始')
    loopy.run_until_complete(asyncio.wait(tasks))  # 通过以上可以观察到,所有的block_fun的print都是同一时间打印出来的,而不是每隔5秒一个接一个打印的。
    print('结束')


虽然block_fun同步函数里面需要阻塞5秒,但使用了 simple_run_in_executor 来执行这个同步阻塞函数,

虽然运行了十几个协程任务,不会造成整体整体运行时间延长。

如果不使用 simple_run_in_executor 来运行 block_fun,运行完这十几个协程任务最起码需要1分钟以上。

 

 

 

4 比较asyncio.run_coroutine_threadsafe 和 run_in_executor区别

asyncio并发真的太难了,比线程池用法难很多,里面的概念太难了,例如介绍这两个概念。
asyncio.run_coroutine_threadsafe 和 run_in_executor 是一对反义词。

asyncio.run_coroutine_threadsafe 是在非异步的上下文环境(也就是正常的同步语法的函数里面)下调用异步函数对象(协程),
因为当前函数定义没有被async修饰,就不能在函数里面使用await,必须使用这。这个是将asyncio包的future对象转化返回一个concurrent.futures包的future对象。

run_in_executor 是在异步环境(被async修饰的异步函数)里面,调用同步函数,将函数放到线程池运行防止阻塞整个事件循环的其他任务。
这个是将 一个concurrent.futures包的future对象 转化为 asyncio包的future对象,
asyncio包的future对象是一个asyncio包的awaitable对象,所以可以被await,concurrent.futures.Future对象不能被await。
 
上一篇:springboot simple_6 springboot mqtt


下一篇:什么是解耦?