concurrent.futures源码解读

昨天抽空读了concurrent.futures的源码,代码不算长,但觉得还是有必要整理记录下,毕竟读源码是最好的学习方式之一。

相信用concurrent.futures处理并发任务的小伙伴们经常会写出类似下面这样的代码:

executor = ThreadPoolExecutor(5)
futures = [executor.submit(fn, i) for i in range(5)]
for future in futures:
    print(future.result())

首先来看ThreadPoolExecutor类,其内部维护了一个工作队列_work_queue,和一个多线程集合_threads,当我们调用submit方法提交一个目标函数fn的时候,线程池内部创建一个future对象,再把fn和future对象封装进一个_WorkItem类的实例,可以称它为工作项,然后把这个工作项放入队列中。Future类和_WorkItem类是后面要提的重点。

def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._work_queue.put(w)
        self._adjust_thread_count()
        return f

再看_adjust_thread_count方法,它会先做一个判断,如果线程数未满(说成线程池中的线程未被全部激活可能更合适),创建一个新线程,加入多线程集合。线程的目标函数是_worker,这个函数很简单,但也很重要,还传了两个参数,一个是线程池实例本身的弱引用(这里用弱引用是为了防止线程池被shutdown的时候因为相互引用而无法被回收),另一个是工作队列。

def _adjust_thread_count(self):
    # When the executor gets lost, the weakref callback will wake up
    # the worker threads.
    def weakref_cb(_, q=self._work_queue):
        q.put(None)
    # TODO(bquinlan): Should avoid creating new threads if there are more
    # idle threads than items in the work queue.
    num_threads = len(self._threads)
    if num_threads < self._max_workers:
        thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                 num_threads)
        t = threading.Thread(name=thread_name, target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._work_queue))
        t.daemon = True
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue

而如果线程已经全部激活了,则这里不做任何处理,而是由已经存在的线程主动从工作队列中获取工作项。于是我们来看看这个_worker:

def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            ...

这个函数在某个线程中执行之后,就会不断地试图从工作队列中获取工作项,并run,队列为空则堵塞。所以这个线程永远不会停止,除非出现异常,或者线程池被shutdown。到这里,我们真正要加入线程的任务仍然未被执行,但不难猜到,work_item.run()做的就是这件事。工作项work_item由_WorkItem类实例化:

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)

每个工作项绑定了一个future对象和我们提交的函数fn。run方法会调用fn函数,并返回结果,然后把结果给future对象。到这里为止,我们在线程池上调用submit方法之后,其背后所做的事就差不多要结束了。接下来的重点就是介绍这个future。

在《流畅的Python》这本书中,作者把future称做期物,并解释:“期物封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果(或异常)”。而concurrent.futures中的Future类其实并不接触具体的操作,但它保存了操作的状态和结果(或异常)。

class Future(object):
    ...

    def result(self, timeout=None):
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def set_result(self, result):
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

下面我概述了Future类的一些关键点和主要方法:

  • future有五种状态,实际上映射了工作项的状态:PENDING(工作项在队列中等待空闲线程),RUNNING(工作项正在执行中),CANCELLED(工作项在等待中被取消,但仍在队列里),CANCELLED_AND_NOTIFIED(工作项取消后,被线程从队列里扔掉),FINISHED(工作项已完成)
  • def cancel(self) —— 这个方法会将future的PENDING状态置为CANCELLED(一般发生在线程池饱和的时候,用户主动取消等待中的工作项)
  • def add_done_callback(self, fn) —— 给future增加回调函数,在工作项完成或被取消后会立刻通过_invoke_callbacks方法调用回调函数
  • def result(self, timeout=None) —— 主线程调用该方法获取future中保存的工作项执行结果,如果该工作项未完成,则通过线程的条件变量condition的wait方法将主线程堵塞,直到子线程中该工作项完成,并调用future对象的set_result方法,该方法会通过notify_all唤醒堵塞中的线程。

除此之外,concurrent.futures还有两个辅助函数:as_completed和wait。以as_completed为例,它其实是一个生成器,其主要意义在于利用生成器的特性把已经完成的future逐个产出,也就是说不会按照futures原本的顺序进行迭代了,而是早完成的早产出。可以参考下图的运行例子:

在前面介绍Future类的时候,还有一个_waiters属性没有提到,这个属性只有在用到as_completed或wait的时候才会起作用,接下来以as_completed生成器为例进行分析。

def as_completed(fs, timeout=None):
    ...
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)    # 创建一个waiter对象,并添加到futures的每个future的_waiters属性中
    finished = list(finished)
    try:
        yield from _yield_finished_futures(finished, waiter, ref_collect=(fs,))    # 逐个产出启动生成器前就已经完成的future

        while pending:
            ...
            waiter.event.wait(wait_timeout)    # 堵塞,直到event对象的标志位置为True

            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            yield from _yield_finished_futures(finished, waiter, ref_collect=(fs, pending))    # 逐个产出waiter对象的finished_futures属性中的future

    finally:
        # Remove waiter from unfinished futures
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)

waiter对象有两个作用:当有future完成时,一是给waiter的finished_futures属性append该future,二是将线程event对象的标志位置为True,使得as_completed生成器不再因event.wait堵塞,继续向下执行,并通过yield from连接子生成器,逐个产出waiter的finished_futures属性中的future。

至此,concurrent.futures的线程池就解读完了,至于异常的处理,shutdown线程池等细节本文就不一一详述了。另外这个包到处都是线程同步,不熟悉的小伙伴们可以先去了解一下Lock、RLock、Condition、Event、Semaphore这些对象。

0

评论