python网络-多任务实现之协程

一、协程

协程,又称微线程,纤程。英文名Coroutine。

协程不是进程,也不是线程,它就是一个函数,一个特殊的函数——可以在某个地方挂起,并且可以重新在挂起处继续运行。所以说,协程与进程、线程相比,不是一个维度的概念。

一个进程可以包含多个线程,一个线程也可以包含多个协程,也就是说,一个线程内可以有多个那样的特殊函数在运行。但是有一点,必须明确,一个线程内的多个协程的运行是串行的。如果有多核CPU的话,多个进程或一个进程内的多个线程是可以并行运行的,但是一个线程内的多个协程却绝对串行的,无论有多少个CPU(核)。这个比较好理解,毕竟协程虽然是一个特殊的函数,但仍然是一个函数。一个线程内可以运行多个函数,但是这些函数都是串行运行的。当一个协程运行时,其他协程必须挂起。

通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定

二、yield实现协程

 1 import time
 2 
 3 def A():
 4     while True:
 5         print("----A---")
 6         yield
 7         time.sleep(0.3)
 8 
 9 def B(c):
10     while True:
11         print("----B---")
12         next(c)
13         time.sleep(0.3)
14 
15 if __name__=='__main__':
16     a = A()
17     B(a)

执行结果

----B---
----A---
----B---
----A---
----B---
----A---
----B---
----A---
----B---
----A---
省略。。。

代码说明:

第17行:调用函数B,并把a传递进去。执行打印B的代码,代码执行到next(c)时,会调用函数A,执行打印A的代码,当代码实行带第6行遇到yield的实行,该协程进入等待状态,回到原来next(c)处继续执行,从而实现多协程的切换,通过yield关键字。

 

三、greenlet

1、greenlet实现多任务协程

为了更好使用协程来完成多任务,python中的greenlet模块对其封装,从而使得切换任务变的更加简单,在使用前先要确保greenlet模块安装

使用如下命令安装greenlet模块:

sudo pip install greenlet
#coding = utf-8
from greenlet import greenlet
def test1():
    print("1")
    gr2.switch()
    print("2")

def test2():
    print("3")
    gr1.switch()
    print("4")

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

运行结果:

1
3
2

当创建一个greenlet时,首先初始化一个空的栈, switch到这个栈的时候,会运行在greenlet构造时传入的函数(首先在test1中打印 1), 如果在这个函数(test1)中switch到其他协程(到了test2 打印3),那么该协程会被挂起,等到切换回来(在test1切换回来 打印2)。当这个协程对应函数执行完毕,那么这个协程就变成dead状态。
  

注意 上面没有打印test2的最后一行输出 4,因为在test2中切换到gr1之后挂起,但是没有地方再切换回来。

2、greenlet的模块与类

我们首先看一下greenlet这个module里面的属性

