tornado.gen模块就是tornado中的“协程”实现。下面简单地看一下协程和线程的区别:
阅读本文之前,需要了解Python生成器和functools.wraps以及内核线程。
源代码在:http://www.tornadoweb.org/en/stable/_modules/tornado/gen.html。
gen模块中有很多已弃用(deprecated)的代码,比如YieldPoint的概念已经被Future取代了,因此本文不对任何与YieldPoint相关的代码进行解析。下面是gen模块的关键代码:
# !!!可以通过raise StopIteration或gen.Return异常,
# 来终止协程的执行,并返回结果!!!
# 该函数用于从StopIteration或gen.Return异常中提取返回结果
# + 1,如果异常对象有value属性,则返回value属性的值
# + 2,否则,如果异常对象有args属性,并且args属性的值是元组,
# + + 并且该元组至少有一个元素,则返回它的第一个元素
# + 3,否则,返回None
def _value_from_stopiteration(e):
try:
# StopIteration has a value attribute beginning in py33.
# So does our Return class.
return e.value
except AttributeError:
pass
try:
# Cython backports coroutine functionality by putting the value in
# e.args[0].
return e.args[0]
except (AttributeError, IndexError):
return None
def coroutine(func, replace_callback=True):
    """Decorator that turns ``func`` into a coroutine.

    :param func: the function (normally a generator function) to wrap.
    :param replace_callback: forwarded to ``_make_coroutine_wrapper``.
        When true, a ``callback`` keyword argument passed to the wrapped
        function is removed and invoked with the coroutine's result once
        its Future completes.
    :returns: the wrapper produced by ``_make_coroutine_wrapper``.
    """
    # Forward the caller's choice; previously ``True`` was hard-coded here,
    # so passing ``replace_callback=False`` had no effect.
    return _make_coroutine_wrapper(func, replace_callback=replace_callback)
# Core code 1: creating a coroutine
def _make_coroutine_wrapper(func, replace_callback):
    """Wrap ``func`` so that calling it starts a coroutine and returns a Future.

    :param func: the function to wrap; usually a generator function.
    :param replace_callback: when true and the caller passes a ``callback``
        keyword argument, that argument is popped and scheduled to be called
        with the Future's result instead of being passed on to ``func``.
    :returns: ``wrapper``, the coroutine object.  It carries the original
        function in ``__wrapped__`` and is tagged with
        ``__tornado_coroutine__ = True``.
    """
    # On Python 3.5, set the coroutine flag on our generator, to allow it
    # to be used with 'await'.
    wrapped = func
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)

    # ``wrapper`` represents the coroutine object;
    # starting the coroutine (calling it) returns a Future.
    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        # The Future bound to this coroutine invocation.
        future = TracebackFuture()

        if replace_callback and 'callback' in kwargs:
            # When the Future completes, the IOLoop of the current thread
            # gets ``lambda future: callback(future.result())`` added to it,
            # and runs that callback on its next iteration.
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            # Call the wrapped function.
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            # gen.Return / StopIteration: extract the result from the
            # exception; it is stored on the Future at the bottom of this
            # function and the coroutine is finished.
            result = _value_from_stopiteration(e)
        except Exception:
            # Any other exception: record it on the Future; the coroutine
            # is finished.
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):
                # The wrapped function is a generator function, so calling
                # it produced a generator, which is started right here.
                # Many generator coroutines never actually yield (the
                # branch taken contains no yield statement), so iterating
                # inline avoids creating a Runner and improves performance.
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        # Yielding inside a ``with StackContext`` block
                        # leaves the context stack changed; replace the
                        # yielded value with a failed Future reporting it.
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    # gen.Return / StopIteration: extract the result, store
                    # it on the Future; the coroutine is finished.
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    # Any other exception: record it on the Future; the
                    # coroutine is finished.
                    future.set_exc_info(sys.exc_info())
                else:
                    # Otherwise create a Runner from the generator, the
                    # Future representing the coroutine's result, and the
                    # first value the generator yielded.
                    _futures_to_runners[future] = Runner(result, future, yielded)
                yielded = None
                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame.  This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles).  We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None
        # Reached for a non-generator result, or a result extracted from
        # Return/StopIteration above.
        future.set_result(result)
        return future

    # Keep the wrapped function reachable from the coroutine object.
    wrapper.__wrapped__ = wrapped
    # Marker: any object carrying this flag is a tornado coroutine.
    wrapper.__tornado_coroutine__ = True
    # Return the coroutine object.
    return wrapper
