tornado.gen模块就是tornado中的“协程”实现。下面简单地看一下协程和线程的区别:
阅读本文之前,需要了解Python生成器和functools.wraps以及内核线程。
源代码在:http://www.tornadoweb.org/en/stable/_modules/tornado/gen.html。
gen模块中有很多已弃用(deprecated)的代码,比如YieldPoint的概念已经被Future取代了,因此本文不对任何与YieldPoint相关的代码进行解析。下面是gen模块的关键代码:
# A coroutine can be terminated, and made to return a result, by raising
# StopIteration or gen.Return.
# This function extracts the return value from such an exception:
# + 1. if the exception object has a `value` attribute, return it;
# + 2. otherwise, if it has an `args` attribute with at least one element,
# + +  return its first element;
# + 3. otherwise, return None.
def _value_from_stopiteration(e):
    try:
        # StopIteration has a value attribute beginning in py33.
        # So does our Return class.
        return e.value
    except AttributeError:
        pass
    try:
        # Cython backports coroutine functionality by putting the value in
        # e.args[0].
        return e.args[0]
    except (AttributeError, IndexError):
        return None


# Coroutines are created with the gen.coroutine decorator.
# NOTE(review): the `replace_callback` argument is not forwarded here;
# True is always passed to _make_coroutine_wrapper.
def coroutine(func, replace_callback=True):
    return _make_coroutine_wrapper(func, replace_callback=True)


# Core code 1: creating a coroutine.
def _make_coroutine_wrapper(func, replace_callback):
    # On Python 3.5, set the coroutine flag on our generator, to allow it
    # to be used with 'await'.
    wrapped = func
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)

    # `wrapper` is the coroutine callable;
    # + calling (starting) it returns a Future object.
    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        # Create the Future that carries this coroutine's result.
        future = TracebackFuture()

        if replace_callback and 'callback' in kwargs:
            # When replace_callback is true and a `callback` keyword argument
            # was given, remove it from kwargs and register
            # + + lambda future: callback(future.result())
            # + on the current IOLoop via add_future, so that the callback is
            # + invoked with the result once the Future completes.
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            # Call the wrapped function.
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            # gen.Return or StopIteration was raised:
            # + extract the result from the exception object; it is stored on
            # + the Future by the set_result() call at the end of wrapper,
            # + and the coroutine is finished.
            result = _value_from_stopiteration(e)
        except Exception:
            # Any other exception:
            # + record the exception info on the Future; the coroutine is
            # + finished.
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):
                # The wrapped function is a generator function, so calling it
                # + returned a generator, which is started right here.
                # + The first iteration is run inline because many generator
                # + + coroutines never actually yield (the branch taken during
                # + + execution contains no yield statement); iterating here
                # + + avoids creating a Runner object in that case, which
                # + + improves performance.
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        # Yielding inside a `with StackContext` block is
                        # + problematic; replace the yielded value with a
                        # + Future that carries the corresponding error.
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    # gen.Return or StopIteration was raised:
                    # + extract the result from the exception object, store it
                    # + on the Future; the coroutine is finished.
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    # Any other exception:
                    # + record the exception info on the Future; the coroutine
                    # + is finished.
                    future.set_exc_info(sys.exc_info())
                else:
                    # Otherwise build a Runner from the generator, the Future
                    # + representing the coroutine's result, and the value
                    # + produced by the generator's first yield.
                    _futures_to_runners[future] = Runner(result, future, yielded)
                yielded = None
                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame.  This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles).  We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None
        future.set_result(result)
        return future

    # Keep a reference to the wrapped function on the wrapper.
    wrapper.__wrapped__ = wrapped
    # Flag the wrapper; an object carrying this flag is a tornado coroutine.
    wrapper.__tornado_coroutine__ = True
    # Return the coroutine callable.
    return wrapper


# Core code 2: how a coroutine is driven.
class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        ...
        # 1. handle_yield converts first_yielded into a Future and stores it
        #    in self.future.
        # 2. If self.future is already done (handle_yield returned True),
        #    call self.run().
        # 3. If self.future is not done yet, handle_yield has added it to the
        #    IOLoop.
        # + + + (Adding a Future to the IOLoop means that, once the Future
        # + + + + completes, the registered callback is handed to the IOLoop
        # + + + + to be executed.)
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None
            self.run()

    def handle_yield(self, yielded):
        # (The preceding `if` branch is elided in this excerpt; presumably it
        # is the deprecated YieldPoint handling -- TODO confirm.)
        ...
        else:
            try:
                # Convert the value yielded by the coroutine into a Future
                # + and store it in self.future.
                self.future = convert_yielded(yielded)
            except BadYieldError:
                # Conversion failed: point self.future at a fresh Future and
                # record the conversion error on it.
                self.future = TracebackFuture()
                self.future.set_exc_info(sys.exc_info())

        if not self.future.done() or self.future is moment:
            # self.future is not done yet, or it is `moment`:
            # + register `inner` (which calls self.run) on the IOLoop for
            # + self.future, so self.run is executed once it completes,
            # and return False.
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None # noqa
                self.run()
            self.io_loop.add_future(
                self.future, inner)
            return False
        # self.future is already done: return True.
        return True
        # Additional note:
        # inside a coroutine one can yield a Future or use a bare `yield`
        # (equivalent to `yield moment`), and also
        # + res1, res2 = yield [Future1, Future2], or
        # + res_dic = yield {key1: Future1, key2: Future2, ...}.

    def run(self):
        # Do nothing if the Runner is already running or has finished.
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                # Return if the current future is not done yet.
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    # Fetch the future's result.
                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
                    future = None

                    # The future failed: resume the generator by throwing the
                    # exception into it.
                    if exc_info is not None:
                        try:
                            yielded = self.gen.throw(*exc_info)
                        finally:
                            # Break up a reference to itself
                            # for faster GC on CPython.
                            exc_info = None
                    else:
                        # The future succeeded: resume the generator by
                        # sending it the future's result.
                        yielded = self.gen.send(value)

                    if stack_context._state.contexts is not orig_stack_contexts:
                        self.gen.throw(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    # The generator raised gen.Return or StopIteration:
                    # + 1. set the Runner's finished flag;
                    # + 2. extract the result from the exception object and
                    # +    store it on the Future representing the coroutine's
                    # +    result.
                    self.finished = True
                    self.future = _null_future
                    ...
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    ...
                    return
                except Exception:
                    # The generator raised some other exception:
                    # + 1. set the Runner's finished flag;
                    # + 2. record the exception info on the Future
                    # +    representing the coroutine's result.
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_exc_info(sys.exc_info())
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                # The generator yielded successfully:
                # + 1. convert the yielded value into a Future and store it in
                # +    self.future (done by handle_yield);
                # + 2. if self.future is already done, go round the loop again;
                # + 3. if it is not done, it has been added to the IOLoop, so
                # +    leave run().
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False


def convert_yielded(yielded):
    # Lists and dicts containing YieldPoints were handled earlier.
    # If yielded is None, return moment.
    if yielded is None:
        return moment
    # If yielded is a list, every element must be (convertible to) a Future;
    # if yielded is a dict, every value must be (convertible to) a Future.
    # + A new Future is returned which completes only once all of those
    # + + Futures have completed. If some of them fail, the first exception
    # + + is stored on the new Future and the others are logged (unless they
    # + + are instances of quiet_exceptions).
    elif isinstance(yielded, (list, dict)):
        return multi(yielded)
    # If yielded is already a Future, return it unchanged.
    elif is_future(yielded):
        return yielded
    ...
    else:
        # Otherwise raise BadYieldError.
        raise BadYieldError("yielded unknown object %r" % (yielded,))


def multi(children, quiet_exceptions=()):
    ...
    return multi_future(children, quiet_exceptions=quiet_exceptions)


# Combine several yielded children (a list, or the values of a dict) into a
# single Future whose result is a list, or a dict with the same keys.
def multi_future(children, quiet_exceptions=()):
    if isinstance(children, dict):
        keys = list(children.keys())
        children = children.values()
    else:
        keys = None
    children = list(map(convert_yielded, children))
    assert all(is_future(i) for i in children)
    unfinished_children = set(children)

    future = Future()
    if not children:
        # Nothing to wait for: resolve immediately with an empty container.
        future.set_result({} if keys is not None else [])

    def callback(f):
        unfinished_children.remove(f)
        if not unfinished_children:
            # Every child has completed: collect results in original order.
            result_list = []
            for f in children:
                try:
                    result_list.append(f.result())
                except Exception as e:
                    if future.done():
                        # An earlier exception was already stored on `future`;
                        # later ones are only logged.
                        if not isinstance(e, quiet_exceptions):
                            app_log.error("Multiple exceptions in yield list",
                                          exc_info=True)
                    else:
                        future.set_exc_info(sys.exc_info())
            if not future.done():
                if keys is not None:
                    future.set_result(dict(zip(keys, result_list)))
                else:
                    future.set_result(result_list)

    # Register the callback once per distinct child, even if the same Future
    # appears several times in `children`.
    listening = set()
    for f in children:
        if f not in listening:
            listening.add(f)
            f.add_done_callback(callback)
    return future


# Exception that carries a coroutine's return value in both `value` and
# `args[0]`.
class Return(Exception):
    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
        # Cython recognizes subclasses of StopIteration with a .args tuple.
        self.args = (value,)
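总结一下:被gen.coroutine修饰的函数就是一个“协程”;调用(或者叫启动)一个协程,会返回一个Future对象。在协程中,可以通过抛出StopIteration或gen.Return异常,来终止协程的执行,并返回结果。(注意:从Python 3.7开始,根据PEP 479,在生成器内部显式抛出的StopIteration会被转换成RuntimeError,因此在生成器协程中应使用raise gen.Return(value),或者在Python 3中直接使用return value。)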
通常情况下,gen.coroutine修饰的函数都是生成器函数,每次调用启动生成器,都应该:
[root@iZj6chejzrsqpclb7miryaZ ~]# python t.py put seq: 1 put seq: 2 get seq: 1 put seq: 3 put seq: 4 put seq: 5 get seq: 2 put seq: 6 put seq: 7 put seq: 8 get seq: 3 put seq: 9 put seq: 10 put seq: 11 get seq: 4 put seq: 12 put seq: 13 put seq: 14 get seq: 5 put seq: 15 storage is already full. storage is already full. get seq: 6 put seq: 16 storage is already full. storage is already full. get seq: 7 put seq: 17 storage is already full. storage is already full. get seq: 8 put seq: 18 storage is already full. storage is already full. get seq: 9 put seq: 19 storage is already full. storage is already full. get seq: 10 put seq: 20 shutdown. [root@iZj6chejzrsqpclb7miryaZ ~]# cat t.py # coding: utf8 from tornado.ioloop import IOLoop from tornado.gen import coroutine from tornado.concurrent import Future class Storage(object): def __init__(self, max_goods): self._max_goods = max_goods self._goods_number = 0 self._put_seq = 0 self._get_seq = 0 def full(self): ret = self._max_goods == self._goods_number if ret: print "storage is already full." return ret def empty(self): ret = not self._goods_number if ret: print "storage has no enough goods." 
return ret def put(self): if self.full(): return self._put_seq += 1 print "put seq: %s" % self._put_seq self._goods_number += 1 def get(self): if self.empty(): return self._get_seq += 1 print "get seq: %s" % self._get_seq self._goods_number -= 1 storage = Storage(10) def my_sleep(n): future = Future() def _inner(f): f.set_result(None) ioloop = IOLoop.current() ioloop.call_at(ioloop.time() + n, _inner, future) return future @coroutine def produer(): produce_count = 0 while produce_count < 20: yield my_sleep(1) if not storage.full(): storage.put() produce_count += 1 @coroutine def consumer(): consume_count = 0 while consume_count < 10: yield my_sleep(3) if not storage.empty(): storage.get() consume_count += 1 class Shutdown: def __init__(self, total_count, ioloop=None): self._total_count = total_count self._current_count = 0 self._ioloop = ioloop or IOLoop.current() def __call__(self, f): f = None self._current_count += 1 if self._current_count == self._total_count: print "shutdown." self._ioloop.stop() shutdown = Shutdown(2) def main(): producer_future = produer() consumer_future = consumer() IOLoop.current().add_future(producer_future, shutdown) IOLoop.current().add_future(consumer_future, shutdown) IOLoop.current().start() if __name__ == "__main__": main()