接续第一话的内容,事件循环在创建之后,又是如何运行协程任务以及异步IO任务的?
由asyncio.run
的代码可知,loop.run_until_complete
是运行协程的方法。其定义如下:
# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
def run_until_complete(self, future):
self._check_closed()
self._check_running()
new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
future._log_destroy_pending = False
future.add_done_callback(_run_until_complete_cb)
try:
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')
return future.result()
# tasks.py
def ensure_future(coro_or_future, *, loop=None):
return _ensure_future(coro_or_future, loop=loop)
def _ensure_future(coro_or_future, *, loop=None):
if futures.isfuture(coro_or_future):
if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('The future belongs to a different loop than '
'the one specified as the loop argument')
return coro_or_future
if not coroutines.iscoroutine(coro_or_future):
if inspect.isawaitable(coro_or_future):
coro_or_future = _wrap_awaitable(coro_or_future)
else:
raise TypeError('An asyncio.Future, a coroutine or an awaitable '
'is required')
if loop is None:
loop = events._get_event_loop(stacklevel=4)
return loop.create_task(coro_or_future)
run_until_complete
方法传入的协程会通过tasks.ensure_future
方法被封装成一个task
实例,从上述的代码可以看到,最终落实到了loop.create_task
方法。
# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
def create_task(self, coro, *, name=None):
self._check_closed()
if self._task_factory is None:
task = tasks.Task(coro, loop=self, name=name)
if task._source_traceback:
del task._source_traceback[-1]
else:
task = self._task_factory(self, coro)
tasks._set_task_name(task, name)
return task
# task.py
class Task(futures._PyFuture):
def __init__(self, coro, *, loop=None, name=None):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
if not coroutines.iscoroutine(coro):
self._log_destroy_pending = False
raise TypeError(f"a coroutine was expected, got {coro!r}")
if name is None:
self._name = f'Task-{_task_name_counter()}'
else:
self._name = str(name)
self._must_cancel = False
self._fut_waiter = None
self._coro = coro
self._context = contextvars.copy_context()
self._loop.call_soon(self.__step, context=self._context)
_register_task(self)
# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
def call_soon(self, callback, *args, context=None):
self._check_closed()
if self._debug:
self._check_thread()
self._check_callback(callback, 'call_soon')
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
def _call_soon(self, callback, args, context):
handle = events.Handle(callback, args, self, context)
if handle._source_traceback:
del handle._source_traceback[-1]
self._ready.append(handle)
return handle
loop.create_task
方法最终会生成一个Task
实例。Task
实例封装了协程以及其它一系列变量,最终调用loop
的call_soon
方法,传入了实例的__step
函数。call_soon
方法传入的函数,会通过events.Handle
封装生成一个handle
实例,并加入到事件循环的_ready
队列中。
__step
方法会通过coro.send(None)
或是coro.throw(exc)
方法启动Task
实例内部的协程并获取协程的返回结果,对于一般协程而言coro.send(None)
会直接throw
一个StopIteration
异常,并在异常结果里附上协程返回值。当然,也有其它情况(比如await
了一个yield
多次的Awaitable
实例)可能要多次call_soon
协程Task
的__step
函数,相关的例子可以查看*的这篇文章。
在这之后,我们再回到run_until_complete
方法,在ensure_future
后,便调用loop.run_forever
方法,启动事件循环。
# windows_events.py
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def run_forever(self):
try:
assert self._self_reading_future is None
self.call_soon(self._loop_self_reading)
super().run_forever()
finally:
if self._self_reading_future is not None:
ov = self._self_reading_future._ov
self._self_reading_future.cancel()
if ov is not None:
self._proactor._unregister(ov)
self._self_reading_future = None
# proactor_events.py
class BaseProactorEventLoop(base_events.BaseEventLoop):
def _loop_self_reading(self, f=None):
try:
if f is not None:
f.result() # may raise
if self._self_reading_future is not f:
return
f = self._proactor.recv(self._ssock, 4096)
except exceptions.CancelledError:
return
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self.call_exception_handler({
'message': 'Error on reading from the event loop self pipe',
'exception': exc,
'loop': self,
})
else:
self._self_reading_future = f
f.add_done_callback(self._loop_self_reading)
# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
def run_forever(self):
self._check_closed()
self._check_running()
self._set_coroutine_origin_tracking(self._debug)
self._thread_id = threading.get_ident()
old_agen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
try:
events._set_running_loop(self)
while True:
self._run_once()
if self._stopping:
break
finally:
self._stopping = False
self._thread_id = None
events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
sys.set_asyncgen_hooks(*old_agen_hooks)
def _run_once(self):
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
event_list = self._selector.select(timeout)
self._process_events(event_list)
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
# debug模式下代码,加了时间统计
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
ProactorEventLoop
在调用run_forever
时,首先会用call_soon
方法将_loop_self_reading
方法加入排期。_loop_self_reading
方法会读取proactor
中的future
,并且将自己加入future
的完成时回调,实现不间断地读取future
实例。
之后,ProactorEventLoop
调用了BaseEventLoop
的run_forever
方法,在其中会不断执行_run_once
方法去一遍遍地迭代事件循环。一轮_run_once
会做以下几件事情:
- 清理
_scheduled
中被取消的定时任务 -
select
出事件列表并进行处理 - 从
_scheduled
取出到时的任务,加入到_ready
列表中- 由上面的逻辑也可知,
call_soon
的任务也会被加入到_ready
列表中
- 由上面的逻辑也可知,
- 从
_ready
列表中依次取出所有handle
,调用_run
方法运行
通过这种机制,事件循环就能持续不断地运行任务。
由上述_run_once
的定义也可知,在select
事件列表一步会出现IOCP
的身影,这是因为BaseProactorEventLoop
的selector
就是proactor
,实际传入的就是IOCP
实例,因此最终就是调用了IOCP
实例的select
方法。也是只有在这一步,才会去处理一些IO操作。
所以问题来了,针对IO操作,asyncio
是如何进行调度的呢?我们首先看IocpProactor.select
的实现:
# windows_events.py
class IocpProactor:
def select(self, timeout=None):
if not self._results:
self._poll(timeout)
tmp = self._results
self._results = []
return tmp
def _poll(self, timeout=None):
if timeout is None:
ms = INFINITE
elif timeout < 0:
raise ValueError("negative timeout")
else:
ms = math.ceil(timeout * 1e3)
if ms >= INFINITE:
raise ValueError("timeout too big")
while True:
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
ms = 0
err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
if self._loop.get_debug():
self._loop.call_exception_handler({
'message': ('GetQueuedCompletionStatus() returned an '
'unexpected event'),
'status': ('err=%s transferred=%s key=%#x address=%#x'
% (err, transferred, key, address)),
})
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
continue
if obj in self._stopped_serving:
f.cancel()
elif not f.done():
try:
value = callback(transferred, key, ov)
except OSError as e:
f.set_exception(e)
self._results.append(f)
else:
f.set_result(value)
self._results.append(f)
# Remove unregistered futures
for ov in self._unregistered:
self._cache.pop(ov.address, None)
self._unregistered.clear()
在IocpProactor._poll
中,会调用GetQueuedCompletionStatus
去查询完成端口的结果。直到有结果出现,才会根据结果中缓存的address
数据pop
出缓存的回调并且执行。
我们通过剖析一个IO操作的例子,来观察其中具体的奥秘:
from multiprocessing import Process
import asyncio
import time
HOST, PORT = '127.0.0.1', 31077
async def _svr_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
data = await reader.read(1024)
msg = data.decode()
print(f'[Server] recv: {msg}')
msg_back = ''.join([msg[i] for i in range(len(msg) - 1, 0, -1)])
print(f'[Server] send: {msg_back}')
writer.write(msg_back.encode())
await writer.drain()
writer.close()
async def _svr_task():
svr = await asyncio.start_server(_svr_handler, host=HOST, port=PORT)
async with svr:
await svr.serve_forever()
def _svr():
asyncio.run(_svr_task())
async def _test_cli(msg: str):
reader, writer = await asyncio.open_connection(HOST, PORT)
print(f'[Client] send: {msg}')
writer.write(msg.encode())
await writer.drain()
data = await reader.read(1024)
print(f'[Client] recv: {data.decode()}')
writer.close()
await writer.wait_closed()
def test_cli():
p = Process(target=_svr, daemon=True)
p.start()
time.sleep(0.5)
_msg = 'helloworld'
asyncio.run(_test_cli(_msg))
p.kill()
if __name__ == '__main__':
test_cli()
这是一个很简单的echo server
的实现,client
发送给server
信息,server
返回信息的reverse
。我们以client
的写操作writer.write
为例,看下IO事件是如何在事件循环里被处理的。
首先,open_connection
函数创建了对特定host
、port
的连接,并返回连接流的reader
跟writer
。
async def open_connection(host=None, port=None, *,
limit=_DEFAULT_LIMIT, **kwds):
loop = events.get_running_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_connection(
lambda: protocol, host, port, **kwds)
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
针对reader
,首先会初始化一个StreamReader
实例,再用StreamReaderProtocol
对reader
做进一步的封装。
针对writer
,首先会通过loop
的create_connection
方法,针对本次连接创建transport
实例,相当于一个通信管道的封装。transport
实例会与先前创建的StreamReaderProtocol
实例进行绑定。然后,再将创建的transport
实例和writer
绑定。
在ProactorEventLoop
中,会这样创建transport
实例:
# proactor_events.py
class BaseProactorEventLoop(base_events.BaseEventLoop):
return _ProactorSocketTransport(self, sock, protocol, waiter,
extra, server)
class _ProactorSocketTransport(_ProactorReadPipeTransport,
_ProactorBaseWritePipeTransport,
transports.Transport):
def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
super().__init__(loop, sock, protocol, waiter, extra, server)
base_events._set_nodelay(sock)
_ProactorSocketTransport
实例会同时对_ProactorReadPipeTransport
以及_ProactorBaseWritePipeTransport
的初始化,因此会提供对管道读写的功能。其继承链如下:
(<class 'asyncio.proactor_events._ProactorSocketTransport'>,
<class 'asyncio.proactor_events._ProactorReadPipeTransport'>,
<class 'asyncio.proactor_events._ProactorBaseWritePipeTransport'>,
<class 'asyncio.proactor_events._ProactorBasePipeTransport'>,
<class 'asyncio.transports._FlowControlMixin'>,
<class 'asyncio.transports.Transport'>,
<class 'asyncio.transports.ReadTransport'>,
<class 'asyncio.transports.WriteTransport'>,
<class 'asyncio.transports.BaseTransport'>,
<class 'object'>)
之后,当客户端开始写操作,调用writer.write
时,实质是进行了以下操作:
# proactor_events.py
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
transports.WriteTransport):
def write(self, data):
# 省略一些判断逻辑
# Observable states:
# 1. IDLE: _write_fut and _buffer both None
# 2. WRITING: _write_fut set; _buffer None
# 3. BACKED UP: _write_fut set; _buffer a bytearray
# We always copy the data, so the caller can't modify it
# while we're still waiting for the I/O to happen.
if self._write_fut is None: # IDLE -> WRITING
assert self._buffer is None
# Pass a copy, except if it's already immutable.
self._loop_writing(data=bytes(data))
elif not self._buffer: # WRITING -> BACKED UP
# Make a mutable copy which we can extend.
self._buffer = bytearray(data)
self._maybe_pause_protocol()
else: # BACKED UP
# Append to buffer (also copies).
self._buffer.extend(data)
self._maybe_pause_protocol()
def _loop_writing(self, f=None, data=None):
try:
if f is not None and self._write_fut is None and self._closing:
return
assert f is self._write_fut
self._write_fut = None
self._pending_write = 0
if f:
f.result()
if data is None:
data = self._buffer
self._buffer = None
if not data:
if self._closing:
self._loop.call_soon(self._call_connection_lost, None)
if self._eof_written:
self._sock.shutdown(socket.SHUT_WR)
self._maybe_resume_protocol()
else:
self._write_fut = self._loop._proactor.send(self._sock, data)
if not self._write_fut.done():
assert self._pending_write == 0
self._pending_write = len(data)
self._write_fut.add_done_callback(self._loop_writing)
self._maybe_pause_protocol()
else:
self._write_fut.add_done_callback(self._loop_writing)
if self._empty_waiter is not None and self._write_fut is None:
self._empty_waiter.set_result(None)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal write error on pipe transport')
第一次write
时,write_future
以及buffer
为空,因此触发了_loop_writing
逻辑。在_loop_writing
中,调用了self._loop._proactor.send(self._sock, data)
生成了一个写操作的future
。而_proactor
,也就是在ProactorEventLoop
里的IocpProactor
实例了。
# windows_events.py
class IocpProactor:
def send(self, conn, buf, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
if isinstance(conn, socket.socket):
ov.WSASend(conn.fileno(), buf, flags)
else:
ov.WriteFile(conn.fileno(), buf)
def finish_send(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
_overlapped.ERROR_OPERATION_ABORTED):
raise ConnectionResetError(*exc.args)
else:
raise
return self._register(ov, conn, finish_send)
def _register_with_iocp(self, obj):
if obj not in self._registered:
self._registered.add(obj)
_overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
def _register(self, ov, obj, callback):
self._check_closed()
f = _OverlappedFuture(ov, loop=self._loop)
if f._source_traceback:
del f._source_traceback[-1]
if not ov.pending:
try:
value = callback(None, None, ov)
except OSError as e:
f.set_exception(e)
else:
f.set_result(value)
self._cache[ov.address] = (f, ov, obj, callback)
return f
在send
方法中,做了如下操作:
- 通过
CreateIoCompletionPort
注册socket
到完成端口 - 创建一个
Overlapped
实例,通过WSASend
发送数据到socket
- 创建一个关联
Overlapped
实例的future
,并且判断Overlapped
实例如果不在pending
状态就直接执行回调。之后,缓存这个future
实例到_cache
中。
在先前已经提到,事件循环执行时,IocpProactor
实例会调用_poll
方法,其中会采用GetQueuedCompletionStatus
查询IO操作完成结果。如果发现有IO操作完成,就会从操作中提取ov.address
并在_cache
中pop
出回调然后执行。这样通过IOCP
模型加上事件循环(事件循环实质就是IOCP
里头的worker
),就把writer.write
的操作从开始到完成的步骤给串起来了。
之后就是await writer.drain
,实质是做了如下操作:
# streams.py
class StreamWriter:
async def drain(self):
await self._protocol._drain_helper()
class FlowControlMixin(protocols.Protocol): # 这个会被StreamReaderProtocol继承
async def _drain_helper(self):
if self._connection_lost:
raise ConnectionResetError('Connection lost')
if not self._paused:
return
waiter = self._drain_waiter
assert waiter is None or waiter.cancelled()
waiter = self._loop.create_future()
self._drain_waiter = waiter
await waiter
writer.drain
实质上是await
了StreamReaderProtocol
实例的_drain_helper
协程,其中做了一些前置检查,然后依据当前事件循环设置了一个_drain_waiter
的future
实例,并await
。为什么要这么做呢?
首先,我们可以观察得知,在_run_once
的逻辑中,如果_ready
队列有任务,或者是有_scheduled
里头的定时任务,那么之后IocpProactor._poll
里头的GetQueuedCompletionStatus
就会有timeout
,否则GetQueuedCompletionStatus
对应的timeout
就是INFINITE
,会一直阻塞直到有IO事件完成。有兴趣的同学可以创建一个协程任务,里头create_future
之后await
下,一试便知。
然后,回到_ProactorBaseWritePipeTransport
的_loop_writing
方法。_write_fut
被创建后,会直接添加_loop_writing
为自己的完成回调。当IocpProactor
实例由GetQueuedCompletionStatus
获得一个完成事件之后,会取出来执行ov.getresult()
(在send
方法的finish_send
里头)来获取结果,这个结果就会被放到_write_fut
作为其最终的返回结果。此时_write_fut
由于完成了,因此会调用自己的回调_loop_writing
,但这个时候因为buffer
里没有数据了,所以就会走到_maybe_resume_protocol
# transports.py
class _FlowControlMixin(Transport):
def _maybe_resume_protocol(self):
if (self._protocol_paused and
self.get_write_buffer_size() <= self._low_water):
self._protocol_paused = False
try:
self._protocol.resume_writing()
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._loop.call_exception_handler({
'message': 'protocol.resume_writing() failed',
'exception': exc,
'transport': self,
'protocol': self._protocol,
})
# streams.py
class FlowControlMixin(protocols.Protocol):
def resume_writing(self):
assert self._paused
self._paused = False
if self._loop.get_debug():
logger.debug("%r resumes writing", self)
waiter = self._drain_waiter
if waiter is not None:
self._drain_waiter = None
if not waiter.done():
waiter.set_result(None)
在writer.drain
中,我们实际上是一直在await
这个_drain_waiter
。在调用_maybe_resume_protocol
之后,实际是走到了StreamReaderProtocol
实例的resume_writing
方法,在FlowControlMixin
类被定义。这个方法执行了两个操作:
- 将
_paused
状态置为False
-
_loop_writing
中,如果数据没发完,就会另外走到_maybe_pause_protocol
,会把这个状态置为true
。此时调用await writer.drain
,就正好会走到了await _drain_waiter
-
- 将
_drain_waiter
设置完成。这样,await writer.drain
就能完成了
客户端的写操作,在事件循环里就是通过如上复杂的方式调度的。总的来说,是如下的步骤:
- 用户调用
writer.write
将数据传进transport
的写buffer
-
transport
的_loop_writing
发现buffer
有数据,创建一个写future
- 通过
CreateIoCompletionPort
绑定socket
跟完成端口 - 通过
WSASend
发送数据 - 设置一个回调用来取发送数据的结果
- 取出来的结果给到写
future
- 写
future
预先设置_loop_writing
为完成回调,得到结果后执行下一轮_loop_writing
- 通过
- 用户调用
await writer.drain
- 写
future
在创建后,发现写future
没有到完成状态,先调用_maybe_pause_protocol
设置protocol
的_paused
为True
- 在
writer.drain
里判断protocol
为_paused
,重置_drain_waiter
为新的实例并await
- 写操作完成,触发写
future
的回调_loop_writing
。下一轮_loop_writing
发现没有数据发送,调用_maybe_resume_protocol
,设置protocol
的_paused
为False
,并设置_drain_waiter
为完成 -
_drain_waiter
完成,退出await writer.drain
- 写
针对读操作,以及其它的IO操作,有兴趣的小伙伴可以深入研究_