Python实现协程的四种方式

协程


协程不是计算机提供的,是人为创造的上下文切换技术,也可以被称为微线程。简而言之 其实就是在一个线程中实现代码块相互切换执行
我们知道正常代码是从上到下依次执行,一个方法或函数操作完毕后才会进入下一个方法或函数执行。例如:

def func1():
	print(1)
	print(2)
	
def func2():
	print(3)
	print(4)

func1()
func2()

此时代码执行逻辑一定是先执行完func1()对象里的语句再执行func2() ,这种称为同步。但是如果我们想在func1()对象中print(1)后切换到func2()该怎么做呢?


可以采用以下几种基于协程的方式:

  • greenlet。
  • yield 关键字
  • asyncio 装饰器(py3.4之后引入)
  • async、await关键字(py3.5之后引入)【推荐】

1. greenlet实现协程


# greenlet是第三方模块需先引入
pip3 install greenlet
# -*- coding: utf-8 -*-
# author: micher.yu
# Time:2022/01/08
# simple_desc :
from greenlet import greenlet


def func1():
	print(1)  # 第二步:输出1
	gr2.switch()  # 第三步:切换到 func2 函数
	print(2)  # 第六步:输出2
	gr2.switch()  # 第七步:切换到func2 函数(如果不切换的话句柄会继续往下执行,也就不会进入func2 输出4)


def func2():
	print(3)  # 第四步:输出3
	gr1.switch()  # 第五步:切换到func1 函数
	print(4)  # 第八步:输出4,func2函数 执行完毕句柄继续往下执行


def func3():
	print(5)  # 第十步:输出5


gr1 = greenlet(func1)  # 此处只是生成greenlet包装的func1对象,代码并不会实际运行
gr2 = greenlet(func2)  # 此处生成greenlet包装的func2对象

gr1.switch()  # 第一步:此处是正式执行func1()对象
func3()  # 第九步:实例化func3

# 所以实际输出会是 1 3 2 4 5

2. yield关键字


不推荐,实际应用场景比较少。

如果对yield关键字还不太熟悉的话可以参考往期这篇文章详解python三大器——迭代器、生成器、装饰器其中生成器部分有详细讲解

def func1():
	yield 1
	yield from func2()  # 这里其实相当于for item in func2(): yield item 
	yield 2


def func2():
	yield 3
	yield 4


for item in func1():
	print(item)

# 输出结果将会是:1 3 4 2

3. asyncio 模块


在python3.4及之后的版本才可使用,这个框架使用事件循环来编排回调和异步任务。事件循环位于事件循环策略的上下文中。
下图是协程,事件循环和策略之间的相互作用
Python实现协程的四种方式
注意:asyncio牛逼在于遇到IO阻塞自动切换!

下面我们使用@asyncio.coroutine 装饰器(py3.10+会移除)定义了两个协程函数。(基于生成器的协程

import asyncio


@asyncio.coroutine
def func1():
	print(1)
	# 此处用asyncio.sleep(2)来模拟IO耗时(asyncio.sleep也是一个协程对象,不能用time.sleep()),asyncio定义的协程函数遇到IO操作时会自动切换到事件循环中的其他任务
	yield from asyncio.sleep(2)  
	print(2)


@asyncio.coroutine
def func2():
	print(3)
	yield from asyncio.sleep(2)
	print(4)

PS:如果py版本高于3.8依然可以使用asyncio.coroutine装饰器但是会有告警建议你使用async & await关键字来定义协程函数,不会影响使用!
Python实现协程的四种方式
协程函数并不能像普通函数一样直接实例化运行,调用协程函数协程并不会开始运行,只是返回一个协程对象。

fun1() # 此处是不会有结果的

可以通过 asyncio.iscoroutine 来验证是否是协程对象

print(asyncio.iscoroutine(func1()))  # True

协程对象必须在事件循环中运行,我们可以通过asyncio.get_event_loop方法来获取当前正在运行的循环实例。如loop对象,然后把协程对象交给 loop.run_until_complete,协程对象随后会在 loop 里得到运行。

loop = asyncio.get_event_loop()
loop.run_until_complete(func1())
# 运行结果为:
# 1
# 等待2s
# 2

run_until_complete 是一个阻塞(blocking)调用,直到协程运行结束,它才返回;所以他必须接受的是一个可等待对象协程, 任务future对象)。
run_until_complete 的参数是一个 future,但是我们这里传给它的却是协程对象,之所以能这样,是因为它在内部做了检查
Python实现协程的四种方式
要让这个协程对象转成future的话,可以通过 asyncio.ensure_future 方法。
Python实现协程的四种方式
所以,我们可以写得更明显一些:

loop = asyncio.get_event_loop()
loop.run_until_complete(asnycio.ensure_future(func1()))
# 运行结果为:
# 1
# 等待2s
# 2

在有多个协程函数需要运行时怎么办?

  1. 我们可以将协程对象包装成future对象后再放到一个列表中再通过asyncio.wait运行。

    asyncio.wait方法await关键字只能传可等待 对象

    tasks = [
    	asyncio.ensure_future(func1()),  # 把协程对象包转成一个 future 对象
    	asyncio.ensure_future(func2())
    ]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    # 运行结果为:
    # 1
    # 3
    # 等待2s
    # 2
    # 4
    
  2. 通过asyncio.gather可以直接将协程对象放到列表中(必须解包!也就是*[]):

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*[func1(), func2()]))
    # 运行结果为:
    # 1
    # 3
    # 等待2s
    # 2
    # 4
    

