目录


概述[TOC]

tornado.gen模块就是tornado中“协程”的实现。下面先简单地看一下协程和线程的区别:

阅读本文之前,需要先了解Python生成器和functools.wraps。


tornado.gen模块[TOC]

源代码在:http://www.tornadoweb.org/en/stable/_modules/tornado/gen.html
gen模块中有很多已废弃(deprecated)的代码,比如YieldPoint的概念已经被Future取代,因此本文不解析任何与YieldPoint相关的代码。下面是gen模块的关键代码:

# !!!可以通过raise StopIteration或gen.Return异常,
# 来终止协程的执行,并返回结果!!!

# 该函数用于从StopIteration或gen.Return异常中提取返回结果
# + 1,如果异常对象有value属性,则返回value属性的值
# + 2,否则,如果异常对象有args属性,并且args属性的值是元组,
# + + 并且该元组至少有一个元素,则返回它的第一个元素
# + 3,否则,返回None
def _value_from_stopiteration(e):
    try:
        # StopIteration has a value attribute beginning in py33.
        # So does our Return class.
        return e.value
    except AttributeError:
        pass
    try:
        # Cython backports coroutine functionality by putting the value in
        # e.args[0].
        return e.args[0]
    except (AttributeError, IndexError):
        return None

# Coroutines are created with the gen.coroutine decorator.
def coroutine(func, replace_callback=True):
    """Decorator that turns a (generator) function into a tornado coroutine.

    Calling the decorated function returns a Future that receives the
    function's result.

    :param func: the function to wrap.
    :param replace_callback: forwarded to ``_make_coroutine_wrapper``.
        When true, a ``callback`` keyword argument passed to the coroutine
        is removed from the arguments given to ``func`` and is instead
        scheduled on the current IOLoop with the Future's result.
    :returns: the wrapped coroutine function.
    """
    # Forward the caller's choice instead of hard-coding True, so that
    # ``replace_callback=False`` is actually honored.
    return _make_coroutine_wrapper(func, replace_callback=replace_callback)

# Core code 1: building the coroutine wrapper.
def _make_coroutine_wrapper(func, replace_callback):
    """Wrap ``func`` so that each call returns a Future holding its result.

    The returned ``wrapper`` calls ``func``:

    * If ``func`` raises ``Return``/``StopIteration``, the extracted value
      becomes the Future's result.
    * If ``func`` raises any other exception, it is stored on the Future.
    * If ``func`` returns a generator, the generator is advanced once here.
      If it finishes without yielding, its result or exception goes onto
      the Future. If it yields, a ``Runner`` is created to drive the rest.
    * Otherwise the plain return value becomes the Future's result.

    :param func: the decorated function (usually a generator function).
    :param replace_callback: when true and the wrapper is called with a
        ``callback`` keyword argument, that argument is popped from
        ``kwargs``. It is then registered on the current IOLoop to be
        called with the Future's result.
    :returns: ``wrapper``, with ``__wrapped__`` and
        ``__tornado_coroutine__`` attributes set.
    """
    # On Python 3.5, set the coroutine flag on our generator, to allow it
    # to be used with 'await'.
    wrapped = func
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)

    # ``wrapper`` is what callers invoke; every call (i.e. every start of
    # the coroutine) returns a Future.
    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        # Future bound to this particular invocation of the coroutine.
        future = TracebackFuture()

        if replace_callback and 'callback' in kwargs:
            # With replace_callback enabled and a ``callback`` keyword
            # argument supplied, remove it from kwargs and register
            #     lambda future: callback(future.result())
            # with the current thread's IOLoop, so it runs on the loop
            # once the Future has completed.
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            # Call the wrapped function.
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            # gen.Return / StopIteration raised directly by func: extract
            # the value; it is stored on the Future by the set_result call
            # at the end of wrapper, and the coroutine is finished.
            result = _value_from_stopiteration(e)
        except Exception:
            # Any other exception: record it on the Future and finish.
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):
                # func is a generator function, so calling it returned a
                # generator; start the generator right here.
                # Many generator coroutines never actually reach a yield
                # on the path they take, so iterating once here lets those
                # finish without creating a Runner object (a performance
                # shortcut).
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        # The stack_context state changed across the
                        # first yield (e.g. yielding inside a
                        # ``with StackContext`` block). Replace the yielded
                        # value with a Future holding
                        # StackContextInconsistentError; that Future is
                        # what the Runner below receives.
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    # The generator finished without yielding: extract its
                    # return value and store it on the Future.
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    # Any other exception: record it on the Future.
                    future.set_exc_info(sys.exc_info())
                else:
                    # The generator yielded: create a Runner from the
                    # generator, the Future representing the coroutine's
                    # result, and the first yielded value.
                    # NOTE(review): the mapping presumably keeps the Runner
                    # alive while the Future is pending -- confirm against
                    # the definition of _futures_to_runners.
                    _futures_to_runners[future] = Runner(result, future, yielded)
                # Drop the local reference to the yielded value.
                yielded = None

                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame.  This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles).  We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None
        # Non-generator path: plain return value, or the value extracted
        # from Return/StopIteration above.
        future.set_result(result)
        return future

    # Keep a reference to the original (undecorated) function.
    wrapper.__wrapped__ = wrapped
    # Flag attribute marking this wrapper as a tornado coroutine.
    wrapper.__tornado_coroutine__ = True
    # Return the wrapper.
    return wrapper

