作者 | 墨辨
概念篇
在理解协程这个概念及其作用场景前,先要了解几个基本的关于操作系统的概念,主要是进程、线程、同步、异步、阻塞、非阻塞,了解这几个概念,不仅是对协程这个场景,诸如消息队列、缓存等,都有一定的帮助。接下来,编者就自己的理解和网上查询的材料,做一个总结。
进程
在面试的时候,我们都会记住一个概念,进程是系统资源分配的最小单位。是的,系统由一个个程序,也就是进程组成的,一般情况下,分为文本区域、数据区域和堆栈区域。
文本区域存储处理器执行的代码(机器码),通常来说,这是一个只读区域,防止运行的程序被意外修改。
数据区域存储所有的变量和动态分配的内存,又细分为初始化的数据区(所有初始化的全局、静态、常量,以及外部变量)和为初始化的数据区(初始化为0的全局变量和静态变量),初始化的变量最初保存在文本区,程序启动后被拷贝到初始化的数据区。
堆栈区域存储着活动过程调用的指令和本地变量,在地址空间里,栈区紧连着堆区,他们的增长方向相反,内存是线性的,所以我们代码放在低地址的地方,由低向高增长,栈区大小不可预测,随开随用,因此放在高地址的地方,由高向低增长。当堆和栈指针重合的时候,意味着内存耗尽,造成内存溢出。
进程的创建和销毁都是相对于系统资源,非常消耗资源,是一种比较昂贵的操作。进程为了自身能得到运行,必须要抢占式的争夺CPU。对于单核CPU来说,在同一时间只能执行一个进程的代码,所以在单核CPU上实现多进程,是通过CPU快速的切换不同进程,看上去就像是多个进程在同时进行。
由于进程间是隔离的,各自拥有自己的内存内存资源,相比于线程的共同共享内存来说,相对安全,不同进程之间的数据只能通过 IPC(Inter-Process Communication) 进行通信共享。
线程
线程是CPU调度的最小单位。如果进程是一个容器,线程就是运行在容器里面的程序,线程是属于进程的,同个进程的多个线程共享进程的内存地址空间。
线程间的通信可以直接通过全局变量进行通信,所以相对来说,线程间通信是不太安全的,因此引入了各种锁的场景,不在这里阐述。
当一个线程崩溃了,会导致整个进程也崩溃了,即其他线程也挂了, 但多进程而不会,一个进程挂了,另一个进程依然照样运行。
在多核操作系统中,默认进程内只有一个线程,所以对多进程的处理就像是一个进程一个核心。
同步和异步
同步和异步关注的是消息通信机制,所谓同步,就是在发出一个函数调用时,在没有得到结果之前,该调用不会返回。一旦调用返回,就立即得到执行的返回值,即调用者主动等待调用结果。所谓异步,就是在请求发出去后,这个调用就立即返回,没有返回结果,通过回调等方式告知该调用的实际结果。
同步的请求,需要主动读写数据,并且等待结果;异步的请求,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
阻塞和非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。所以,区分的条件在于,进程/线程要访问的数据是否就绪,进程/线程是否需要等待。
非阻塞一般通过多路复用实现,多路复用有 select、poll、epoll几种实现方式。
协程
在了解前面的几个概念后,我们再来看协程的概念。
协程是属于线程的,又称微线程,纤程,英文名Coroutine。举个例子,在执行函数A时,我希望随时中断去执行函数B,然后中断B的执行,切换回来执行A。这就是协程的作用,由调用者*切换。这个切换过程并不是等同于函数调用,因为它没有调用语句。执行方式与多线程类似,但是协程只有一个线程执行。
协程的优点是执行效率非常高,因为协程的切换由程序自身控制,不需要切换线程,即没有切换线程的开销。同时,由于只有一个线程,不存在冲突问题,不需要依赖锁(加锁与释放锁存在很多资源消耗)。
协程主要的使用场景在于处理IO密集型程序,解决效率问题,不适用于CPU密集型程序的处理。然而实际场景中这两种场景非常多,如果要充分发挥CPU利用率,可以结合多进程+协程的方式。后续我们会讲到结合点。
原理篇
根据wikipedia的定义,协程是一个无优先级的子程序调度组件,允许子程序在特点的地方挂起恢复。所以理论上,只要内存足够,一个线程中可以有任意多个协程,但同一时刻只能有一个协程在运行,多个协程分享该线程分配到的计算机资源。协程是为了充分发挥异步调用的优势,异步操作则是为了避免IO操作阻塞线程。
知识准备
在了解原理前,我们先做一个知识的准备工作。
1)现代主流的操作系统几乎都是分时操作系统,即一台计算机采用时间片轮转的方式为多个用户服务,系统资源分配的基本单位是进程,CPU调度的基本单位是线程。
2)运行时内存空间分为变量区,栈区,堆区。内存地址分配上,堆区从低地到高,栈区从高往低。
3)计算机执行时一条条指令读取执行,执行到当前指令时,下一条指令的地址在指令寄存器的IP中,ESP寄存值指向当前栈顶地址,EBP指向当前活动栈帧的基地址。
4)系统发生函数调用时操作为:先将入参从右往左依次压栈,然后把返回地址压栈,最后将当前EBP寄存器的值压栈,修改ESP寄存器的值,在栈区分配当前函数局部变量所需的空间。
5)协程的上下文包含属于当前协程的栈区和寄存器里面存放的值。
**事件循环
**
在python3.3中,通过关键字yield from使用协程,在3.5中,引入了关于协程的语法糖async和await,我们主要看async/await的原理解析。其中,事件循环是一个核心所在,编写过 js的同学,会对事件循环Eventloop更加了解, 事件循环是一种等待程序分配事件或消息的编程架构(*)。在python中,asyncio.coroutine 修饰器用来标记作为协程的函数, 这里的协程是和asyncio及其事件循环一起使用的,而在后续的发展中,async/await被使用的越来越广泛。
async/await
async/await是使用python协程的关键,从结构上来看,asyncio 实质上是一个异步框架,async/await 是为异步框架提供的 API已方便使用者调用,所以使用者要想使用async/await 编写协程代码,目前必须机遇 asyncio 或其他异步库。
Future
在实际开发编写异步代码时,为了避免太多的回调方法导致的回调地狱,但又需要获取异步调用的返回结果结果,聪明的语言设计者设计了一个 叫Future的对象,封装了与loop 的交互行为。其大致执行过程为:程序启动后,通过add_done_callback 方法向 epoll 注册回调函数,当 result 属性得到返回值后,主动运行之前注册的回调函数,向上传递给 coroutine。这个Future对象为asyncio.Future。
但是,要想取得返回值,程序必须恢复恢复工作状态,而由于Future 对象本身的生存周期比较短,每一次注册回调、产生事件、触发回调过程后工作可能已经完成,所以用 Future 向生成器 send result 并不合适。所以这里又引入一个新的对象 Task,保存在Future 对象中,对生成器协程进行状态管理。
Python 里另一个 Future 对象是 concurrent.futures.Future,与 asyncio.Future 互不兼容,容易产生混淆。区别点在于,concurrent.futures 是线程级的 Future 对象,当使用 concurrent.futures.Executor 进行多线程编程时,该对象用于在不同的 thread 之间传递结果。
Task
上文中提到,Task是维护生成器协程状态处理执行逻辑的的任务对象,Task 中有一个_step 方法,负责生成器协程与 EventLoop 交互过程的状态迁移,整个过程可以理解为:Task向协程 send 一个值,恢复其工作状态。当协程运行到断点后,得到新的Future对象,再处理 future 与 loop 的回调注册过程。
Loop
在日常开发中,会有一个误区,认为每个线程都可以有一个独立的 loop。实际运行时,主线程才能通过 asyncio.get_event_loop() 创建一个新的 loop,而在其他线程时,使用 get_event_loop() 却会抛错。正确的做法为通过 asyncio.set_event_loop() ,将当前线程与 主线程的loop 显式绑定。
Loop有一个很大的缺陷,就是 loop 的运行状态不受 Python 代码控制,所以在业务处理中,无法稳定的将协程拓展到多线程中运行。
总结
实战篇
介绍完概念和原理,我来看看如何使用,这里,举一个实际场景的例子,来看看如何使用python的协程。
场景
外部接收一些文件,每个文件里有一组数据,其中,这组数据需要通过http的方式,发向第三方平台,并获得结果。
分析
由于同一个文件的每一组数据没有前后的处理逻辑,在之前通过Requests库发送的网络请求,串行执行,下一组数据的发送需要等待上一组数据的返回,显得整个文件的处理时间长,这种请求方式,完全可以由协程来实现。
为了更方便的配合协程发请求,我们使用aiohttp库来代替requests库,关于aiohttp,这里不做过多剖析,仅做下简单介绍。
aiohttp
aiohttp是asyncio和Python的异步HTTP客户端/服务器,由于是异步的,经常用在服务区端接收请求,和客户端爬虫应用,发起异步请求,这里我们主要用来发请求。
aiohttp支持客户端和HTTP服务器,可以实现单线程并发IO操作,无需使用Callback Hell即可支持Server WebSockets和Client WebSockets,且具有中间件。
代码实现
直接上代码了,talk is cheap, show me the code~
import aiohttp
import asyncio
from inspect import isfunction
import time
import logger
@logging_utils.exception(logger)
def request(pool, data_list):
loop = asyncio.get_event_loop()
loop.run_until_complete(exec(pool, data_list))
async def exec(pool, data_list):
tasks = []
sem = asyncio.Semaphore(pool)
for item in data_list:
tasks.append(
control_sem(sem,
item.get("method", "GET"),
item.get("url"),
item.get("data"),
item.get("headers"),
item.get("callback")))
await asyncio.wait(tasks)
async def control_sem(sem, method, url, data, headers, callback):
async with sem:
count = 0
flag = False
while not flag and count < 4:
flag = await fetch(method, url, data, headers, callback)
count = count + 1
print("flag:{},count:{}".format(flag, count))
if count == 4 and not flag:
raise Exception('EAS service not responding after 4 times of retry.')
async def fetch(method, url, data, headers, callback):
async with aiohttp.request(method, url=url, data=data, headers=headers) as resp:
try:
json = await resp.read()
print(json)
if resp.status != 200:
return False
if isfunction(callback):
callback(json)
return True
except Exception as e:
print(e)
这里,我们封装了对外发送批量请求的request方法,接收一次性发送的数据多少,和数据综合,在外部使用时,只需要构建好网络请求对象的数据,设定好请求池大小即可,同时,设置了重试功能,进行了4次重试,防止在网络抖动的时候,单个数据的网络请求发送失败。
最终效果
在使用协程重构网络请求模块之后,当数据量在1000的时候,由之前的816s,提升到424s,快了一倍,且请求池大小加大的时候,效果更明显,由于第三方平台同时建立连接的数据限制,我们设定了40的阀值。可以看到,优化的程度很显著。
编者说
人生苦短,我用python。协程好不好,谁用谁知道。如果有类似的场景,可以考虑启用,或者其他场景,欢迎留言讨论。
参考资料:
理解async/await:
https://segmentfault.com/a/1190000015488033?spm=ata.13261165.0.0.57d41b119Uyp8t
协程概念,原理(c++和node.js实现)
https://cnodejs.org/topic/58ddd7a303d476b42d34c911?spm=ata.13261165.0.0.57d41b119Uyp8t