A Walkthrough of the Asyncio Source Code

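Over the past couple of days I read through the asyncio source code. asyncio is a Python standard-library package for asynchronous programming; the version I read is the one shipped with Python 3.6. If I have misunderstood anything, corrections are welcome.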

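Although asyncio is officially billed as asynchronous I/O, it actually emulates asynchronous I/O on top of the I/O multiplexing model. So let's first review the five I/O models: blocking I/O, non-blocking I/O, I/O multiplexing, signal-driven I/O, and asynchronous I/O.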

A single I/O operation (take read as an example) generally involves two steps:

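  1. Waiting for the data to be ready (waiting for data). For an operation on a socket, this means waiting for the data to arrive from the network and be copied into a buffer in the kernel.
  2. Copying the data from the kernel buffer into the process's buffer (copying data from kernel to user).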

The figure above compares the five I/O models quite intuitively; for detailed descriptions, see the following link:

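Look closely at the third model in the figure, I/O multiplexing: the process seems to be blocked the whole time. So why is asyncio described as asynchronous and non-blocking? I had the same question at first; here are my two guesses: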

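  1. The advantage of I/O multiplexing is that it can wait for many descriptors to become ready at the same time, so a single process can handle many sockets at once. From the point of view of each socket, the handling really is non-blocking; what actually blocks is the select/poll call itself.
  2. Normally select/poll returns once data has reached the kernel, and the process then calls recvfrom itself to copy the data into its own buffer. asyncio emulates asynchronous I/O by resuming the coroutine only after the whole I/O operation, including the copy from the kernel buffer into the process buffer, has completed: the event loop performs that copy in a callback once the socket is ready.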
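Guess 1 can be made concrete with the standard selectors module, which asyncio's selector-based event loop is built on. The sketch below is a minimal illustration, not asyncio code. socket.socketpair() stands in for real network connections, and wait_for_many is a hypothetical helper name. One select() call waits on three sockets at once, and only the socket that actually received data is reported. The recv() that follows is step 2 of the read, the copy out of the kernel buffer, and it does not block because the socket is already ready.

```python
import selectors
import socket


def wait_for_many():
    """Register several sockets with one selector and block until any is readable."""
    sel = selectors.DefaultSelector()
    pairs = [socket.socketpair() for _ in range(3)]
    for i, (reader, _writer) in enumerate(pairs):
        reader.setblocking(False)
        sel.register(reader, selectors.EVENT_READ, data=i)

    # Only the second pair gets any data.
    pairs[1][1].send(b"hello")

    # The single blocking call: wait until *any* registered socket is ready.
    events = sel.select(timeout=1.0)
    received = {}
    for key, _mask in events:
        # Readiness means the data is in the kernel buffer; recv() copies it out.
        received[key.data] = key.fileobj.recv(1024)

    sel.close()
    for reader, writer in pairs:
        reader.close()
        writer.close()
    return received


received = wait_for_many()
print(received)  # {1: b'hello'}
```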

While we are at it, a quick word on blocking vs. non-blocking and synchronous vs. asynchronous.

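Synchronous and asynchronous differ in how the result reaches the caller. In the synchronous case, the caller actively waits for the result of a call. In the asynchronous case, the callee notifies the caller through state or signals, or the result is handled by a callback. This distinction is about how the result is obtained.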

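Blocking and non-blocking differ in whether the process can do other work while the I/O operation has not yet completed. This distinction is about the state of the process while it waits.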
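The blocking/non-blocking distinction is easiest to see on a single socket. asyncio works with sockets in non-blocking mode. The minimal sketch below uses a hypothetical helper, try_read, and socket.socketpair() stands in for a network connection. It shows that when no data is available, a non-blocking recv() returns immediately with BlockingIOError rather than putting the caller to sleep.

```python
import select
import socket


def try_read(sock):
    """Return available data, or None instead of waiting, on a non-blocking socket."""
    try:
        return sock.recv(1024)
    except BlockingIOError:
        # Nothing in the kernel buffer: a non-blocking recv() fails immediately
        # (EAGAIN/EWOULDBLOCK) instead of putting the caller to sleep.
        return None


a, b = socket.socketpair()
a.setblocking(False)
first = try_read(a)  # nothing has been sent yet
b.send(b"ping")
# Wait up to 1 s for readiness (on some platforms socketpair is emulated over TCP).
select.select([a], [], [], 1.0)
second = try_read(a)
a.close()
b.close()
print(first, second)  # None b'ping'
```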

Next, a few core concepts in asyncio:

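  • event_loop: the event loop, essentially an infinite loop. We can register functions with it, and when the corresponding condition is met, the loop calls the matching handler.
  • coroutine: in Python this usually means a coroutine object. A function defined with async def is not executed when it is called; calling it returns a coroutine object, which can be registered with the event loop and is then run by it. Inside a coroutine, await suspends the coroutine until the awaited object (such as a Future or another coroutine) completes, so the event loop can run other work in the meantime.
  • task: a further wrapper around a coroutine object that also tracks the task's state (pending, cancelled or finished).
  • future: represents the result of an operation that may not have completed yet. Future is the parent class of Task. The difference is that a Task also drives a coroutine forward and stores the coroutine's return value as its own result.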
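The relationships between these concepts can be checked directly. The small sketch below uses only public asyncio APIs. The 0.01 s delay is an arbitrary choice for illustration, and the sketch creates a fresh loop with asyncio.new_event_loop() instead of reusing the default one.

```python
import asyncio
import inspect


async def func(t):
    await asyncio.sleep(t)
    return t


loop = asyncio.new_event_loop()
try:
    coro = func(0.01)  # calling an async def function only creates a coroutine object
    print(inspect.iscoroutine(coro))  # True: nothing inside func has run yet
    task = loop.create_task(coro)  # wrap the coroutine in a Task
    print(isinstance(task, asyncio.Future))  # True: Task is a subclass of Future
    print(task.done())  # False: the loop has not run the task yet
    result = loop.run_until_complete(task)  # run the loop until the task finishes
    print(result, task.result())  # 0.01 0.01
finally:
    loop.close()
```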

Here is an example:

import asyncio

async def func(t):
    await asyncio.sleep(t)
    return t

loop = asyncio.get_event_loop()
task1 = loop.create_task(func(1))
task2 = loop.create_task(func(2))
task3 = loop.create_task(func(3))
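loop.run_forever()  # never returns by itself: the tasks finish after about 3 s, but the loop keeps running until stop() is called (or Ctrl+C)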
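run_forever() never returns on its own, so the example above has no way to collect the tasks' results. The variant below stops by itself. It assumes shorter delays (0.1 to 0.3 s) so it finishes quickly, wraps the tasks in asyncio.gather(), and runs the loop with run_until_complete(). The total runtime is close to the longest delay rather than the sum, because the three sleeps overlap on one loop.

```python
import asyncio
import time


async def func(t):
    await asyncio.sleep(t)
    return t


loop = asyncio.new_event_loop()
try:
    start = time.monotonic()
    tasks = [loop.create_task(func(t)) for t in (0.1, 0.2, 0.3)]
    # gather() returns a Future that completes once every task is done;
    # run_until_complete() runs the loop until that Future finishes, then stops.
    results = loop.run_until_complete(asyncio.gather(*tasks))
    elapsed = time.monotonic() - start
finally:
    loop.close()

print(results)  # [0.1, 0.2, 0.3]
print(elapsed)  # roughly 0.3 s, not 0.6 s: the sleeps overlap
```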

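func is a coroutine function: calling func(1) returns a coroutine object, and create_task() wraps that coroutine in a Task. Inside BaseEventLoop.create_task() this is done with: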

task = tasks.Task(coro, loop=self)

Part of the Task class source:

class Task(futures.Future):
    ...
    def __init__(self, coro, *, loop=None):
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        self._coro = coro
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def _step(self, exc=None):
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

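In __init__(), the task passes its _step() method to the loop's call_soon(), which effectively registers the coroutine with the event loop. _step() runs the coroutine by calling its send() method, or throw() when an exception must be delivered. When the coroutine suspends by yielding a Future, _step() does not reschedule itself directly. It registers the task's _wakeup() method as a done callback on that Future (result.add_done_callback(self._wakeup)). When the Future completes, _wakeup() calls _step() again to resume the coroutine. Only a bare yield (result is None) makes _step() call call_soon(self._step) right away.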
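To see what _step() does without the event loop, the sketch below drives a coroutine by hand. ToyFuture, worker and step are hypothetical names used only for illustration. ToyFuture runs its callbacks immediately, whereas the real Future schedules them with loop.call_soon(). The mechanics are otherwise the same as in Task: send(None) runs the coroutine until it yields a future, a callback on that future resumes it, and the coroutine's return value arrives as StopIteration.value.

```python
class ToyFuture:
    """Hypothetical stand-in for asyncio.Future: awaiting it yields the future itself."""

    def __init__(self):
        self.result = None
        self.callbacks = []

    def set_result(self, value):
        self.result = value
        for cb in self.callbacks:
            cb()  # the real Future schedules these with loop.call_soon() instead

    def __await__(self):
        yield self  # suspend: hand the future up to whoever called send()
        return self.result


async def worker(fut):
    value = await fut  # suspends here until fut has a result
    return value * 2


log = []


def step(coro):
    """Simplified version of Task._step: resume the coroutine once."""
    try:
        yielded = coro.send(None)
    except StopIteration as exc:
        log.append(("done", exc.value))  # Task._step calls set_result(exc.value)
    else:
        log.append(("suspended", type(yielded).__name__))
        # Task._step registers _wakeup as a done callback; _wakeup calls _step again.
        yielded.callbacks.append(lambda: step(coro))


fut = ToyFuture()
coro = worker(fut)
step(coro)  # runs worker() up to `await fut`
fut.set_result(21)  # completing the future resumes the coroutine
print(log)  # [('suspended', 'ToyFuture'), ('done', 42)]
```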

Now let's look at the BaseEventLoop class:

class BaseEventLoop(events.AbstractEventLoop):
    ...
    def run_forever(self):
        """Run until stop() is called."""
        self._check_closed()
        if self.is_running():
            raise RuntimeError('This event loop is already running')
        if events._get_running_loop() is not None:
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')
        self._set_coroutine_wrapper(self._debug)
        self._thread_id = threading.get_ident()
        if self._asyncgens is not None:
            old_agen_hooks = sys.get_asyncgen_hooks()
            sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                   finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_wrapper(False)
            if self._asyncgens is not None:
                sys.set_asyncgen_hooks(*old_agen_hooks)

    def call_soon(self, callback, *args):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle

    def _call_soon(self, callback, args):
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

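Two attributes of this class are worth mentioning. _scheduled is a list maintained as a heap (via heapq) of TimerHandle objects ordered by their scheduled time; it holds callbacks registered with call_later()/call_at() that should run at a later time. _ready is a queue (a collections.deque) holding the callbacks to run on the next iteration of the loop.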

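call_soon() calls _call_soon(), which creates a Handle object and appends it to the _ready queue. Both _scheduled and _ready actually hold Handle objects (TimerHandle, a subclass, in the case of _scheduled). A Handle wraps a callback together with its arguments; for a task, the callback is the _step() method mentioned above.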
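The FIFO behaviour of _ready can be sketched with collections.deque. ToyHandle and ToyLoop are hypothetical, heavily simplified stand-ins for events.Handle and BaseEventLoop. They show three things: call_soon() wraps a callback and its arguments in a handle and appends it to the right of the deque, handles are popped from the left, and cancelled handles are skipped rather than removed eagerly.

```python
import collections


class ToyHandle:
    """Hypothetical, simplified stand-in for events.Handle: a callback plus its arguments."""

    def __init__(self, callback, args):
        self._callback = callback
        self._args = args
        self._cancelled = False

    def cancel(self):
        self._cancelled = True

    def _run(self):
        self._callback(*self._args)


class ToyLoop:
    def __init__(self):
        self._ready = collections.deque()  # FIFO queue of handles, as in BaseEventLoop

    def call_soon(self, callback, *args):
        handle = ToyHandle(callback, args)
        self._ready.append(handle)  # enqueue at the right end
        return handle

    def run_ready(self):
        # Unlike _run_once(), this simply drains the queue until it is empty.
        while self._ready:
            handle = self._ready.popleft()  # dequeue from the left end: FIFO order
            if not handle._cancelled:  # cancelled handles are skipped when reached
                handle._run()


out = []
loop = ToyLoop()
loop.call_soon(out.append, "first")
second = loop.call_soon(out.append, "second")
loop.call_soon(out.append, "third")
second.cancel()
loop.run_ready()
print(out)  # ['first', 'third']
```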

With the tasks created, we need to start the event loop. run_forever() calls _run_once() repeatedly until stop() is called, so _run_once() is the heart of coroutine scheduling. Its source:

class BaseEventLoop(events.AbstractEventLoop):
    ...
    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)  # drop cancelled handles from the head of the _scheduled heap
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())

        if self._debug and timeout != 0:
            t0 = self.time()
            event_list = self._selector.select(timeout)
            dt = self.time() - t0
            if dt >= 1.0:
                level = logging.INFO
            else:
                level = logging.DEBUG
            nevent = len(event_list)
            if timeout is None:
                logger.log(level, 'poll took %.3f ms: %s events',
                           dt * 1e3, nevent)
            elif nevent:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: %s events',
                           timeout * 1e3, dt * 1e3, nevent)
            elif dt >= 1.0:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: timeout',
                           timeout * 1e3, dt * 1e3)
        else:
            event_list = self._selector.select(timeout)  # select/poll/epoll: returns once a registered fd is ready or the timeout expires; blocks until then
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)  # move handles from _scheduled whose time has come into the _ready queue

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()  # run the handles in the _ready queue one by one
        handle = None  # Needed to break cycles when an exception occurs.
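The four steps of _run_once() can be reproduced in a small sketch: compute the poll timeout, poll, move due timers from _scheduled to _ready, and run the ready handles. RunOnceLoop is hypothetical and performs no real I/O; where the real loop calls self._selector.select(timeout), it simply sleeps for the timeout. Handles are plain (callback, args) tuples. The heap entries carry a sequence number as a tie-breaker, which is an assumption of this sketch; the real TimerHandle is compared by its _when time.

```python
import collections
import heapq
import itertools
import time


class RunOnceLoop:
    """Hypothetical loop mirroring the steps of BaseEventLoop._run_once, minus real I/O."""

    def __init__(self):
        self._ready = collections.deque()
        self._scheduled = []  # min-heap of (when, seq, callback, args)
        self._seq = itertools.count()  # tie-breaker so equal times never compare callbacks
        self._clock_resolution = time.get_clock_info("monotonic").resolution

    def time(self):
        return time.monotonic()

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        entry = (self.time() + delay, next(self._seq), callback, args)
        heapq.heappush(self._scheduled, entry)

    def _run_once(self):
        # 1. Poll timeout: 0 if work is ready, else the time until the next timer.
        timeout = None
        if self._ready:
            timeout = 0
        elif self._scheduled:
            timeout = max(0, self._scheduled[0][0] - self.time())
        # 2. The real loop calls self._selector.select(timeout) here. There are no
        #    sockets in this sketch, so it sleeps instead; timeout None (which would
        #    mean "block until I/O arrives") just falls through.
        if timeout:
            time.sleep(timeout)
        # 3. Move timers that are due from the heap to the ready queue.
        end_time = self.time() + self._clock_resolution
        while self._scheduled and self._scheduled[0][0] < end_time:
            _, _, callback, args = heapq.heappop(self._scheduled)
            self._ready.append((callback, args))
        # 4. Run only what is in _ready now; anything these callbacks schedule
        #    waits for the next pass.
        for _ in range(len(self._ready)):
            callback, args = self._ready.popleft()
            callback(*args)


log = []
loop = RunOnceLoop()


def hello():
    log.append("soon")
    loop.call_soon(log.append, "queued by a callback")  # runs on the *next* pass


loop.call_later(0.1, log.append, "later")
loop.call_soon(hello)
loop._run_once()  # pass 1: only hello() runs
print(log)  # ['soon']
loop._run_once()  # pass 2: the callback queued during pass 1
print(log)  # ['soon', 'queued by a callback']
loop._run_once()  # pass 3: nothing ready, so it sleeps until the timer is due
print(log)  # ['soon', 'queued by a callback', 'later']
```

The second pass illustrates the comment in the original source: a callback scheduled while the ready queue is being processed runs on the next iteration, after another poll, not on the current one.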

I've added comments at the key points above. Finally, let's look at the Handle object's _run() method:

class Handle:
    def _run(self):
        try:
            self._callback(*self._args)
        except Exception as exc:
            cb = _format_callback_source(self._callback, self._args)
            msg = 'Exception in callback {}'.format(cb)
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.

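Here self._callback is whatever callback the handle wraps. For the handles created by Task, it is the task's _step() method, which calls the coroutine's send() once. Other handles carry other callbacks, such as a task's _wakeup() or the timer callback that completes the Future behind asyncio.sleep(). If the coroutine reaches an await on a Future that is not done yet, it suspends and the loop moves on to the next handle. If the coroutine has finished, send() raises StopIteration, and _step() calls set_result() to store the return value on the task.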
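Putting the pieces together, the sketch below is a hypothetical miniature of the whole mechanism. MiniFuture, MiniTask and MiniLoop are illustrative names, not asyncio classes, and the code omits cancellation, exceptions and real I/O. A task's _step() sends into its coroutine. When the coroutine awaits a pending future, the task registers _wakeup() as a done callback. Timers complete the sleep futures, and every callback goes through the _ready queue. The example follows the same pattern as the article's example, with delays shortened to hundredths of a second.

```python
import collections
import heapq
import itertools
import time


class MiniFuture:
    """Hypothetical minimal Future: a result slot plus done callbacks."""

    def __init__(self, loop):
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def set_result(self, value):
        self._result, self._done = value, True
        for cb in self._callbacks:
            self._loop.call_soon(cb, self)  # like Future: callbacks go through _ready

    def add_done_callback(self, cb):
        if self._done:
            self._loop.call_soon(cb, self)
        else:
            self._callbacks.append(cb)

    def __await__(self):
        if not self._done:
            yield self  # suspend the awaiting task until this future is done
        return self._result


class MiniTask(MiniFuture):
    """Hypothetical minimal Task: drives a coroutine with send(), like Task._step."""

    def __init__(self, coro, loop):
        super().__init__(loop)
        self._coro = coro
        loop.call_soon(self._step)  # as in Task.__init__

    def _step(self):
        try:
            awaited = self._coro.send(None)
        except StopIteration as exc:
            self.set_result(exc.value)  # coroutine finished: store its return value
        else:
            awaited.add_done_callback(self._wakeup)  # resume when it is done

    def _wakeup(self, _future):
        self._step()


class MiniLoop:
    def __init__(self):
        self._ready = collections.deque()
        self._scheduled = []  # heap of (when, seq, callback, args)
        self._seq = itertools.count()

    def call_soon(self, cb, *args):
        self._ready.append((cb, args))

    def call_later(self, delay, cb, *args):
        when = time.monotonic() + delay
        heapq.heappush(self._scheduled, (when, next(self._seq), cb, args))

    def sleep(self, delay):
        fut = MiniFuture(self)
        self.call_later(delay, fut.set_result, None)
        return fut

    def run_until_complete(self, task):
        while not task._done:
            if not self._ready and self._scheduled:  # sleep stands in for select()
                time.sleep(max(0, self._scheduled[0][0] - time.monotonic()))
            now = time.monotonic()
            while self._scheduled and self._scheduled[0][0] <= now:
                _, _, cb, args = heapq.heappop(self._scheduled)
                self._ready.append((cb, args))
            for _ in range(len(self._ready)):  # only what was queued before this pass
                cb, args = self._ready.popleft()
                cb(*args)
        return task._result


loop = MiniLoop()
order = []


async def func(t):
    await loop.sleep(t)
    order.append(t)
    return t


async def main():
    tasks = [MiniTask(func(t), loop) for t in (0.03, 0.01, 0.02)]
    return [await t for t in tasks]


results = loop.run_until_complete(MiniTask(main(), loop))
print(results, order)  # [0.03, 0.01, 0.02] [0.01, 0.02, 0.03]
```

The tasks finish in order of their delays (order). main() still receives the results in the order it awaited them (results), because awaiting an already-finished task returns its result without suspending.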
