Python 是一门上手快、优雅简洁的编程语言,其多范式、丰富的标准库和第三方库能够让编程人员把精力集中在逻辑和思维方法上,而不用去担心复杂语法、类型系统等外在因素,从而高效地达成自己的编程目标。Python 抽象层次非常高,这帮助我们更好更快地完成编程,但也屏蔽了很多细节,程序员也无法精确控制计算机底层的资源,代码性能优化就变得比较复杂。很多资深的程序员可能会觉得 Python 性能不够好,无法编写高性能的程序,其实这句话也不全对。对于计算密集型的程序,Python 可以通过扩展的形式使得核心计算直接调用其它语言(通常是 C 语言)写的包。比如说近期流行的机器学习所使用的 CUDA 技术,Nvidia 就在官网发表了一篇文章专门讲如何通过 Python 使用 CUDA 技术。还有科学计算库 Numpy,也是有代表的 Python 高性能的第三方库。如果第三方库不能满足我们,还可以通过 Cython 这种有着类 Python 语法的工具来生成 Python 的 C 语言扩展库,直接给我们的程序加速。这些技术使得 Python 也可以有高性能的计算能力,但是本身核心代码并不是用 Python 写的,所以在本文中不关注这些技术。我们在本文中主要讨论另一种情况,如何写 I/O 密集型的高性能 Python 程序。
RQData 就是一个典型的需要高 I/O 性能的产品。RQData 是米筐为专业投资者提供便利易用的金融数据 API 。用户使用 RQData 客户端工具包 rqdatac,通过网络的方式调取存在远程的金融数据(云端或本地取决于实施方式)。RQData 提供了非常多的 Python API,方便投资者调取全市场的金融数据。这些 API 包含了股票、期货、现货、期权、可转债、场内基金、风险因子、财务因子等类型,而这些数据最重要的特性就是数据量大。RQData 日常面临着每天数亿次对于总量约为 1T 数据的随机调取。这些调取所需返回的数据量往往也相当庞大,例如十年全 A 股的日线数据大约会返回 1 个 G 的数据。在这样的压力下,我们仍希望用户在使用 RQData 的时候就像浏览器下载文件一样快,这些需求在 RQData 中都是用 Python 通过 asyncio 模块完成的。接下来我们一起由浅入深地学习一下 asyncio 。
Asyncio 初识
asyncio 是 Python 3.5 中引入的一个库, 目的是方便编写网络应用程序。一个简单的以 RQData 服务端为例的网络处理的核心代码基本如下(简化了 API 的业务逻辑):
上面的代码建构了一个简单的服务端应用,它接收到用户的连接后,直接返回用户输入的数据。这个程序非常简单,却能体现出 asyncio 的魅力。我们构建的程序非常像传统的网络程序。和传统的网络程序相比,我们改动了三处:
- 用了 async 关键字定义了一个协程函数,而不是一个普通的函数。
- 相比于传统的阻塞的函数调用 , 在调用 async 关键字定义的函数前加入了 await 。
- 没有使用 socket 定义的方法,而是换成了 asyncio 中 EventLoop 定义的 async 协程函数。
仅仅多了 async 和 await 两个关键字,我们就享受了协程的优势。协程让我们不用担心过多的线程导致操作系统调度占用了太多的 CPU 时间,我们的程序可以运行在一个 CPU 上,享受 CPU 的高速缓存,减少 CPU 缓存同步的开销,而且我们的代码非常容易理解,几乎和传统的网络程序没有什么区别。
异步与协程
asyncio 全称 asynchronous I/O ——异步输入输出。相对于同步,异步函数会在调用时不等待 I/O 操作完成,I/O 结果通过回调的方式交给调用者。举个打电话的例子,如果对方忙线,同步函数会一起挂起,直到对方接听了我的电话;异步函数则是留言到语音信箱,通知对方在完成了上一通电话后,再给我回电。
虽然 Python 在 3.5 版本中才引入协程,但其实协程的历史非常久远。最早对协程的书面解说可追溯到 1963 年马尔文 · 爱德华 · 康威( MelvinE.Conway )写的一篇关于如何写 COBOL 语言编译器的文章。这篇文章指出可以基于状态机和协程写出一个高性能的、简单的 COBOL 编译器。康威在文章中第一次引入了协程的概念并在各种语言中发扬光大,而现在常见的编程语言都直接或间接地实现了协程。康威在他的文章里提出了低耦合的编程模式,他要求:
- 模块间的通过传递消息交流。
- 这些消息有一个固定的、单向的传递方向。
- 程序可以按左端是输入,右端是输出的方式布置,所有的消息都是向右移动的。
在以上的条件下,每一个模块就可以作为一个协程去运行。协程是特殊的子例程(函数、过程或方法)。在 Python 里面通过 async def 定义一个协程,协程就可以通过 yield (一般翻译为让步)告诉 CPU 让出自己的执行权,让 CPU 执行其它协程,直到该协程再次被显式要求执行。到下一次激活此协程时,CPU 将从协程上次 yield 返回的位置接着执行。协程在 yield 让出执行权以及被再次给予执行权时还可以输出或输入一个值,每次在 yield 时输出一个值的协程也被称为生成器。协程通过 yield 方式转移执行权,与函数不同,两个协程之间不是调用者和被调用者的关系,而是彼此对称、平等的。协程还可以通过 await 关键字等待另外一个协程的结果。
协程极大地简化了异步程序,它和其它协程交流的时候并不是强制同步的。举个搜索框的例子,我们可以把这个搜索函数作为一个协程运行,把列出最新热词作为另外一个协程运行,这样当我们的搜索函数在等待用户输入的时候,列出热词的协程就可以运行,而不用一定等待用户输入完成,在用户输入事件完成后,搜索函数就可以接受用户输入的关键字,接着运行。协程共用公共的事件循环器( Eventloop),事件循环器来寻找可以响应的 I/O 事件,并激活这些事件对应的协程。所以编写协程程序也会被称为编写事件驱动程序。
协程基础对象
本章我们介绍一下 Eventloop,Future,Promise 和 Generator 的概念。
Eventloop (事件循环)是一个循环查询 I/O 事件,使遇到 I/O 阻塞的代码可以在它需要的时候运行的程序结构,主要的功能是等待和响应 I/O 事件。Python 的 Eventloop 提供了常用的针对网络、文件和管道的事件注册函数,以及执行延时任务的 API 。Python 事件循环主要原理代码如下:
每一个事件的对应一个回调函数,事件循环每一次循环里,找到 IO 已经准备好的事件,在循环体里执行回调函数。事件循环为了找到已经准备好的 I/O 事件,在 Linux 下通常使用系统提供的 epoll API 。
在计算机科学中,Future 和 Promise 是指用于在并发编程语言中同步程序执行的构造。由于网络请求尚未结束,我们需要一个对象来暂时代表函数的执行结果;在网络请求结束后,可以通过这个对象来获取真正想要的结果,于是就有了上述这些构造。Future 从字面上表示的是我们未来在程序中需要获取的值,其本质是异步函数的返回值,通常作为一个值的占位符出现;其目的是让异步程序看起来像是顺序执行的,而不是将处理异步函数返回值的代码写在另一个回调函数里,破坏代码的可读性。在调用一个异步函数时,该函数直接返回一个 Future 对象,但真正的计算过程仍在异步进行中(这个计算的过程便被称为 Promise,因为它会在完结时把结果写入 Future,相当于做了一个在“未来”一定会给出结果的一个“承诺”);此时调用者可以继续执行其他不依赖这个计算结果的任务而无需等待计算的完成。当计算完成时,计算函数将结果设置给 Future 对象,这样调用者就可以使用这个值了。
Future 这个概念在 Python 中有一个具体的对象来描述,它定义了以下接口:
- result(),exception() 获取一个 Future 的值 , 或者异常;
- set_result(value),set_exception(exception) 设置一个 Future 的值,或者一个异常;
- done() 查看一个 Future 是否已经有值;
- cancel(),cancelled() 取消一个 Future,以及查看一个 Future 是否被取消;
- add_done_callback(callback,*,context=None),remove_done_callback(callback) 设置,删除一个 Future 在完成时运行的回调函数。
比如你去餐厅吃饭,发现没有座位,那么此时店员会要求你扫码排队,扫码完成后你的微信里出现了一个小程序告诉你当前排位多少号。所以相当于此时餐厅对你做出了一个“承诺(Promise)”,它承诺你在有位置之后会通知你;而你的这个排队小程序相当于是一个 Future,你只要带着这个小程序就不需要在餐厅门口等位而可以出去逛街了。这个小程序通常会有三个功能,一个是在有位置的时候给你发送通知,相当于给 Future 通过 add_done_callback 设置的回调函数;另一个是允许你查看当前排位是否轮到你了,相当于 Future 中的 done 函数;最后就是取消排队,相当于 cancel 函数。因此实际上 Future 和 Promise 的应用在生活中非常常见。
Generator (生成器)是特殊的一种函数,可以在每一次产生一个值后面可以把控制权交给调用者。Generator 也被称为 semicoroutine (半协程),是一种特殊的、能力更弱的协程—— 它无法在返回一个值的时候指定另外一个协程继续执行。Python 中协程继承了生成器,除了使用 for 循环迭代外,生成器的方法协程都可以调用。Generator 的状态存储和恢复的功能是实现协程的基础,有了生成器就可以在生成器的基础上实现协程。Python 中生成器使用如下:
当一个函数内包含了 yield 语句时,执行它将会固定返回一个生成器对象,而不会真正地执行这个函数,直到调用了这个对象的 send 或 next 方法。
通过生成器的 send 方法来激活一个生成器,每一次调用此函数都会恢复生成器的运行,直到遇到下一个 yield,并把 yield 右边的值返回。生成器结束的时候会抛出一个 StopIteration 错误,表明其已经结束运行。生成器在 yield 右边返回一个值,它还可以在 yield 左边接收一个值,举个例子:
这是一个简单的求和生成器。在 g = sum_gen() 这一句,我们构造了一个生成器,此时生成器内部的代码还没有运行,在 g.send(None) 这一句,生成器开始被激活,此时生成器会运行到第一个 yield 的右边,即 yield count 。这里可以将 yield count 看成一个表达式,它将会在生成器被激活时返回一个由外部的 send 方法传入的值;但是这个表达式本身的作用是将 count 变量传出外部。第一次执行生成器的 send 方法时传入的 None 并不会体现在任何地方,因为在初始状态时程序并未执行到 yield 那一行,因此传入的这个值会被直接丢弃。然后在下面的循环里,每一个用 send 方法激活生成器的时候,都会造成类似 yield count 语句返回了 send 方法传入值的效果,然后由左边的 total_value 接收该值。生成器还有一个注意的点是,第一次激活生成器的时候 send 的参数必须为 None,这是因为生成器在第一次被激活的时候不是从 yield 左边开始的,没有语句去接收一个值,这个时候,用 None 来表示空值。上面的程序运行的时候会打印如下的结果:
生成器将一般的顺序执行的代码变成了分段执行,改变了正常程序的执行流程,我们称能实现这种转换的机制为“控制流”。下面我们来看看在 Python 中控制流是如何实现的。
控制流的实现
前文已经介绍过协程的使用方法和效果,在 Python 中这一切是由事件循环( EventLoop )的方式实现的。
事件循环是一种常见的编程模式,被广泛运用于各种程序中。例如几乎所有的图形化界面都是以事件循环的模式来编写的,还有比如 NodeJS 直接把事件循环机制做进了语言本身的特性中。通俗地理解,所谓“事件循环”就是一个循环,它不断地接收事件并且处理事件,事件与事件之间从事件循环的角度看并没有相关性。就好比流水线上的一台机器,一头进入原料(事件),另一头出来成品(处理结果),从这台机器(机器本身对原料的加工过程就是事件触发的一个任务)的角度看,原料 1 与原料 2 之间没有直接联系,成品 1 与成品 2 之间也没有直接联系。但是原料 1 和原料 2 可能是由前一个步骤产生的,有一定的相关性;而成品 1 和成品 2 也可能会被后面的步骤共同使用。因此并不是说事件之间绝对独立,只是从事件循环这台机器的角度看是独立的。
在 Python 中,事件循环通过 EventLoop 对象、Task 对象和 asyncio 库共同完成。当 EventLoop 收到一个事件时,便会创建一个 Task 对象来抽象化这个事件所触发的处理过程(即上面例子里机器对原料的加工任务),在 Task 执行的过程中如果遇到了 I/O 操作、yield 语句或者 await 语句,那么代表这个 Task 将会暂时离开 EventLoop ;而之后再次收到执行(例如调用了生成器的 send 方法,或者 I/O 数据返回了)的信号时,Task 又会再次被丢回 EventLoop 继续执行。这样做的好处是 EventLoop 在 Task 执行过程中遇到等待的状况时可以继续执行其他的任务,高效地利用了 CPU 宝贵的执行时间。
下面我们举例说明一下一个 Task 完整的控制流是什么样子的:
上述代码的执行流程是:
程序从函数 print_sum 开始执行,执行到 await asyncio.sleep(1.0) 的时候遇到了阻塞,此时协程让出执行权,EventLoop 接管程序。EventLoop 等待一秒后,sleep 完成,EventLoop 激活了 Task,Task 从 sleep 后面开始执行,compute 返回了 x+y 的结果, 其值为 3,因为在 11 行 print_sum 使用了 await 等待了 compute 的结果,所以 print_sum 协程被激活,激活后 result 接收并存储了一个 3,接着下一行打印一个结果,print_sum 协程结束,此时整个 task 完成,Task 通知 EventLoop 自己已完成,EventLoop 察觉所有的 task 已完成,EventLoop 自己退出, 整个程序完成。
await 协程原理
一个协程可以在自己的过程里 await 另外一个协程的返回值,它会在 await 处挂起,等待直到 await 右边的协程结束。await 在 python 里的是通过委派生成器实现的。Python 里可以通过 yield from 关键字把生成器 yield 值的功能委派给另外一个生成器。yield from 的使用如下:
构造的 gen 生成器实例 g (第 11 行)将生成值的功能委派给了 sub 生成器(第 7 行),遇到 yield from 关键字的时候,此时对 gen 调用 send 方法会驱动委派生成器(此例子里是 sub 生成器)的运行,每次 send 调用在此时可以理解为对 sub 生成器调用 send 方法,直到 sub 生成器完成退出。sub 生成器结束后,继续调用 gen 生成器的 send 方法会继续执行 gen 生成器的代码,最终会循环打印出 1,2,1 序列。
yield from 的使用不需要任何特殊的调用,对于生成器来说仍然是不停的调用 send 方法来驱动生成器的运行。一个有 yield from 关键字的生成器在构造的时候代码信息会包含委派生成器的代码内容, 在代码运行遇到 yield from 的时候,激活的生成器时会进入到委派生成器,运行委派生成器的代码,委派生成器和生成器使用同样的 send 代码激活。yield from 功能和 await 功能如出一辙,await 其实就是协程版的 yield from 。await 一个协程的时候,协程的 send 方法调用就进入 await 右边的的协程,开始执行右边协程的 send 方法。在 await 右边协程结束的时候,右边协程的返回值会传递给 await 左边的语句。
Task 对象
Task 对象是管理协程所抽象的对象,是协程运行的基本单位,可以理解为轻量级线程,它的构造函数比较简单,接受一个协程参数,复制当前的上下文环境,构造函数如下:
Task 接收一个主协程为参数,Task 从主协程开始运行, 主协程退出的时候,Task 就完成了。Task 生成的时候和 EventLoop 绑定,在第 6 行,使用 loop 的 call_soon 方法注册了一下事件, 在 EventLoop 下一次循环的时候,step 方法会被立即调用,以后每一次 Task 被 EventLoop 唤醒的时候,Task 的 step 方法就会被调用,step 方法的主要逻辑如下:
第一步是通告此时运行的 Task 是我自己(第 5 行),然后激活协程的运行,在第 9 行里使用了 coro.send(None) 方法来激活协程的运行。前面我们讲到了生成器,生成器是半协程,在 CPython 里面,协程继承了生成器的所有功能,所以这里调用的 send 方法就是生成器的 send 。协程激活运行完后,如果抛出了 StopIteration 异常, 表明协程以运行完毕,在第 15 行设置了自己的返回值。否则,在 20 行又注册了一遍自己,等待下一个事件循环继续运行。最后通告此 task 在此次事件循环需要运行的过程已结束, 整个函数退出。
总结
至此,我们已经了解了 asyncio 作为一个库做了什么以及是如何做的,总结如下:
1.控制流的暂停与恢复,这是 Python 内部基于生成器相关的功能实现的,通过 send 方法激活生成器来恢复控制流,通过 yield 关键字来暂停控制流,让出执行权。
2.协程链,一个协程可以等待另外一个协程的运行结束并取得它的返回值。
3.事件循环,可以轮询 I/O 事件,并且运行它们注册的回调函数,驱使所有的协程可以在必要的时候运行。
引用
[1].GPU Accelerated Computing with Python
[2].RQData 介绍
[3].PEP 492 -- Coroutines with async and await syntax
[4].Conway M E. Design of a separable transition-diagram compiler[J]. Communications of the ACM, 1963, 6(7): 396-408.
每天超过 300 家金融机构通过 RQData API 服务获取各种标的金融数据,在协程的帮助下,我们通过一种简单且高效的方式完成了此任务。
欢迎联系米筐量化王老师微信 RicequantCS 沟通分享更多金融技术干货,也欢迎大家进行 RQData 产品试用。