协程
协程不是计算机提供的,是人为创造的上下文切换技术,也可以被称为微线程。简而言之 其实就是在一个线程中实现代码块相互切换执行。
我们知道正常代码是从上到下依次执行,一个方法或函数操作完毕后才会进入下一个方法或函数执行。例如:
def func1():
print(1)
print(2)
def func2():
print(3)
print(4)
func1()
func2()
此时代码执行逻辑一定是先执行完func1()对象里的语句再执行func2() ,这种称为同步。但是如果我们想在func1()对象中print(1)后切换到func2()该怎么做呢?
可以采用以下几种基于协程的方式:
- greenlet。
- yield 关键字
- asyncio 装饰器(py3.4之后引入)
- async、await关键字(py3.5之后引入)【推荐】
1. greenlet实现协程
# greenlet是第三方模块需先引入
pip3 install greenlet
# -*- coding: utf-8 -*-
# author: micher.yu
# Time:2022/01/08
# simple_desc :
from greenlet import greenlet
def func1():
print(1) # 第二步:输出1
gr2.switch() # 第三步:切换到 func2 函数
print(2) # 第六步:输出2
gr2.switch() # 第七步:切换到func2 函数(如果不切换的话句柄会继续往下执行,也就不会进入func2 输出4)
def func2():
print(3) # 第四步:输出3
gr1.switch() # 第五步:切换到func1 函数
print(4) # 第八步:输出4,func2函数 执行完毕句柄继续往下执行
def func3():
print(5) # 第十步:输出5
gr1 = greenlet(func1) # 此处只是生成greenlet包装的func1对象,代码并不会实际运行
gr2 = greenlet(func2) # 此处生成greenlet包装的func2对象
gr1.switch() # 第一步:此处是正式执行func1()对象
func3() # 第九步:实例化func3
# 所以实际输出会是 1 3 2 4 5
2. yield关键字
不推荐,实际应用场景比较少。
如果对yield关键字还不太熟悉的话可以参考往期这篇文章详解python三大器——迭代器、生成器、装饰器其中生成器部分有详细讲解
def func1():
yield 1
yield from func2() # 这里其实相当于for item in func2(): yield item
yield 2
def func2():
yield 3
yield 4
for item in func1():
print(item)
# 输出结果将会是:1 3 4 2
3. asyncio 模块
在python3.4及之后的版本才可使用,这个框架使用事件循环来编排回调和异步任务。事件循环位于事件循环策略的上下文中。
下图是协程,事件循环和策略之间的相互作用
注意:asyncio
牛逼在于遇到IO阻塞
自动切换!
下面我们使用@asyncio.coroutine
装饰器(py3.10+会移除)定义了两个协程函数。(基于生成器的协程)
import asyncio
@asyncio.coroutine
def func1():
print(1)
# 此处用asyncio.sleep(2)来模拟IO耗时(asyncio.sleep也是一个协程对象,不能用time.sleep()),asyncio定义的协程函数遇到IO操作时会自动切换到事件循环中的其他任务
yield from asyncio.sleep(2)
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2)
print(4)
PS:如果py版本高于3.8依然可以使用asyncio.coroutine
装饰器但是会有告警建议你使用async & await
关键字来定义协程函数,不会影响使用!
协程函数并不能像普通函数一样直接实例化运行,调用协程函数协程并不会开始运行,只是返回一个协程对象。
fun1() # 此处是不会有结果的
可以通过 asyncio.iscoroutine
来验证是否是协程对象
print(asyncio.iscoroutine(func1())) # True
协程对象必须在事件循环中运行,我们可以通过asyncio.get_event_loop
方法来获取当前正在运行的循环实例。如loop
对象,然后把协程对象交给 loop.run_until_complete
,协程对象随后会在 loop
里得到运行。
loop = asyncio.get_event_loop()
loop.run_until_complete(func1())
# 运行结果为:
# 1
# 等待2s
# 2
run_until_complete
是一个阻塞(blocking)调用,直到协程运行结束,它才返回;所以他必须接受的是一个可等待对象
(协程
, 任务
和future对象)。run_until_complete
的参数是一个 future
,但是我们这里传给它的却是协程对象,之所以能这样,是因为它在内部做了检查
要让这个协程对象转成future
的话,可以通过 asyncio.ensure_future
方法。
所以,我们可以写得更明显一些:
loop = asyncio.get_event_loop()
loop.run_until_complete(asnycio.ensure_future(func1()))
# 运行结果为:
# 1
# 等待2s
# 2
在有多个协程函数需要运行时怎么办?
-
我们可以将协程对象包装成
future对象
后再放到一个列表中再通过asyncio.wait
运行。asyncio.wait方法
或await关键字
只能传可等待
对象tasks = [ asyncio.ensure_future(func1()), # 把协程对象包转成一个 future 对象 asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) # 运行结果为: # 1 # 3 # 等待2s # 2 # 4
-
通过
asyncio.gather
可以直接将协程对象放到列表中(必须解包!也就是*[]):loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*[func1(), func2()])) # 运行结果为: # 1 # 3 # 等待2s # 2 # 4
完整代码为:
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 此处用asyncio.sleep(2)来模拟IO耗时(asyncio.sleep也是一个协程对象,不能用time.sleep()),自动切换到tasks中的其他任务
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 此处又遇到IO阻塞后,又会自动切换到tasks中其他的任务
print(4)
func1() # 调用协程函数,协程并不会开始运行,只是返回一个协程对象。可以通过 asyncio.iscoroutine 来验证是否是协程对象
print(asyncio.iscoroutine(func1())) # True
tasks = [
asyncio.ensure_future(func1()), # 把协程对象包转成一个 future 对象
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
# 方式一:
loop.run_until_complete(asyncio.wait(tasks))
# 方式二:
loop.run_until_complete(asyncio.gather(*[func1(), func2()]))
同样我们也可以执行Task对象
:
Task 对象
的作用是在运行某个任务的同时可以并发的运行其他任务。
Task 对象
可以使用 asyncio.create_task()
函数创建,也可以使用 loop.create_task()
- 取消 Task 对象
cancel()
- Task 任务是否被取消
cancelled()
- Task 对象是否完成
done()
- 返回结果
result()
1.Task 对象被完成,则返回结果
2.Task 对象被取消,则引发 CancelledError 异常
3.Task 对象的结果不可用,则引发 InvalidStateError 异常 - 添加回调,任务完成时触发
add_done_callback(task)
- 所有任务列表
asyncio.all_tasks()
- 返回当前任务
asyncio.current_task()
使用 loop
对象的 create_task
函数创建一个 Task
对象,在第一次打印 Task
对象时,状态为 pending
,完成执行函数后的状态为 finished
。
import asyncio
async def do_something():
print("这是一个Task例子....")
# 模拟阻塞1秒
await asyncio.sleep(1)
return "Task任务完成"
# 创建一个事件event_loop
loop = asyncio.get_event_loop()
# 创建一个task
task = loop.create_task(do_something())
# 第一次打印task
print(task)
# 将task加入到event_loop中
loop.run_until_complete(task)
# 再次打印task
print(task)
print(task.result())
""" 运行结果
#1 <Task pending name='Task-1' coro=<do_something() running at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97>>
#2 这是一个Task例子....
#3 <Task finished name='Task-1' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task任务完成'>
#4 Task任务完成
"""
Task
对象的 result()
函数可以获取 do_something()
函数的返回值。
Task 任务回调
import asyncio
async def do_something(task_id):
print(f"这是一个Task例子,当前task_id:{task_id}")
# 模拟阻塞1秒
await asyncio.sleep(1)
return f"Task-id {task_id} 任务完成"
# 任务完成后的回调函数
def callback(task):
# 打印参数
print(task)
# 打印返回的结果
print(task.result())
# 创建一个事件event_loop
loop = asyncio.get_event_loop()
# 创建一个task
tasks = []
for i in range(5):
name = f"task-{i}"
task = loop.create_task(do_something(name), name=name)
task.add_done_callback(callback)
tasks.append(task)
# 将task加入到event_loop中
loop.run_until_complete(asyncio.wait(tasks))
""" 输出为:
这是一个Task例子,当前task_id:task-0
这是一个Task例子,当前task_id:task-1
这是一个Task例子,当前task_id:task-2
这是一个Task例子,当前task_id:task-3
这是一个Task例子,当前task_id:task-4
<Task finished name='task-0' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-0 任务完成'>
Task-id task-0 任务完成
<Task finished name='task-1' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-1 任务完成'>
Task-id task-1 任务完成
<Task finished name='task-2' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-2 任务完成'>
Task-id task-2 任务完成
<Task finished name='task-3' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-3 任务完成'>
Task-id task-3 任务完成
<Task finished name='task-4' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-4 任务完成'>
Task-id task-4 任务完成
"""
使用 asyncio.wait()
函数将 Task 任务列表
添加到 event_loop
中,也可以使用 asyncio.gather()
函数。
多个任务执行结束后再回调
import asyncio
import functools
async def do_something(t):
print("暂停" + str(t) + "秒")
await asyncio.sleep(t)
return "暂停了" + str(t) + "秒"
def callback(event_loop, gatheringFuture):
print(gatheringFuture.result())
print("多个Task任务完成后的回调")
loop = asyncio.get_event_loop()
gather = asyncio.gather(do_something(1), do_something(3))
gather.add_done_callback(functools.partial(callback, loop))
loop.run_until_complete(gather)
""" 输出为:
暂停1秒
暂停3秒
['暂停了1秒', '暂停了3秒']
多个Task任务完成后的回调
"""