通过简单的代码,就可以一窥asyncio event loop的初始化流程:

1
2
3
4
5
import asyncio

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_forever()

上边简单的几行代码,牵涉到event loop的初始化以及运行的全部过程。

get_event_loop()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def get_event_loop():
    """Return an asyncio event loop.

    When called from a coroutine or a callback (e.g. scheduled with call_soon
    or similar API), this function will always return the running event loop.

    If there is no running event loop set, the function will return
    the result of `get_event_loop_policy().get_event_loop()` call.
    """
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    return get_event_loop_policy().get_event_loop()

可以看到,这个函数的流程也非常简单:

如果调用此函数,则返回当前(线程)正在运行中的event loop实例,如果返回为空,则会调用get_event_loop_policy().get_event_loop()创建并且返回一个新的event loop。

首次调用这个函数,_get_running_loop()肯定返回的是None,接着会调用get_event_loop_policy(),初始化全局变量_event_loop_policy,如果没有事先通过asyncio.set_event_loop_policy(policy)来指定event loop policy,在linux环境中会使用**DefaultEventLoopPolicy**。

1
2
3
4
5
6
def _init_event_loop_policy():
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is None:  # pragma: no branch
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()

使用**DefaultEventLoopPolicy**每个线程可以拥有自己的event loop,默认情况下,只会给主线程自动创建一个event loop。其他的loop policy可能实现上有所不同。DefaultEventLoopPolicy通过一个TLS来确保每个线程拥有自己的独有loop。

DefaultEventLoopPolicy的默认实现是_UnixDefaultEventLoopPolicy,而_UnixDefaultEventLoopPolicy默认使用_UnixSelectorEventLoop作为自己的**_loop_factory**。

介绍完上述一些默认配置之后,我们来到了关键的一步get_event_loop()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

而这里的关键new_event_loop则用到了上边的默认配置对应的也就是我们的主角**_loop_factory**:

1
2
3
4
5
6
7
    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()

_UnixDefaultEeventLoopPolicy

从上图中可以看到_UnixDefaultEeventLoopPolicy的继承关系。

后续对_UnixDefaultEeventLoopPolicy的实例化过程,主要内容如下:

  1. 默认情况下,selector为None,通过selectors.DefaultSelector()来自动选择当前环境下性能最好的selector。在linux环境下,会选择EpollSelector

  2. 完成selector配置之后,通过I/O多路复用就会给当前的loop提供异步处理I/O的能力;

  3. 执行self._make_self_pipe()给当前loop增加处理信号的能力

    1
    2
    3
    4
    5
    6
    7
    
        def _make_self_pipe(self):
            # A self-socket, really. :-)
            self._ssock, self._csock = self._socketpair()
            self._ssock.setblocking(False)
            self._csock.setblocking(False)
            self._internal_fds += 1
            self._add_reader(self._ssock.fileno(), self._read_from_self)
    

    ``

    此处创建的self-socket对,会在主线程接收到信号以及在loop关闭掉的时候做出对应的处理:

    1
    2
    3
    4
    5
    6
    7
    
        def _process_self_data(self, data):
            print("got signa:{}".format(data))
            for signum in data:
                if not signum:
                    # ignore null bytes written by _write_to_self()
                    continue
                self._handle_signal(signum)
    

    ``

经过上边这一套操作,现在主线程的event loop就完成了初始化。