完整代码为:

import asyncio


@asyncio.coroutine
def func1():
	print(1)
	yield from asyncio.sleep(2)  # 此处用asyncio.sleep(2)来模拟IO耗时(asyncio.sleep也是一个协程对象,不能用time.sleep()),自动切换到tasks中的其他任务
	print(2)


@asyncio.coroutine
def func2():
	print(3)
	yield from asyncio.sleep(2)  # 此处又遇到IO阻塞后,又会自动切换到tasks中其他的任务
	print(4)


func1()  # 调用协程函数,协程并不会开始运行,只是返回一个协程对象。可以通过 asyncio.iscoroutine 来验证是否是协程对象
print(asyncio.iscoroutine(func1()))  # True

tasks = [
	asyncio.ensure_future(func1()),  # 把协程对象包转成一个 future 对象
	asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
# 方式一:
loop.run_until_complete(asyncio.wait(tasks))
# 方式二:
loop.run_until_complete(asyncio.gather(*[func1(), func2()]))

同样我们也可以执行Task对象

Task 对象的作用是在运行某个任务的同时可以并发的运行其他任务。

Task 对象可以使用 asyncio.create_task() 函数创建,也可以使用 loop.create_task()

  • 取消 Task 对象 cancel()
  • Task 任务是否被取消 cancelled()
  • Task 对象是否完成 done()
  • 返回结果 result()
    1.Task 对象被完成,则返回结果
    2.Task 对象被取消,则引发 CancelledError 异常
    3.Task 对象的结果不可用,则引发 InvalidStateError 异常
  • 添加回调,任务完成时触发 add_done_callback(task)
  • 所有任务列表 asyncio.all_tasks()
  • 返回当前任务 asyncio.current_task()

使用 loop 对象的 create_task 函数创建一个 Task 对象,在第一次打印 Task 对象时,状态为 pending,完成执行函数后的状态为 finished

import asyncio



async def do_something():
	print("这是一个Task例子....")
	# 模拟阻塞1秒
	await asyncio.sleep(1)
	return "Task任务完成"


# 创建一个事件event_loop
loop = asyncio.get_event_loop()

# 创建一个task
task = loop.create_task(do_something())
# 第一次打印task
print(task)

# 将task加入到event_loop中
loop.run_until_complete(task)
# 再次打印task
print(task)
print(task.result())
""" 运行结果
#1 <Task pending name='Task-1' coro=<do_something() running at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97>>
#2 这是一个Task例子....
#3 <Task finished name='Task-1' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task任务完成'>
#4 Task任务完成
"""

Task 对象的 result() 函数可以获取 do_something() 函数的返回值。

Task 任务回调

import asyncio



async def do_something(task_id):
	print(f"这是一个Task例子,当前task_id:{task_id}")
	# 模拟阻塞1秒
	await asyncio.sleep(1)
	return f"Task-id {task_id} 任务完成"


# 任务完成后的回调函数
def callback(task):
	# 打印参数
	print(task)
	# 打印返回的结果
	print(task.result())


# 创建一个事件event_loop
loop = asyncio.get_event_loop()

# 创建一个task
tasks = []
for i in range(5):
	name = f"task-{i}"
	task = loop.create_task(do_something(name), name=name)
	task.add_done_callback(callback)
	tasks.append(task)

# 将task加入到event_loop中
loop.run_until_complete(asyncio.wait(tasks))
""" 输出为:
这是一个Task例子,当前task_id:task-0
这是一个Task例子,当前task_id:task-1
这是一个Task例子,当前task_id:task-2
这是一个Task例子,当前task_id:task-3
这是一个Task例子,当前task_id:task-4
<Task finished name='task-0' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-0 任务完成'>
Task-id task-0 任务完成
<Task finished name='task-1' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-1 任务完成'>
Task-id task-1 任务完成
<Task finished name='task-2' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-2 任务完成'>
Task-id task-2 任务完成
<Task finished name='task-3' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-3 任务完成'>
Task-id task-3 任务完成
<Task finished name='task-4' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-4 任务完成'>
Task-id task-4 任务完成
"""

使用 asyncio.wait() 函数将 Task 任务列表添加到 event_loop 中,也可以使用 asyncio.gather() 函数。

多个任务执行结束后再回调

import asyncio
import functools



async def do_something(t):
	print("暂停" + str(t) + "秒")
	await asyncio.sleep(t)
	return "暂停了" + str(t) + "秒"


def callback(event_loop, gatheringFuture):
	print(gatheringFuture.result())
	print("多个Task任务完成后的回调")


loop = asyncio.get_event_loop()
gather = asyncio.gather(do_something(1), do_something(3))
gather.add_done_callback(functools.partial(callback, loop))

loop.run_until_complete(gather)
""" 输出为:
暂停1秒
暂停3秒
['暂停了1秒', '暂停了3秒']
多个Task任务完成后的回调
"""

4. async & await 关键字【推荐

上一篇:发送短信按钮倒计时


下一篇:FastAPI(41)- Background Task 后台任务