# Core code 2: how a coroutine is run
class Runner(object):
    """Drives a generator: resumes ``gen`` when the Future it yielded is done
    and records the final outcome on ``result_future``.

    NOTE: this is an excerpt; each ``...`` marks code omitted from the
    original listing.
    """

    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        ...
        # 1. Convert first_yielded into a Future and store it in self.future.
        # 2. If self.future is already done, call self.run.
        # 3. If self.future is not done yet, add it to the IOLoop
        #    (adding a Future to the IOLoop means that when the Future
        #    completes, the configured callback is added to the IOLoop and
        #    executed on the next loop iteration).
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None
            self.run()

    def handle_yield(self, yielded):
        # NOTE(review): the ``if`` branch that this ``else`` belongs to is
        # omitted from the excerpt (presumably the YieldPoint handling).
        ...
        else:
            try:
                # Convert the value yielded by the coroutine into a Future
                # and store it in self.future.
                self.future = convert_yielded(yielded)
            except BadYieldError:
                # Conversion failed: point self.future at a fresh Future and
                # record the conversion error on it.
                self.future = TracebackFuture()
                self.future.set_exc_info(sys.exc_info())
        if not self.future.done() or self.future is moment:
            # self.future is not done, or it is ``moment``: arrange for
            # self.run to be (indirectly) added to the IOLoop once
            # self.future completes, so that it runs on a later loop
            # iteration.  Return False.
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None  # noqa
                self.run()
            self.io_loop.add_future(
                self.future, inner)
            return False
        # self.future is already done: return True.
        return True

    # Additional note:
    # inside a coroutine one can yield a Future or use a bare ``yield``
    # (equivalent to ``yield moment``), and also
    #   res1, res2 = yield [Future1, Future2]
    #   res_dic = yield {key1: Future1, key2: Future2, ...}
    def run(self):
        # Nothing to do if the Runner is already running or has finished.
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                # Return if the future is not done yet.
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None
                    # Fetch the future's result.
                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
                    future = None
                    # The future failed: resume the generator by throwing
                    # the exception into it.
                    if exc_info is not None:
                        try:
                            yielded = self.gen.throw(*exc_info)
                        finally:
                            # Break up a reference to itself
                            # for faster GC on CPython.
                            exc_info = None
                    # The future succeeded: resume the generator by sending
                    # it the result.
                    else:
                        yielded = self.gen.send(value)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        self.gen.throw(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    # The generator raised gen.Return or StopIteration:
                    #   1. mark the Runner as finished;
                    #   2. extract the result from the exception and store
                    #      it on the Future representing the coroutine.
                    self.finished = True
                    self.future = _null_future
                    ...
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    ...
                    return
                except Exception:
                    # The generator raised some other exception:
                    #   1. mark the Runner as finished;
                    #   2. record the exception on the Future representing
                    #      the coroutine.
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_exc_info(sys.exc_info())
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                # The generator yielded successfully:
                #   1. convert the yielded value into a Future and store it
                #      in self.future;
                #   2. if self.future is already done, continue the loop;
                #   3. otherwise self.future has been added to the IOLoop,
                #      so exit.
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False
def convert_yielded(yielded):
    """Convert a value yielded by a coroutine into a Future.

    NOTE: this is an excerpt; ``...`` marks a branch omitted from the
    original listing.
    """
    # Lists and dicts containing YieldPoints were handled earlier.
    # A yielded None is mapped to ``moment``.
    if yielded is None:
        return moment
    # A list or dict is handed to ``multi``.  Per the article: every list
    # element / dict value must be a Future; the Future returned completes
    # only when all of them have completed, and if some of them fail, the
    # first exception is stored on it while the others are logged.
    elif isinstance(yielded, (list, dict)):
        return multi(yielded)
    # A Future is returned unchanged.
    elif is_future(yielded):
        return yielded
    ...
    else:
        # Anything else raises BadYieldError.
        raise BadYieldError("yielded unknown object %r" % (yielded,))
def multi(children, quiet_exceptions=()):
    """Delegate to ``multi_future`` with the same arguments.

    NOTE: this is an excerpt; ``...`` marks code omitted from the original
    listing.
    """
    ...
    return multi_future(children, quiet_exceptions=quiet_exceptions)
def multi_future(children, quiet_exceptions=()):
    """Wait for several yielded objects and combine their results.

    :param children: a list of yieldable objects, or a dict whose values are
        yieldable objects.  Each one is converted with ``convert_yielded``.
    :param quiet_exceptions: exception type (or tuple of types) that is not
        logged when it occurs after another child has already failed.
    :returns: a ``Future`` that completes once every child has completed.
        Its result is a list of child results (list input) or a dict with
        the same keys (dict input).  If children fail, the first exception
        encountered is stored on the returned Future; later ones are logged
        unless they match ``quiet_exceptions``.
    """
    if isinstance(children, dict):
        keys = list(children.keys())
        children = children.values()
    else:
        keys = None
    children = list(map(convert_yielded, children))
    assert all(is_future(i) for i in children)
    unfinished_children = set(children)

    future = Future()
    if not children:
        # Nothing to wait for: resolve immediately with an empty container
        # of the matching kind.
        future.set_result({} if keys is not None else [])

    def callback(f):
        unfinished_children.remove(f)
        if unfinished_children:
            return
        # Last child finished: collect results in the original order.
        result_list = []
        for child in children:
            try:
                result_list.append(child.result())
            except Exception as e:
                if future.done():
                    # An earlier failure is already recorded; only log.
                    if not isinstance(e, quiet_exceptions):
                        app_log.error("Multiple exceptions in yield list",
                                      exc_info=True)
                else:
                    future.set_exc_info(sys.exc_info())
        if not future.done():
            if keys is not None:
                future.set_result(dict(zip(keys, result_list)))
            else:
                future.set_result(result_list)

    # Register the callback once per distinct child, since the same Future
    # may appear several times in ``children``.
    listening = set()
    for f in children:
        if f not in listening:
            listening.add(f)
            f.add_done_callback(callback)
    return future
class Return(Exception):
    """Exception raised inside a coroutine to finish it and return ``value``.

    The value is exposed both as ``self.value`` and as ``self.args[0]``.
    """

    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
        # Cython recognizes subclasses of StopIteration with a .args tuple.
        self.args = (value,)
总结一下,就是:gen.coroutine修饰的函数,就是一个“协程”,调用(或者叫启动)一个协程,会返回一个Future对象。在协程中,可以通过抛出StopIteration或gen.Return异常,来终止协程的执行,并返回结果。
通常情况下,gen.coroutine修饰的函数都是生成器函数,每次调用启动生成器,都应该:
[root@iZj6chejzrsqpclb7miryaZ ~]# python t.py
put seq: 1
put seq: 2
get seq: 1
put seq: 3
put seq: 4
put seq: 5
get seq: 2
put seq: 6
put seq: 7
put seq: 8
get seq: 3
put seq: 9
put seq: 10
put seq: 11
get seq: 4
put seq: 12
put seq: 13
put seq: 14
get seq: 5
put seq: 15
storage is already full.
storage is already full.
get seq: 6
put seq: 16
storage is already full.
storage is already full.
get seq: 7
put seq: 17
storage is already full.
storage is already full.
get seq: 8
put seq: 18
storage is already full.
storage is already full.
get seq: 9
put seq: 19
storage is already full.
storage is already full.
get seq: 10
put seq: 20
shutdown.
[root@iZj6chejzrsqpclb7miryaZ ~]# cat t.py
# coding: utf8
from tornado.ioloop import IOLoop
from tornado.gen import coroutine
from tornado.concurrent import Future
class Storage(object):
    """A bounded warehouse that counts goods and numbers each put/get.

    :param max_goods: capacity; ``put`` is refused once this many goods
        are stored.
    """

    def __init__(self, max_goods):
        self._max_goods = max_goods
        self._goods_number = 0   # goods currently stored
        self._put_seq = 0        # number of successful puts so far
        self._get_seq = 0        # number of successful gets so far

    def full(self):
        """Return True (and print a notice) when the storage is at capacity."""
        ret = self._max_goods == self._goods_number
        if ret:
            # Parenthesised form works as a statement on Python 2 and as a
            # function call on Python 3.
            print("storage is already full.")
        return ret

    def empty(self):
        """Return True (and print a notice) when nothing is stored."""
        ret = not self._goods_number
        if ret:
            print("storage has no enough goods.")
        return ret

    def put(self):
        """Store one item unless the storage is full."""
        if self.full():
            return
        self._put_seq += 1
        print("put seq: %s" % self._put_seq)
        self._goods_number += 1

    def get(self):
        """Remove one item unless the storage is empty."""
        if self.empty():
            return
        self._get_seq += 1
        print("get seq: %s" % self._get_seq)
        self._goods_number -= 1
# Module-level Storage instance (capacity 10) used by produer() and consumer().
storage = Storage(10)
def my_sleep(n):
    """Return a Future that is resolved with ``None`` after ``n`` seconds.

    The resolution is scheduled on the current IOLoop via ``call_at``, so
    yielding the returned Future from a coroutine suspends that coroutine
    until the timeout fires, without blocking the loop.

    :param n: delay in seconds, added to ``ioloop.time()``.
    """
    future = Future()

    def _inner(f):
        f.set_result(None)

    ioloop = IOLoop.current()
    ioloop.call_at(ioloop.time() + n, _inner, future)
    return future
@coroutine
def produer():
    """Coroutine: once per second try to put an item, until 20 were stored."""
    produce_count = 0
    while produce_count < 20:
        yield my_sleep(1)
        if not storage.full():
            storage.put()
            # Only successful puts count, so attempts rejected while the
            # storage is full are retried (the recorded run shows 20 puts
            # in addition to the "already full" attempts).
            produce_count += 1
@coroutine
def consumer():
    """Coroutine: every three seconds try to take an item, until 10 were taken."""
    consume_count = 0
    while consume_count < 10:
        yield my_sleep(3)
        if not storage.empty():
            storage.get()
            # Only successful gets count; an attempt on an empty storage
            # is retried after the next sleep.
            consume_count += 1
class Shutdown:
    """Callable that stops the IOLoop after being invoked ``total_count`` times.

    :param total_count: number of calls after which the loop is stopped.
    :param ioloop: loop to stop; defaults to ``IOLoop.current()``.
    """

    def __init__(self, total_count, ioloop=None):
        self._total_count = total_count
        self._current_count = 0
        self._ioloop = ioloop or IOLoop.current()

    def __call__(self, f):
        # The argument is not used; drop the reference right away.
        f = None
        self._current_count += 1
        if self._current_count == self._total_count:
            # Parenthesised form works on both Python 2 and Python 3.
            print("shutdown.")
            self._ioloop.stop()
# Callback registered in main() on both coroutine futures; built with total_count=2.
shutdown = Shutdown(2)
def main():
    """Start both coroutines and run the IOLoop until ``shutdown`` stops it."""
    producer_future = produer()
    consumer_future = consumer()
    # ``shutdown`` is called once per finished coroutine.
    IOLoop.current().add_future(producer_future, shutdown)
    IOLoop.current().add_future(consumer_future, shutdown)
    IOLoop.current().start()


if __name__ == "__main__":
    main()