IOLoop是一个水平触发的I/O事件循环。IOLoop在Linux上,使用epoll机制;在BSD或MAC上,使用kqueue机制;在Windows上,降级到select机制。使用IOLoop的系统,如果想要并发地处理成千上万的连接,那么应该使用支持epoll或kqueue的操作系统。
关于IOLoop对象的创建,之前已经进行过分析,请移步:这里(务必阅读)。本文主要对 事件循环的相关实现 进行分析。
server.py:
[root@iZj6chejzrsqpclb7miryaZ ~]# python server.py connection from: ('127.0.0.1', 58730) connection from: ('127.0.0.1', 58732) connection from: ('127.0.0.1', 58734) connection from: ('127.0.0.1', 58736) connection from: ('127.0.0.1', 58738) connection from: ('127.0.0.1', 58740) connection from: ('127.0.0.1', 58742) connection from: ('127.0.0.1', 58744) connection from: ('127.0.0.1', 58746) connection from: ('127.0.0.1', 58748) connection from: ('127.0.0.1', 58750) connection from: ('127.0.0.1', 58752) connection from: ('127.0.0.1', 58754) connection from: ('127.0.0.1', 58756) connection from: ('127.0.0.1', 58758) connection from: ('127.0.0.1', 58760) connection from: ('127.0.0.1', 58762) connection from: ('127.0.0.1', 58764) connection from: ('127.0.0.1', 58766) connection from: ('127.0.0.1', 58768) connection from: ('127.0.0.1', 58770) ^C[root@iZj6chejzrsqpclb7miryaZ ~]# cat server.py # coding: utf8 import socket from functools import partial import errno import signal from tornado.ioloop import IOLoop def handle_connection(connection, addr): print "connection from: %s" % (connection.getpeername(), ) connection.close() def on_connection_ready(server_socket, fd, events): try: connection, addr = server_socket.accept() except socket.error as ex: if ex.args[0] in [errno.EWOULDBLOCK, errno.EAGAIN]: return raise handle_connection(connection, addr) def main(): ioloop = IOLoop.current() server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(("0.0.0.0", 9099)) server_socket.setblocking(False) server_socket.listen(5) ioloop.add_handler(server_socket.fileno(), partial(on_connection_ready, server_socket), ioloop.READ) ioloop.start() if __name__ == "__main__": def on_quit(sn, fo): IOLoop.current().stop() signal.signal(signal.SIGINT, on_quit) main()
client.py:
[root@iZj6chejzrsqpclb7miryaZ ~]# cat client.py import socket import threading def test(): client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client_socket.connect(("127.0.0.1", 9099)) client_socket.send("123") client_socket.close() def main(): threads = [] for _ in range(20): thread = threading.Thread(target=test) threads.append(thread) thread.setDaemon(True) thread.start() for thread in threads: thread.join() if __name__ == "__main__": main()
IOLoop和PollIOLoop的源代码在:http://www.tornadoweb.org/en/stable/_modules/tornado/ioloop.html#IOLoop
EPollIOLoop、KQueueIOLoop、SelectIOLoop的源代码在:https://github.com/tornadoweb/tornado/tree/master/tornado/platform
EPollIOLoop、KQueueIOLoop、SelectIOLoop都继承自PollIOLoop,PollIOLoop继承自IOLoop。前三者都只是将自身所使用的I/O多路复用机制的接口 适配成 PollIOLoop所期望的接口。事件循环的主要代码都集中在PollIOLoop中,IOLoop的大部分代码是关于如何创建IOLoop对象的,同时也定义了PollIOLoop需要实现的接口,以及一些工具方法。
以EPollIOLoop为例:
class EPollIOLoop(PollIOLoop): def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
EPollIOLoop.initialize()方法会调用PollIOLoop.initialize()方法。它主要是将一个epoll对象(也就是自己的I/O多路复用实现)传递给PollIOLoop.initialize()方法。
PollIOLoop.initialize()方法,主要是初始化了在事件循环中所要使用到的各种数据结构:
class PollIOLoop(IOLoop): """Base class for IOLoops built around a select-like function. For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or `tornado.platform.select.SelectIOLoop` (all platforms). """ def initialize(self, impl, time_func=None, **kwargs): # 调用IOLoop的initialize()方法 super(PollIOLoop, self).initialize(**kwargs) # !!!I/O多路复用实现!!! self._impl = impl if hasattr(self._impl, 'fileno'): # 为epoll或kqueue描述符设置FD_CLOEXEC标记 set_close_exec(self._impl.fileno()) # 获取当前时间戳的函数,默认是time.time self.time_func = time_func or time.time # 该字典保存 注册到IOLoop的文件描述符 与 其处理函数 之间的映射关系 # + 当文件描述符上有事件发生时,会使用fd,event作为参数调用处理函数 self._handlers = {} # 该字典保存 发生事件的文件描述符 到 事件掩码 之间的映射关系 # + 每次事件轮询结束之后,都会将 发生事件的描述符及其上发生的事件 update到这个字典 self._events = {} # 添加到IOLoop的回调函数,都会保存到该列表, # + 每次事件循环开始时,都会先执行这些回调函数 # !!!注意:这里使用的是deque,deque是线程安全的!!! self._callbacks = collections.deque() # 添加到IOLoop的定时器(_Timeout对象),都会保存到该列表, # 每次事件循环开始时,都会先检查定时器列表, # + 并执行到达deadline的定时器的回调函数。 # 该列表本质上是一个最小堆,接下来称之为定时器堆 self._timeouts = [] # 该属性代表定时器堆中,被取消的定时器的数量。 # + 当取消一个定时器的时候,会将该计数器加1,并将定时器的回调函数置空; # 每次事件循环开始时,会执行下面的循环: # 1,如果定时器堆非空: # + 1.1,如果堆顶的定时器被取消了,那么则从定时器堆中 # + + 移除之,并将该计数器减少1;然后回到步骤1 # + 1.2,如果堆顶的定时器的deadline小于当前时间,那么则从定时器堆中 # + + 移除之,并且将它 添加到 待执行的定时器列表;然后回到步骤1 # + 1.3,否则,跳出循环 # 否则,跳出循环 # 当定时器堆中被取消的定时器达到总数量的一半 并且 # + 被取消的定时器的数量大于512 时, # + 会将所有被取消的定时器删除,然后重新构造定时器堆。 # 之所以如此,是因为: # + 如果每次取消一个定时器的时候,都从堆中删除元素的话,就需要不断地调整堆, # + + 所以为了提升效率,只是进行标记删除, # + + + 然后在事件循环开始时,再真正的从定时器堆中删除 self._cancellations = 0 # 事件循环的启动标记 self._running = False # 事件循环的停止标记 self._stopped = False # 是否正在关闭IOLoop, # + 关闭IOLoop的目的是释放它所使用的所有资源 self._closing = False # 启动IOLoop实例的线程的标识符 self._thread_ident = None # 请移步:关于blocking_signal_threshold self._blocking_signal_threshold = None # 用于定时器的计数器,每个定时器对象都有一个计数器 self._timeout_counter = itertools.count() # 请移步:关于waker # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ)
IOLoop:
class IOLoop(Configurable): # 在Future对象完成之后,会将回调函数添加到IOLoop中, # + 回调函数需要接受一个Future对象作为参数 def add_future(self, future, callback): """Schedules a callback on the ``IOLoop`` when the given `.Future` is finished. The callback is invoked with one argument, the `.Future`. """ assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future)) # 运行回调函数 # + 回调函数应该返回None或一个Future对象 # + + 当返回None时,不做任何处理 # + + 当返回Future对象时,会在Future对象完成时,会获取它的结果,并销毁, # + + + 因此,需要避免未被处理的异常 # + 当回调函数出现异常时,会调用handle_callback_exception()方法 def _run_callback(self, callback): """Runs a callback with error handling. For use in subclasses. """ try: ret = callback() if ret is not None: from tornado import gen # Functions that return Futures typically swallow all # exceptions and store them in the Future. If a Future # makes it out to the IOLoop, ensure its exception (if any) # gets logged too. try: ret = gen.convert_yielded(ret) except gen.BadYieldError: # It's not unusual for add_callback to be used with # methods returning a non-None and non-yieldable # result, which should just be ignored. pass else: self.add_future(ret, self._discard_future_result) except Exception: self.handle_callback_exception(callback) def _discard_future_result(self, future): """Avoid unhandled-exception warnings from spawned coroutines.""" future.result() def handle_callback_exception(self, callback): """This method is called whenever a callback run by the `IOLoop` throws an exception. By default simply logs the exception as an error. Subclasses may override this method to customize reporting of exceptions. The exception itself is not passed explicitly, but is available in `sys.exc_info`. """ app_log.error("Exception in callback %r", callback, exc_info=True)
PollIOLoop:
class PollIOLoop(IOLoop): # 向IOLoop添加一个文件描述符。 # + 在事件轮询后,如果文件描述符上发生了关注的事件, # + + 那么会调用这里设置的处理函数。 # + + 处理函数接受两个参数(fd, events)。 def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) # 更新已经添加到IOLoop的文件描述符 def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) # 移除文件描述符 def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True) # 启动IOLoop def start(self): # 如果已经启动,则抛出异常 if self._running: raise RuntimeError("IOLoop is already running") ... # 如果已经停止,将停止标记设置为False,并立即返回 if self._stopped: self._stopped = False return # 将当前线程的IOLoop实例保存起来, # + 然后将该IOLoop实例设置为当前线程的IOLoop实例。 # + 在事件循环结束之后,会还原回来。 # 比如在线程B中,启动线程A的IOLoop实例, # + 那么线程B在事件循环开始之前, # + + 会将自己的IOLoop实例设置为线程A的; # + + 然后在事件循环结束之后,还原回来。 # 如果不这样做,可能会导致无法工作,因为: # + 在很多模块中,都是通过IOLoop.current()方法, # + + 获取IOLoop实例的,那么在线程B中,获取到的就是线程B的IOLoop实例, # + + 然后这些模块就会把回调函数、定时器、处理函数注册到线程B的IOLoop实例上, # + + 但是线程B 启动的 却是线程A的IOLoop实例 old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self # 启动IOLoop实例的线程的标识符 self._thread_ident = thread.get_ident() # 将IOLoop实例设置为运行状态 self._running = True # 设置wakeup文件描述符 ... try: 事件循环 while True: ... # 收集到达deadline的定时器 ... # 逐个执行每一个添加到IOLoop的回调函数 for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) # 逐个执行每个到期的定时器的回调函数 for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # 需要额外说明的是:被调度过的定时器和回调函数, 会被从IOLoop中移除。 并且 ,在定时器或回调函数中, 再向IOLoop添加定时器或回调函数,会在下一次事件循环时被调度。 ... # 如果有回调函数,那么事件轮询的超时时间设置为0, # + 以防,当没有任何事件发生时,事件轮询一直在等待, # + + 进而导致回调函数无法即时执行 if self._callbacks: # If any callbacks or timeouts called add_callback, # we don't want to wait in poll() before we run them. poll_timeout = 0.0 # 如果有定时器,那么将超时时间设置为 # + 默认超时间 和 最先被调度的定时器的到期时间 中的较小值, # + + 以防,定时器无法按时执行 elif self._timeouts: # If there are any timeouts, schedule the first one. # Use self.time() instead of 'now' to account for time # spent running callbacks. poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) # 当没有回调函数,也没有定时器的时候,将事件轮询超时时间设置为默认值 else: # No timeouts and no callbacks, so use the default. poll_timeout = _POLL_TIMEOUT # 事件循环的退出条件: 运行标记为False if not self._running: break # 清除闹钟 ... # 事件轮询 try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise # 恢复闹钟 ... # 逐个执行发生了事件的文件描述符的处理函数 # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that modify self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # Happens when the client closes the connection pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None # 事件循环退出之后 finally: # 重置停止标记为False self._stopped = False # 清除闹钟 ... # 恢复当前线程的IOLoop实例 ... # 恢复wakeup文件描述符 ... # 停止事件循环 # + 1,设置运行标记为False,停止标记为True # + 2,唤醒IO循环 def stop(self): # 每次事件循环过程中都会判断启动标记, # + 当它为False时,事件循环就会退出 self._running = False self._stopped = True self._waker.wake() # 给IOLoop添加一个定时器 # + 1,生成一个定时器对象,它包含:deadline时间戳,回调函数,计数器 # + 2,将定时器对象添加到 定时器堆 中 # + 3,返回定时器对象,给调用方 def call_at(self, deadline, callback, *args, **kwargs): timeout = _Timeout( deadline, functools.partial(stack_context.wrap(callback), *args, **kwargs), self) heapq.heappush(self._timeouts, timeout) return timeout # 从IOLoop中移除一个定时器 # + 1,将定时器对象的callback置空 # + 2,将计数器增加1 def remove_timeout(self, timeout): # Removing from a heap is complicated, so just leave the defunct # timeout object in the queue (see discussion in # http://docs.python.org/library/heapq.html). # If this turns out to be a problem, we could add a garbage # collection pass whenever there are too many dead timeouts. timeout.callback = None self._cancellations += 1 # 向IOLoop中添加一个回调函数 # + 1,将回调函数添加到回调函数列表 # + 2,如果是在非IOLoop线程添加的,那么唤醒IO循环, # + + 以防,回调函数迟迟无法执行 def add_callback(self, callback, *args, **kwargs): if self._closing: return # Blindly insert into self._callbacks. This is safe even # from signal handlers because deque.append is atomic. self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if thread.get_ident() != self._thread_ident: # This will write one byte but Waker.consume() reads many # at once, so it's ok to write even when not strictly # necessary. self._waker.wake() else: # If we're on the IOLoop's thread, we don't need to wake anyone. pass
阅读本部分的源码之前,建议先阅读:Python中的信号处理。
下面是IOLoop 和 PollIOLoop中的相关代码:
class IOLoop(Configurable): def set_blocking_signal_threshold(self, seconds, action): """Sends a signal if the `IOLoop` is blocked for more than ``s`` seconds. Pass ``seconds=None`` to disable. Requires Python 2.6 on a unixy platform. The action parameter is a Python signal handler. Read the documentation for the `signal` module for more information. If ``action`` is None, the process will be killed if it is blocked for too long. """ raise NotImplementedError() def set_blocking_log_threshold(self, seconds): """Logs a stack trace if the `IOLoop` is blocked for more than ``s`` seconds. Equivalent to ``set_blocking_signal_threshold(seconds, self.log_stack)`` """ self.set_blocking_signal_threshold(seconds, self.log_stack) def log_stack(self, signal, frame): """Signal handler to log the stack trace of the current thread. For use with `set_blocking_signal_threshold`. """ gen_log.warning('IOLoop blocked for %f seconds in\n%s', self._blocking_signal_threshold, ''.join(traceback.format_stack(frame))) class PollIOLoop(IOLoop): def start(self): ... try: # 事件循环 while True: # 一次事件循环的开始 ... # 取消闹钟 if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: # 事件轮询开始 event_pairs = self._impl.poll(poll_timeout) # 事件轮询结束 except Exception as e: ... # 重新设置闹钟 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) ... # 一次事件循环的结束 finally: ... # 事件循环结束之后,会清除闹钟 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) ... def set_blocking_signal_threshold(self, seconds, action): if not hasattr(signal, "setitimer"): gen_log.error("set_blocking_signal_threshold requires a signal module " "with the setitimer method") return self._blocking_signal_threshold = seconds if seconds is not None: signal.signal(signal.SIGALRM, action if action is not None else signal.SIG_DFL)
PollIOLoop.set_blocking_signal_threshold(seconds, action)方法的流程是:
重点在PollIOLoop.start()方法,该方法就是事件循环:
结合这两个函数,可以总结出:一次事件轮询结束之后 到 下一次事件轮询开始之前,如果时间超过seconds秒,那么就会执行信号处理函数action。在这期间,事件循环的主要行为包括:
如果在回调函数、定时器、处理函数中,使用了阻塞的操作(比如使用MySQLdb这个MySQL驱动),那么就会阻塞事件循环。通过blocking_signal_threshold,可以辅助我们判断是否有阻塞的操作发生,以及发生在那里。如果没有在设定的时间内,进行下一次事件轮询,那么就会触发设置的闹钟,进而执行设置的信号处理函数,而且信号处理函数的第二个参数就是 frame 对象。
IOLoop.set_blocking_log_threshold(seconds)方法 等价于 set_blocking_signal_threshold(seconds, IOLoop.log_stack),IOLoop.log_stack()这个处理函数会把当前的线程栈记录到日志。
下面看一个例子:
[root@iZj6chejzrsqpclb7miryaZ ~]# cat t2.py import time import random import tornado.ioloop as ioloop myioloop = ioloop.IOLoop.current() def test_callback(): # !!!here is a blocking operation!!! time.sleep(random.random()) myioloop.add_callback(test_callback) myioloop.set_blocking_log_threshold(0.5) myioloop.add_callback(test_callback) myioloop.start() [root@iZj6chejzrsqpclb7miryaZ ~]# python t2.py WARNING:tornado.general:IOLoop blocked for 0.500000 seconds in File "t2.py", line 14, inmyioloop.start() File "/usr/lib64/python2.7/site-packages/tornado/ioloop.py", line 832, in start self._run_callback(self._callbacks.popleft()) File "/usr/lib64/python2.7/site-packages/tornado/ioloop.py", line 605, in _run_callback ret = callback() File "/usr/lib64/python2.7/site-packages/tornado/stack_context.py", line 277, in null_wrapper return fn(*args, **kwargs) File "t2.py", line 9, in test_callback time.sleep(random.random()) WARNING:tornado.general:IOLoop blocked for 0.500000 seconds in File "t2.py", line 14, in myioloop.start() File "/usr/lib64/python2.7/site-packages/tornado/ioloop.py", line 832, in start self._run_callback(self._callbacks.popleft()) File "/usr/lib64/python2.7/site-packages/tornado/ioloop.py", line 605, in _run_callback ret = callback() File "/usr/lib64/python2.7/site-packages/tornado/stack_context.py", line 277, in null_wrapper return fn(*args, **kwargs) File "t2.py", line 9, in test_callback time.sleep(random.random()) ^CTraceback (most recent call last): File "t2.py", line 14, in myioloop.start() File "/usr/lib64/python2.7/site-packages/tornado/ioloop.py", line 832, in start self._run_callback(self._callbacks.popleft()) File "/usr/lib64/python2.7/site-packages/tornado/ioloop.py", line 605, in _run_callback ret = callback() File "/usr/lib64/python2.7/site-packages/tornado/stack_context.py", line 277, in null_wrapper return fn(*args, **kwargs) File "t2.py", line 9, in test_callback time.sleep(random.random()) KeyboardInterrupt
可以看到time.sleep()处发生了阻塞。
该部分代码是平台相关的,不同的平台有不同的实现,我们只看posix的实现,源代码在:https://github.com/tornadoweb/tornado/blob/master/tornado/platform/posix.py。
在阅读这部分代码之前,应该先简单过一下fcntl这个模块。Python中的fcntl模块是对Unix的fcntl和ioctl接口的封装。它主要是通过文件描述符,对文件 和 IO 进行控制。官方文档在:https://docs.python.org/2/library/fcntl.html。
posix.py的代码不多,下面就全部贴出来了:
# 当给文件描述符设置了FD_CLOEXEC标记的时候, # + 如果进程通过exec*创建子进程,那么在子进程中,这个文件描述会被关闭, # + 但是如果通过fork创建子进程,那么在子进程中,就可以使用这个文件描述符 def set_close_exec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) # 当给文件描述符设置了O_NONBLOCK标记时, # + 那么这个文件描述符是非阻塞的 def _set_nonblocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) class Waker(interface.Waker): def __init__(self): # 创建一个管道,会返回两个文件描述符,一个用于读,一个用于写 r, w = os.pipe() # 将这两个文件描述符都设置成非阻塞的 _set_nonblocking(r) _set_nonblocking(w) # 设置FD_CLOEXEC标记 set_close_exec(r) set_close_exec(w) # 通过文件描述符创建file object self.reader = os.fdopen(r, "rb", 0) self.writer = os.fdopen(w, "wb", 0) # 返回 用于从管道中读数据的文件描述符 def fileno(self): return self.reader.fileno() # 返回 用于向管道中写数据的文件描述符 def write_fileno(self): return self.writer.fileno() # 向管道写一个字节 def wake(self): try: self.writer.write(b"x") except (IOError, ValueError): pass # 读空管道中的数据 def consume(self): try: while True: result = self.reader.read() if not result: break except IOError: pass # 关闭读、写file object def close(self): self.reader.close() common.try_close(self.writer)
在tornado中waker主要用来唤醒I/O循环。下是PollIOLoop中的相关代码:
class PollIOLoop(IOLoop): def initialize(self, impl, time_func=None, **kwargs): .... # 创建waker对象 self._waker = Waker() # 当管道中有数据可读的时候,读空它 self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) # close()方法不太常用,它主要用于释放IOLoop所使用的所有资源, # + 当然也包括waker对象的关闭。 def close(self, all_fds=False): self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: for fd, handler in list(self._handlers.values()): self.close_fd(fd) self._waker.close() # 需要先阅读:signal.set_wakeup_fd(fd)的使用。 # 在start()方法中,事件循环开始之前,如果已经设置了wakeup描述符,那么什么都不做; # + 否则,将waker对象的写文件描述符设置为wakeup描述符, # + + 事件循环结束之后,再恢复回来。 # 这是通过信号的方式,来唤醒IO循环 def start(self): ... old_wakeup_fd = None # 如果操作系统是posix,并且支持signal.set_wakeup_fd() if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': # requires python 2.6+, unix. set_wakeup_fd exists but crashes # the python process on windows. try: # 将waker对象的写文件描述符设置为wakeup描述符, # + 之前设置的wakeup描述符会被保存在old_wakeup_fd中 old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) # 如果之前设置过wakeup描述符,也就是 old_wakeup_fd 不等于 -1, # + 那么,将wakeup描述符恢复成之前所设置的, # + + 之所以这么做是因为signal模块没有提供类似get_wakeup_fd的接口 if old_wakeup_fd != -1: # Already set, restore previous value. This is a little racy, # but there's no clean get_wakeup_fd and in real use the # IOLoop is just started once at the beginning. signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None # 如果在非主线程上调用signal.set_wakeup_fd(),会引发ValueError except ValueError: # Non-main thread, or the previous value of wakeup_fd # is no longer valid. old_wakeup_fd = None try: # 事件循环 while True: ... finally: ... # 事件循环结束之后,对wakeup描述符进行恢复 if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) def stop(self): self._running = False self._stopped = True # 关闭IOLoop时,唤醒IO循环 self._waker.wake() def add_callback(self, callback, *args, **kwargs): if self._closing: return # Blindly insert into self._callbacks. This is safe even # from signal handlers because deque.append is atomic. self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if thread.get_ident() != self._thread_ident: # This will write one byte but Waker.consume() reads many # at once, so it's ok to write even when not strictly # necessary. # 在其他线程上添加callback的时候,唤醒IO循环, # + 以防止轮询等待导致回调函数无法即时执行 self._waker.wake() else: # If we're on the IOLoop's thread, we don't need to wake anyone. pass
下面看一个例子:
[root@iZj6chejzrsqpclb7miryaZ ~]# python t.py before poll wake <__main__.MyEPollIOLoop object at 0x7f925c8abc90> after poll before poll ^CTraceback (most recent call last): File "t.py", line 41, inmain() File "t.py", line 38, in main ioloop.start() File "/usr/lib64/python2.7/site-packages/tornado/ioloop.py", line 863, in start event_pairs = self._impl.poll(poll_timeout) File "t.py", line 14, in _inner ret = self._epoll.poll(*a, **kw) KeyboardInterrupt [root@iZj6chejzrsqpclb7miryaZ ~]# cat t.py import select import threading from tornado.ioloop import IOLoop, PollIOLoop class MyEpoll(object): def __init__(self): self._epoll = select.epoll() def __getattr__(self, attr_name): if attr_name == "poll": def _inner(*a, **kw): print("before poll") ret = self._epoll.poll(*a, **kw) print("after poll") return ret return _inner attr = getattr(self._epoll, attr_name, None) if attr is None: raise AttributeError("MyEpoll has no attribute: %s" % attr_name) return attr class MyEPollIOLoop(PollIOLoop): def initialize(self, my_impl=MyEpoll(), *a, **kw): super(MyEPollIOLoop, self).initialize(impl=my_impl, *a, **kw) def main(): IOLoop.configure(MyEPollIOLoop) ioloop = IOLoop.instance() def wake(): import time time.sleep(10) print("wake %r" % ioloop) ioloop._waker.wake() threading.Thread(target=wake).start() ioloop.start() if __name__ == "__main__": main()
当IOLoop上没有任何回调函数和定时器的时候,事件轮询的默认超时时间是3600秒,上面的示例程序中,使用了另外一个线程在10秒之后,使用waker来唤醒IO循环。