# Core code 2: how a coroutine is driven.
class Runner(object):
    """Drive a generator-based coroutine until it finishes.

    (Excerpt: parts of this class are elided with ``...``.) Each time the
    Future the generator last yielded is done, the Runner resumes ``gen``
    with its result or exception. The generator's final result or
    exception is stored on ``result_future``.
    """
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        ...

        # 1. handle_yield converts first_yielded into a Future and stores
        #    it in self.future.
        # 2. If self.future is already done, call self.run right away.
        # 3. If not, handle_yield has added self.future to the IOLoop
        #    (adding a Future to the IOLoop means that when the Future
        #    completes, the registered callback is scheduled on the IOLoop
        #    and runs on a later loop iteration).
        if self.handle_yield(first_yielded):
            # Clear the local names; they are not needed any more.
            gen = result_future = first_yielded = None
            self.run()

    def handle_yield(self, yielded):
        """Store ``yielded`` (as a Future) in ``self.future``.

        (Excerpt: the first branch is elided with ``...``.)
        Returns True if ``self.future`` is already done. Otherwise, and
        also when it is ``moment``, schedules ``self.run`` for when it
        completes and returns False.
        """
        ...
        else:
            try:
                # Convert the value yielded by the coroutine into a Future
                # and store it in self.future.
                self.future = convert_yielded(yielded)
            except BadYieldError:
                # Conversion failed: point self.future at a new Future
                # that carries the conversion error.
                self.future = TracebackFuture()
                self.future.set_exc_info(sys.exc_info())

        if not self.future.done() or self.future is moment:
            # self.future is not done yet, or is ``moment``. When it
            # completes, the IOLoop calls ``inner``, which calls self.run
            # on a later loop iteration. Return False.
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None # noqa
                self.run()
            self.io_loop.add_future(
                self.future, inner)
            return False
        # self.future is already done: return True.
        return True

