The loop we initialized earlier is started by calling `run_forever` or `run_until_complete`. `run_until_complete` wraps the awaitable it is given into a Task and then calls `run_forever`, while `run_forever` is what actually makes the loop **loop**:

```python
while True:
    self._run_once()
    if self._stopping:
        break
```
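As a minimal usage sketch (the callback name `hello` is just for illustration): `loop.stop()` only sets the `_stopping` flag, so `run_forever` finishes the current `_run_once` iteration and then breaks out of this `while True` loop.

```python
import asyncio

loop = asyncio.new_event_loop()

def hello():
    print('hello from the loop')
    loop.stop()          # sets _stopping; run_forever exits after this iteration

loop.call_soon(hello)    # queue the callback into loop._ready
loop.run_forever()       # repeatedly calls _run_once() until _stopping is True
loop.close()
```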
_run_once()
The default implementation of `_run_once()` in Python's `BaseEventLoop` looks like this:
```python
def _run_once(self):
    """Run one full iteration of the event loop.
    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """
    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    if self._debug and timeout != 0:
        t0 = self.time()
        event_list = self._selector.select(timeout)
        dt = self.time() - t0
        if dt >= 1.0:
            level = logging.INFO
        else:
            level = logging.DEBUG
        nevent = len(event_list)
        if timeout is None:
            logger.log(level, 'poll took %.3f ms: %s events',
                       dt * 1e3, nevent)
        elif nevent:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: %s events',
                       timeout * 1e3, dt * 1e3, nevent)
        elif dt >= 1.0:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: timeout',
                       timeout * 1e3, dt * 1e3)
    else:
        event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.
```
The code above can fairly be called the absolute core of asyncio. The rough flow is as follows:

- `BaseEventLoop` uses `_scheduled`, a list of `TimerHandle` objects kept as a heap, for callbacks scheduled to run at some point in the future; they usually get there via `call_at(self, when, callback, *args)`.
- `BaseEventLoop` uses `_ready` for callbacks that are already runnable. The methods we use most often, such as `run_until_complete` and `ensure_future`, ultimately go through `call_soon(self, callback, *args)` to append a `Handle` to this queue, and ready events reported by epoll are also appended directly to `_ready` (see the scheduling sketch after this list).
- Each iteration first removes cancelled handles from `_scheduled`; the heap keeps the remaining handles ordered by their scheduled time.
- It then calls `self._selector.select(timeout)` to collect ready I/O events. Note that if both `_scheduled` and `_ready` are empty, `timeout` stays `None`, and for epoll that means waiting here indefinitely.
- Finally, timers that have come due are popped from `_scheduled` into `_ready`, and everything currently in `_ready` is executed; as the comment in the source notes, this is the only place callbacks are actually called.
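To make the `_ready` / `_scheduled` split concrete, here is a small sketch using only the public scheduling APIs (the delays and the `tick` callback are arbitrary examples):

```python
import asyncio

loop = asyncio.new_event_loop()

def tick(label):
    print(label, loop.time())

loop.call_soon(tick, 'soon')                 # Handle goes straight into loop._ready
loop.call_at(loop.time() + 0.5, tick, 'at')  # TimerHandle pushed onto the loop._scheduled heap
loop.call_later(1.0, loop.stop)              # call_later is just call_at(time() + delay, ...)
loop.run_forever()
loop.close()
```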
A few notes about this flow may help with understanding; I only pieced them together after going back and forth through the source several times:
coroutine
We normally use `iscoroutine(obj)` to check whether an object is a coroutine. A coroutine here is really a generator that Python wraps for us under the hood; for how coroutines evolved, see the earlier post asyncio的前世今生. In the source, `coroutine` is actually a decorator, and the main relevant code is:
```python
def coroutine(func):
    """Decorator to mark coroutines.
    ...
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if (base_futures.isfuture(res) or inspect.isgenerator(res) or
                    isinstance(res, CoroWrapper)):
                res = yield from res
            elif _AwaitableABC is not None:
                # If 'func' returns an Awaitable (new in 3.5) we
                # want to run it.
                try:
                    await_meth = res.__await__
                except AttributeError:
                    pass
                else:
                    if isinstance(res, _AwaitableABC):
                        res = yield from await_meth()
            return res
    ...
    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper
```
As you can see, when our coroutine is finally executed, it is still `res = yield from res` that sets the stage for suspending and resuming the coroutine.
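A minimal sketch of the pre-async/await style this decorator supports, on Python versions where `asyncio.coroutine` still exists (it was removed in 3.11); `fetch` is just an example name:

```python
import asyncio

@asyncio.coroutine
def fetch():
    # a plain generator function: the decorator marks it so that
    # iscoroutinefunction() is True and the loop can drive it with yield from
    yield from asyncio.sleep(0.1)
    return 'done'

loop = asyncio.new_event_loop()
print(asyncio.iscoroutinefunction(fetch))  # True
print(loop.run_until_complete(fetch()))    # done
loop.close()
```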
Future&Task

When using asyncio we constantly reach for methods like `ensure_future(coro_or_future, *, loop=None)` and `BaseEventLoop.create_task(self, coro)` to wrap our own coroutine in a Task object, which then takes part in the loop's scheduling and execution.
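For instance, a minimal sketch (`my_coro` is just an example coroutine):

```python
import asyncio

async def my_coro():
    await asyncio.sleep(0.1)
    return 42

loop = asyncio.new_event_loop()
task = loop.create_task(my_coro())    # wrap the coroutine in a Task; its first _step is scheduled with call_soon
print(loop.run_until_complete(task))  # 42
loop.close()
```

The abridged Task source below shows what that wrapping sets in motion: `__init__` schedules `_step` with `call_soon`, and `_step` is what actually drives the coroutine.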
```python
class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    def __init__(self, coro, *, loop=None):
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        self._coro = coro
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def _step(self, exc=None):
        ....
        coro = self._coro
        self._fut_waiter = None
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            ....
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.
```
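To see the `send` / `StopIteration` mechanics that `_step` relies on, you can drive a trivial coroutine by hand (a toy sketch, not something you would normally do):

```python
async def answer():
    return 42

coro = answer()
try:
    coro.send(None)          # exactly what _step does to advance the coroutine
except StopIteration as exc:
    print(exc.value)         # 42 -- the value _step passes to self.set_result()
```

Roughly speaking, when the coroutine awaits something instead of finishing, `send` returns a Future; in the `else:` branch elided above, `_step` registers a done-callback on that Future so that the next `_step` runs once it completes.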