>>> import greenlet
>>> dir(greenlet)
['GREENLET_USE_GC', 'GREENLET_USE_TRACING', 'GreenletExit', '_C_API', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', '__version__', 'error', 'getcurrent', 'gettrace', 'greenlet', 'settrace']

其中,比较重要的是getcurrent(), 类greenlet、异常类GreenletExit。

getcurrent()返回当前的greenlet实例;

GreenletExit:是一个特殊的异常,当触发了这个异常的时候,即使不处理,也不会抛到其parent(后面会提到协程中对返回值或者异常的处理)

然后我们再来看看greenlet.greenlet这个类:

>>>dir(greenlet.greenlet)
['GreenletExit', '__bool__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '_stack_saved', 'dead', 'error', 'getcurrent', 'gettrace', 'gr_frame', 'parent', 'run', 'settrace', 'switch', 'throw']

比较重要的几个属性:

  run:当greenlet启动的时候会调用到这个callable,如果我们需要继承greenlet.greenlet时,需要重写该方法

  switch:前面已经介绍过了,在greenlet之间切换

  parent:可读写属性,后面介绍

  dead:如果greenlet执行结束,那么该属性为true

  throw:切换到指定greenlet后立即跑出异常

文章后面提到的greenlet大多都是指greenlet.greenlet这个class,请注意区别 

对于greenlet,最常用的写法是 x = gr.switch(y)。 这句话的意思是切换到gr,传入参数y。当从其他协程(不一定是这个gr)切换回来的时候,将值付给x。

import greenlet


def test1(x, y):
    z = gr2.switch(x + y)
    print("test1:%s" % z)


def test2(a):
    print('test2:%s' % a)
    gr1.switch(10)


gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
print(gr1.switch("Hello", "World"))

运行结果为:

test2:HelloWorld
test1:10
None

上面的例子,第10行从main greenlet切换到了gr1,test1第3行切换到了gs2,然后gr1挂起,第7行从gr2切回gr1时,将值(10)返回值给了 z。 

3、greenlet生命周期

 文章开始的地方提到第一个例子中的gr2其实并没有正常结束,我们可以借用greenlet.dead这个属性来查看

运行结果为:

 1 import greenlet
 2 
 3 
 4 def test1():
 5     gr2.switch(1)
 6     print("test1: finished")
 7 
 8 
 9 def test2(x):
10     print("test2:first %s" % x)
11     gr1.switch()
12     print("test2:back")
13 
14 gr1 = greenlet.greenlet(test1)
15 gr2 = greenlet.greenlet(test2)
16 gr1.switch()
17 print("gr1 is dead? : %s, gr2 is dead? :%s" % (gr1.dead, gr2.dead))
18 gr2.switch()
19 print("gr1 is dead? : %s, gr2 is dead? :%s" % (gr1.dead, gr2.dead))

运行结果为:

test2:first 1
test1: finished
gr1 is dead? : True, gr2 is dead? :False
test2:back
gr1 is dead? : True, gr2 is dead? :True

只有当协程对应的函数执行完毕,协程才会die,所以第一次Check的时候gr2并没有die,因为第12行切换出去了就没切回来。在main中再switch到gr2的时候, 执行后面的逻辑,gr2 die

4、greenlet注意事项

使用greenlet需要注意一下三点:

  第一:greenlet创生之后,一定要结束,不能switch出去就不回来了,否则容易造成内存泄露

  第二:python中每个线程都有自己的main greenlet及其对应的sub-greenlet ,不能线程之间的greenlet是不能相互切换的

  第三:不能存在循环引用,这个是官方文档明确说明

 1 from greenlet import greenlet, GreenletExit
 2 huge = []
 3 def show_leak():
 4     def test1():
 5         gr2.switch()
 6 
 7     def test2():
 8         huge.extend([x* x for x in range(100)])
 9         gr1.switch()
10         print 'finish switch del huge'
11         del huge[:]
12     
13     gr1 = greenlet(test1)
14     gr2 = greenlet(test2)
15     gr1.switch()
16     gr1 = gr2 = None
17     print 'length of huge is zero ? %s' % len(huge)
18 
19 if __name__ == '__main__':
20     show_leak() 

在test2函数中 第11行,我们将huge清空,然后再第16行将gr1、gr2的引用计数降到了0。但运行结果告诉我们,第11行并没有执行,所以如果一个协程没有正常结束是很危险的,往往不符合程序员的预期。greenlet提供了解决这个问题的办法,官网文档提到:如果一个greenlet实例的引用计数变成0,那么会在上次挂起的地方抛出GreenletExit异常,这就使得我们可以通过try ... finally 处理资源泄露的情况。如下面的代码: 

1 from greenlet import greenlet, GreenletExit
 2 huge = []
 3 def show_leak():
 4     def test1():
 5         gr2.switch()
 6 
 7     def test2():
 8         huge.extend([x* x for x in range(100)])
 9         try:
10             gr1.switch()
11         finally:
12             print 'finish switch del huge'
13             del huge[:]
14     
15     gr1 = greenlet(test1)
16     gr2 = greenlet(test2)
17     gr1.switch()
18     gr1 = gr2 = None
19     print 'length of huge is zero ? %s' % len(huge)
20 
21 if __name__ == '__main__':
22     show_leak()

上述代码的switch流程:main greenlet --> gr1 --> gr2 --> gr1 --> main greenlet, 很明显gr2没有正常结束(在第10行刮起了)。第18行之后gr1,gr2的引用计数都变成0,那么会在第10行抛出GreenletExit异常,因此finally语句有机会执行。同时,在文章开始介绍Greenlet module的时候也提到了,GreenletExit这个异常并不会抛出到parent,所以main greenlet也不会出异常。

四、gevent

greenlet已经实现了协程,但是这个还的人工切换,是不是觉得太麻烦了,不要捉急,python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent

其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。

由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO

import gevent


def f():
    for i in range(5):
        print("%s:%d"%(gevent.getcurrent(),i))


g1 = gevent.spawn(f)
g2 = gevent.spawn(f)
g3 = gevent.spawn(f)
g1.join()
g2.join()
g3.join()

运行结果为:

<Greenlet at 0x1ba533f9598: f(5)>:0
<Greenlet at 0x1ba533f9598: f(5)>:1
<Greenlet at 0x1ba533f9598: f(5)>:2
<Greenlet at 0x1ba533f9598: f(5)>:3
<Greenlet at 0x1ba533f9598: f(5)>:4
<Greenlet at 0x1ba533f97b8: f(5)>:0
<Greenlet at 0x1ba533f97b8: f(5)>:1
<Greenlet at 0x1ba533f97b8: f(5)>:2
<Greenlet at 0x1ba533f97b8: f(5)>:3
<Greenlet at 0x1ba533f97b8: f(5)>:4
<Greenlet at 0x1ba533f99d8: f(5)>:0
<Greenlet at 0x1ba533f99d8: f(5)>:1
<Greenlet at 0x1ba533f99d8: f(5)>:2
<Greenlet at 0x1ba533f99d8: f(5)>:3
<Greenlet at 0x1ba533f99d8: f(5)>:4

可以看到,3个greenlet是依次运行而不是交替运行

gevent的切换执行

import gevent


def f():
    for i in range(5):
        print("%s:%d"%(gevent.getcurrent(),i))
        gevent.sleep(0)


g1=gevent.spawn(f)
g2=gevent.spawn(f)
g3=gevent.spawn(f)
g1.join()
g2.join()
g3.join()

执行结果为:

<Greenlet at 0x20a5e719598: f>:0
<Greenlet at 0x20a5e7197b8: f>:0
<Greenlet at 0x20a5e7199d8: f>:0
<Greenlet at 0x20a5e719598: f>:1
<Greenlet at 0x20a5e7197b8: f>:1
<Greenlet at 0x20a5e7199d8: f>:1
<Greenlet at 0x20a5e719598: f>:2
<Greenlet at 0x20a5e7197b8: f>:2
<Greenlet at 0x20a5e7199d8: f>:2
<Greenlet at 0x20a5e719598: f>:3
<Greenlet at 0x20a5e7197b8: f>:3
<Greenlet at 0x20a5e7199d8: f>:3
<Greenlet at 0x20a5e719598: f>:4
<Greenlet at 0x20a5e7197b8: f>:4
<Greenlet at 0x20a5e7199d8: f>:4

3个greenlet交替运行

gevent.spawn 启动协程,参数为函数名称,参数名称

3、gevent并发下载器

monkey可以使一些阻塞的模块变得不阻塞,机制:遇到IO操作则自动切换,手动切换可以用gevent.sleep(0)

from gevent import monkey
import gevent
import urllib.request


#有I/O时需要这一句,如果没有这句话就会有阻塞状态,加上就没有阻塞
monkey.patch_all()


def myDownLoad(url):
    print("GET:%s"%url)
    resp = urllib.request.urlopen(url)
    data = resp.read()
    print("%d bytes received from %s"%(len(data),url))


gevent.joinall((
    gevent.spawn(myDownLoad,"http://www.baidu.com/"),
    gevent.spawn(myDownLoad,"https://apple.com"),
    gevent.spawn(myDownLoad,"https://www.cnblogs.com/Se7eN-HOU/")
))

运行结果为:

GET:http://www.baidu.com/
GET:https://apple.com
GET:https://www.cnblogs.com/Se7eN-HOU/
153390 bytes received from http://www.baidu.com/
18880 bytes received from https://www.cnblogs.com/Se7eN-HOU/
58865 bytes received from https://apple.com

从上能够看到是先发送的获取baidu的相关信息,然后依次是apple,cnblogs但是收到数据的先后顺序不一定与发送顺序相同,这也就体现出了异步,即不确定什么时候会收到数据,顺序不一定.

上面如果没有下面这句代码,

#有I/O时需要这一句,如果没有这句话就会有阻塞状态,加上就没有阻塞
monkey.patch_all()

执行结果如下

GET:http://www.baidu.com/
153378 bytes received from http://www.baidu.com/
GET:https://apple.com
58865 bytes received from https://apple.com
GET:https://www.cnblogs.com/Se7eN-HOU/
18880 bytes received from https://www.cnblogs.com/Se7eN-HOU/

每请求一个网站就会等着请求完毕了在执行第二个,在请求的过程中,网速慢等待的状态就是在阻塞。

五、asyncio

我们都知道,现在的服务器开发对于IO调度的优先级控制权已经不再依靠系统,都希望采用协程的方式实现高效的并发任务,如js、lua等在异步协程方面都做的很强大。

Python在3.4版本也加入了协程的概念,并在3.5确定了基本完善的语法和实现方式。同时3.6也对其进行了如解除了await和yield在同一个函数体限制等相关的优化。

event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
async/await 关键字:python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

1、创建协程

首先定义一个协程,在def前加入async声明,就可以定义一个协程函数。

一个协程函数不能直接调用运行,只能把协程加入到事件循环loop中。asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。

例如:

import asyncio


async def fun():
    print("---协程中---")

def main():
    print("---主线程中---")

    loop = asyncio.get_event_loop()
    loop.run_until_complete(fun())

if __name__ == "__main__":
    main()

运行结果:

---主线程中---
---协程中---

二、任务对象task

协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果。

例如:

import asyncio


async def fun():
    print("---协程中---")
    return "Se7eN_HOU"

def main():
    print("---主线程中---")

    loop = asyncio.get_event_loop()
    #创建task
    task = loop.create_task(fun())
    print(task)
    loop.run_until_complete(task)
    print(task)

if __name__ == "__main__":
    main()

运行结果为:

---主线程中---
<Task pending coro=<fun() running at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4>>
---协程中---
<Task finished coro=<fun() done, defined at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4> result='Se7eN_HOU'>

创建task后,task在加入事件循环之前是pending状态,因为fun()中没有耗时的阻塞操作,task很快就执行完毕了。后面打印的finished状态。
asyncio.ensure_future 和 loop.create_task都可以创建一个task,run_until_complete的参数是一个futrue对象。

 三、绑定回调

import asyncio

#协程
async def fun():
    print("---协程中---")
    return "Se7eN_HOU"

#协程的回调函数
def callback(future):
    #future.result是协程的返回值
    print("callBack:%s"%future.result())


def main():
    print("---主线程中---")
    #创建loop回路
    loop = asyncio.get_event_loop()
    #创建task
    task = loop.create_task(fun())
    #调用回调函数
    task.add_done_callback(callback)
    print(task)
    loop.run_until_complete(task)
    print(task)

if __name__ == "__main__":
    main()

运行结果为:

---主线程中---
<Task pending coro=<fun() running at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4> cb=[callback() at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:9]>
---协程中---
callBack:Se7eN_HOU
<Task finished coro=<fun() done, defined at C:/Users/Se7eN_HOU/PycharmProjects/PythonLesson/test.py:4> result='Se7eN_HOU'>

也可以使用ensure_future获取返回值

例如:

import asyncio

#协程
async def fun():
    print("---协程中---")
    return "Se7eN_HOU"

#协程的回调函数
#def callback(future):
    #future.result是协程的返回值
    #print("callBack:%s"%future.result())


def main():
    #创建loop回路
    loop = asyncio.get_event_loop()
    #创建task
    #task = loop.create_task(fun())
    #调用回调函数
    #task.add_done_callback(callback)
    task = asyncio.ensure_future(fun())
    loop.run_until_complete(task)
    print("fun函数的返回值是:%s"%format(task.result()))

if __name__ == "__main__":
    main()

运行结果为:

---协程中---
fun函数的返回值是:Se7eN_HOU

四、await阻塞,执行并发

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行。
耗时的操作一般是一些IO操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。

例如:

import asyncio


async def test1():
    print("---1---")
    await asyncio.sleep(5)    
    print("---2---")


async def test2():
    print("---3---")
    await asyncio.sleep(1)
    print("---4---")


async def test3():
    print("---5---")
    await asyncio.sleep(3)
    print("---6---")

def main():
    loop = asyncio.get_event_loop()
    print("begin")

    t1 = test1()
    t2 = test2()
    t3 = test3()
    tasks1 = [t1,t2,t3]


    loop.run_until_complete(asyncio.wait(tasks1))
    print("end")
    loop.close()

if __name__=="__main__":
    main()

运行结果为:

begin
---3---
---1---
---5---
---4---
---6---
---2---
end

 

上一篇:python网络-多任务实现之协程


下一篇:Python几种并发实现方案的性能比较