# Additional note:
# inside a coroutine you can yield a Future, or write a bare ``yield``
# (equivalent to ``yield moment``, since convert_yielded maps None to
# moment). You can also write ``res1, res2 = yield [Future1, Future2]``
# or ``res_dic = yield {key1: Future1, key2: Future2, ...}``.

    def run(self):
        """Resume the generator for as long as its current Future is done.

        Returns when the current Future is still pending, or when the
        generator finishes (its result or exception is then stored on
        ``result_future``).
        """
        # Do nothing if the Runner is already running or has finished.
        if self.running or self.finished:
            return

        try:
            self.running = True

            while True:
                # Stop if the current future has not completed yet.
                future = self.future
                if not future.done():
                    return
                self.future = None

                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    # Fetch the future's result (or capture its exception).
                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
                    future = None

                    # The future failed: resume the generator by throwing
                    # the exception into it.
                    if exc_info is not None:
                        try:
                            yielded = self.gen.throw(*exc_info)
                        finally:
                            # Break up a reference to itself
                            # for faster GC on CPython.
                            exc_info = None
                    # The future succeeded: resume the generator with its result.
                    else:
                        yielded = self.gen.send(value)

                    # The stack_context state changed across the yield
                    # (e.g. yield inside ``with StackContext``): raise an
                    # error inside the generator.
                    if stack_context._state.contexts is not orig_stack_contexts:
                        self.gen.throw(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    # The generator finished with gen.Return/StopIteration:
                    # 1. mark the Runner as finished;
                    # 2. extract the value and store it on the Future that
                    #    represents the coroutine's result.
                    self.finished = True
                    self.future = _null_future
                    ...
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    ...
                    return
                except Exception:
                    # The generator raised some other exception:
                    # 1. mark the Runner as finished;
                    # 2. store the exception on the Future that represents
                    #    the coroutine's result.
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_exc_info(sys.exc_info())
                    self.result_future = None
                    self._deactivate_stack_context()
                    return

                # The generator yielded again:
                # 1. handle_yield converts the value into a Future and
                #    stores it in self.future;
                # 2. if that Future is already done, loop again;
                # 3. otherwise it has been added to the IOLoop; exit.
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False

def convert_yielded(yielded):
    """Convert a value yielded by a coroutine into a Future.

    (Excerpt: some branches are elided with ``...``.)

    :raises BadYieldError: if ``yielded`` is of an unsupported type.
    """
    # Lists and dicts containing YieldPoints were handled earlier.

    # None (e.g. a bare ``yield``) becomes ``moment``.
    if yielded is None:
        return moment

    # A list, or a dict, is handed to ``multi``. It returns a new Future
    # that completes once all elements (or all dict values) have completed.
    # In multi_future as shown here, each element is itself passed through
    # convert_yielded, so it need not already be a Future.
    # If some elements fail, the first failure is stored on the returned
    # Future and later ones are logged (unless listed in quiet_exceptions).
    elif isinstance(yielded, (list, dict)):
        return multi(yielded)

    # A Future is returned unchanged.
    elif is_future(yielded):
        return yielded
    ...
    else:
        # Anything else raises BadYieldError.
        raise BadYieldError("yielded unknown object %r" % (yielded,))

def multi(children, quiet_exceptions=()):
    """Combine a list or dict of yieldable objects into a single Future.

    (Excerpt: part of the body is elided with ``...``.) The visible code
    delegates to ``multi_future`` and passes ``quiet_exceptions`` through.
    """
    ...
    return multi_future(children, quiet_exceptions=quiet_exceptions)

def multi_future(children, quiet_exceptions=()):
    """Wait for every child and gather their results into one Future.

    ``children`` is a list, or a dict whose values are the children. Each
    child is normalized with ``convert_yielded`` and must then be a Future.

    The returned Future resolves once every child is done. Its result is a
    list of the children's results, or a dict with the original keys when
    a dict was given. If a child fails, the first failure (in iteration
    order) is stored on the returned Future instead. Later failures are
    logged via ``app_log`` unless they are instances of
    ``quiet_exceptions``.
    """
    if isinstance(children, dict):
        keys = list(children.keys())
        children = children.values()
    else:
        keys = None
    children = [convert_yielded(child) for child in children]
    assert all(is_future(child) for child in children)
    pending = set(children)

    aggregate = Future()
    if not children:
        aggregate.set_result([] if keys is None else {})

    def on_child_done(done_child):
        pending.remove(done_child)
        if pending:
            # Some children are still running; wait for the last one.
            return
        results = []
        for child in children:
            try:
                results.append(child.result())
            except Exception as exc:
                if not aggregate.done():
                    # First failure wins.
                    aggregate.set_exc_info(sys.exc_info())
                elif not isinstance(exc, quiet_exceptions):
                    app_log.error("Multiple exceptions in yield list",
                                  exc_info=True)
        if aggregate.done():
            return
        if keys is None:
            aggregate.set_result(results)
        else:
            aggregate.set_result(dict(zip(keys, results)))

    # The same Future may appear more than once; subscribe only once.
    subscribed = set()
    for child in children:
        if child in subscribed:
            continue
        subscribed.add(child)
        child.add_done_callback(on_child_done)
    return aggregate

class Return(Exception):
    """Raise ``Return(value)`` to finish a coroutine with ``value``.

    The value is exposed both as the ``value`` attribute and as the single
    element of ``args``.
    """

    def __init__(self, value=None):
        super(Return, self).__init__()
        # Cython recognizes subclasses of StopIteration with a .args tuple.
        self.args = (value,)
        self.value = value

总结一下:被gen.coroutine修饰的函数就是一个“协程”,调用(或者说启动)一个协程,会返回一个Future对象。在协程中,可以通过抛出StopIteration或gen.Return异常来终止协程的执行,并返回结果。
通常情况下,被gen.coroutine修饰的函数都是生成器函数,每次调用都会启动一个生成器,并且都应该:


使用IOLoop和协程的例子-生产者消费者[TOC]

[root@iZj6chejzrsqpclb7miryaZ ~]# python t.py 
put seq: 1
put seq: 2
get seq: 1
put seq: 3
put seq: 4
put seq: 5
get seq: 2
put seq: 6
put seq: 7
put seq: 8
get seq: 3
put seq: 9
put seq: 10
put seq: 11
get seq: 4
put seq: 12
put seq: 13
put seq: 14
get seq: 5
put seq: 15
storage is already full.
storage is already full.
get seq: 6
put seq: 16
storage is already full.
storage is already full.
get seq: 7
put seq: 17
storage is already full.
storage is already full.
get seq: 8
put seq: 18
storage is already full.
storage is already full.
get seq: 9
put seq: 19
storage is already full.
storage is already full.
get seq: 10
put seq: 20
shutdown.
[root@iZj6chejzrsqpclb7miryaZ ~]# cat t.py 
# coding: utf8

from tornado.ioloop import IOLoop
from tornado.gen import coroutine
from tornado.concurrent import Future

class Storage(object):
    """A bounded count of goods shared by the producer and the consumer.

    ``put`` adds one item unless the storage is full, and ``get`` removes
    one unless it is empty. Each successful operation prints its sequence
    number. ``full`` and ``empty`` print a notice whenever they return True.
    """

    def __init__(self, max_goods):
        self._max_goods = max_goods    # capacity
        self._goods_number = 0         # goods currently stored
        self._put_seq = 0              # successful puts so far
        self._get_seq = 0              # successful gets so far

    def full(self):
        """Return True (printing a notice) if the storage is at capacity."""
        ret = self._max_goods == self._goods_number
        if ret:
            # A single-argument print(...) behaves identically on
            # Python 2 and Python 3, unlike the Python 2-only print statement.
            print("storage is already full.")
        return ret

    def empty(self):
        """Return True (printing a notice) if there are no goods stored."""
        ret = not self._goods_number
        if ret:
            print("storage has no enough goods.")
        return ret

    def put(self):
        """Store one item and print its sequence number; no-op when full."""
        if self.full():
            return

        self._put_seq += 1
        print("put seq: %s" % self._put_seq)
        self._goods_number += 1

    def get(self):
        """Remove one item and print its sequence number; no-op when empty."""
        if self.empty():
            return

        self._get_seq += 1
        print("get seq: %s" % self._get_seq)
        self._goods_number -= 1

storage = Storage(10)

def my_sleep(n):
    """Return a Future that resolves to None about ``n`` seconds from now.

    The timer is scheduled with ``call_at`` on the current IOLoop, so the
    Future only completes while that loop is running.
    """
    done = Future()

    def _wake_up(pending):
        pending.set_result(None)

    loop = IOLoop.current()
    loop.call_at(loop.time() + n, _wake_up, done)
    return done

@coroutine
def produer():
    """Put 20 goods into ``storage``, trying once every second.

    A try made while the storage is full is skipped and does not count.
    """
    produced = 0
    while produced < 20:
        yield my_sleep(1)
        if storage.full():
            continue
        storage.put()
        produced += 1

@coroutine
def consumer():
    """Take 10 goods from ``storage``, trying once every 3 seconds.

    A try made while the storage is empty is skipped and does not count.
    """
    consumed = 0
    while consumed < 10:
        yield my_sleep(3)
        if storage.empty():
            continue
        storage.get()
        consumed += 1

class Shutdown:
    """Done-callback that stops an IOLoop after ``total_count`` calls.

    Register the same instance (e.g. with ``IOLoop.add_future``) on every
    Future to wait for; the loop is stopped when the last one completes.
    """

    def __init__(self, total_count, ioloop=None):
        self._total_count = total_count
        self._current_count = 0
        # Fall back to the IOLoop that is current at construction time.
        self._ioloop = ioloop or IOLoop.current()

    def __call__(self, f):
        # Drop the reference to the finished Future; its result is not
        # inspected here.
        f = None
        self._current_count += 1
        if self._current_count == self._total_count:
            # Single-argument print(...) works on both Python 2 and 3.
            print("shutdown.")
            self._ioloop.stop()
shutdown = Shutdown(total_count=2)


def main():
    """Start the producer and consumer, then run the IOLoop until both end.

    ``shutdown`` is registered on both coroutine Futures, so it stops the
    loop after the second one completes.
    """
    running = [produer(), consumer()]
    loop = IOLoop.current()
    for coroutine_future in running:
        loop.add_future(coroutine_future, shutdown)
    loop.start()


if __name__ == "__main__":
    main()