tornado.iostream 模块提供了一组工具类,用于向非阻塞的文件或 socket 写入数据,以及从它们读取数据。
本文只解析BaseIOStream提供的read_bytes()接口。
源代码见: http://www.tornadoweb.org/en/stable/_modules/tornado/iostream.html#BaseIOStream (本文引用的代码对应 Tornado 4.x 版本,与该链接当前指向的最新稳定版可能不同。)
下面是相关的主要代码:
# Abridged excerpt of tornado.iostream.BaseIOStream, limited to the code paths
# exercised by read_bytes().  A bare `...` marks code that was elided from the
# excerpt, so this listing is for reading, not for running as-is.
class BaseIOStream(object):
    def __init__(self, io_loop=None, max_buffer_size=None,
                 read_chunk_size=None, max_write_buffer_size=None):
        self.io_loop = io_loop or ioloop.IOLoop.current()
        self.max_buffer_size = max_buffer_size or 104857600
        # Maximum number of bytes fetched by a single <fd>.read call.
        # A chunk size that is too close to max_buffer_size can cause
        # spurious failures.
        self.read_chunk_size = min(read_chunk_size or 65536,
                                   self.max_buffer_size // 2)
        ...
        # Read buffer: its contents are split into an already-consumed part
        # and a not-yet-consumed part.
        self._read_buffer = bytearray()
        # Read position: index of the first byte that has not been consumed.
        # + It moves right as data is consumed from the buffer.
        # + Once it exceeds the unconsumed size, the consumed prefix is
        #   dropped to release memory and the position is reset (see _consume).
        self._read_buffer_pos = 0
        # Size of the read buffer, counting only the unconsumed part.
        self._read_buffer_size = 0
        ...
        # The num_bytes argument of the pending read_bytes() call.
        self._read_bytes = None
        ...
        # Callback of the pending read (used when a callback was supplied).
        self._read_callback = None
        # Future of the pending read (used when no callback was supplied).
        self._read_future = None
        ...
        # IOLoop events this stream is currently listening for; None means no
        # handler has been registered yet (see _add_io_state).
        self._state = None
        ...

    # Asynchronously read the given number of bytes.
    # + If a callback is given, it is called with the data as its first
    #   argument once that many bytes have been read;
    # + if no callback is given, a Future is returned instead.
    # This walkthrough only covers the case where streaming_callback and
    # partial keep their default values.
    def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
                   partial=False):
        """Asynchronously read a number of bytes.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the final result will be empty.
        Otherwise, the result is all the data that was read.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        If ``partial`` is true, the callback is run as soon as we have
        any bytes to return (but never more than ``num_bytes``)

        .. versionchanged:: 4.0
            Added the ``partial`` argument.  The callback argument is now
            optional and a `.Future` will be returned if it is omitted.
        """
        future = self._set_read_callback(callback)
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._read_partial = partial
        self._streaming_callback = stack_context.wrap(streaming_callback)
        try:
            self._try_inline_read()
        except:
            if future is not None:
                # The error is re-raised to the caller below; also retrieve it
                # from the future (presumably so it is not reported as a
                # failure that nobody examined -- TODO confirm).
                future.add_done_callback(lambda f: f.exception())
            raise
        return future

    # If callback is None, create and return a Future that will later be
    # filled with the data or with the error of the read;
    # otherwise store the callback as the read callback, to be invoked when
    # the read completes (None is returned in that case).
    def _set_read_callback(self, callback):
        # If either a read callback or a read future is already set, another
        # read is still in progress: fail.
        assert self._read_callback is None, "Already reading"
        assert self._read_future is None, "Already reading"
        if callback is not None:
            self._read_callback = stack_context.wrap(callback)
        else:
            self._read_future = TracebackFuture()
        return self._read_future

    # Try to complete the current read from the read buffer.
    # + If the read can be satisfied, the read callback is scheduled for the
    #   next IOLoop iteration;
    # + otherwise the file descriptor is registered with the IOLoop and read
    #   events on it are listened for.
    def _try_inline_read(self):
        """Attempt to complete the current read operation from buffered data.

        If the read can be completed without blocking, schedules the
        read callback on the next IOLoop iteration; otherwise starts
        listening for reads on the socket.
        """
        # See if we've already got the data from a previous read
        self._run_streaming_callback()
        # 1. Try to complete the pending read from the read buffer.
        pos = self._find_read_pos()
        if pos is not None:
            # 1.1 It can be completed: take the data out of the read buffer,
            #     then call the read callback with it or fill the read future.
            self._read_from_buffer(pos)
            return
        self._check_closed()
        try:
            # 2. Try to read enough data from the fd to complete the read.
            pos = self._read_to_buffer_loop()
        except Exception:
            # If there was an error in _read_to_buffer, we called close()
            # already, but couldn't run the close callback because of
            # _pending_callbacks.  Before we escape from this function, run
            # the close callback if applicable.
            self._maybe_run_close_callback()
            raise
        # 2.1 Enough data arrived from the fd: serve the read from the buffer.
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # We couldn't satisfy the read inline, so either close the stream
        # or listen for new data.
        # 3. Otherwise register the fd with the IOLoop and listen for reads.
        if self.closed():
            self._maybe_run_close_callback()
        else:
            self._add_io_state(ioloop.IOLoop.READ)

    # Look for a position in the read buffer that satisfies the pending read
    # request; return it if the request can be satisfied, else None.
    # + For example, for read_bytes(num_bytes, partial=False):
    #   + if the read buffer holds at least num_bytes bytes, return num_bytes;
    #   + otherwise return None.
    def _find_read_pos(self):
        """Attempts to find a position in the read buffer that satisfies
        the currently-pending read.

        Returns a position in the buffer if the current read can be satisfied,
        or None if it cannot.
        """
        if (self._read_bytes is not None and
            (self._read_buffer_size >= self._read_bytes or
             (self._read_partial and self._read_buffer_size > 0))):
            num_bytes = min(self._read_bytes, self._read_buffer_size)
            return num_bytes
        ...
        return None

    # Complete the currently pending read request from the read buffer.
    def _read_from_buffer(self, pos):
        """Attempts to complete the currently-pending read from the buffer.

        The argument is either a position in the read buffer or None,
        as returned by _find_read_pos.
        """
        # Clear the pending-read parameters before delivering the data.
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._read_partial = False
        self._run_read_callback(pos, False)

    # Deliver `size` bytes from the read buffer to the read future or to the
    # read callback.  (The branch for streaming=True is elided here.)
    def _run_read_callback(self, size, streaming):
        ...
        else:
            callback = self._read_callback
            self._read_callback = self._streaming_callback = None
            if self._read_future is not None:
                assert callback is None
                # No read callback was set: store the data in the read future.
                future = self._read_future
                self._read_future = None
                future.set_result(self._consume(size))
        if callback is not None:
            assert (self._read_future is None) or streaming
            # A read callback was set: schedule it for the next IOLoop
            # iteration.
            self._run_callback(callback, self._consume(size))
        else:
            # If we scheduled a callback, we will add the error listener
            # afterwards.  If we didn't, we have to do it now.
            self._maybe_add_error_listener()

    # Schedule callback(*args) on the IOLoop, counting it in
    # _pending_callbacks until it has started running.
    def _run_callback(self, callback, *args):
        def wrapper():
            self._pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception:
                app_log.error("Uncaught exception, closing connection.",
                              exc_info=True)
                # Close the socket on an uncaught exception from a user callback
                # (It would eventually get closed when the socket object is
                # gc'd, but we don't want to rely on gc happening before we
                # run out of file descriptors)
                self.close(exc_info=True)
                # Re-raise the exception so that IOLoop.handle_callback_exception
                # can see it and log the error
                raise
            finally:
                self._maybe_add_error_listener()
        # We schedule callbacks to be run on the next IOLoop iteration
        # rather than running them directly for several reasons:
        # * Prevents unbounded stack growth when a callback calls an
        #   IOLoop operation that immediately runs another callback
        # * Provides a predictable execution context for e.g.
        #   non-reentrant mutexes
        # * Ensures that the try/except in wrapper() is run outside
        #   of the application's StackContexts
        with stack_context.NullContext():
            # stack_context was already captured in callback, we don't need to
            # capture it again for IOStream's wrapper.  This is especially
            # important if the callback was pre-wrapped before entry to
            # IOStream (as in HTTPConnection._header_callback), as we could
            # capture and leak the wrong context here.
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)

    # Start listening for read events on an idle connection so that a close
    # by the peer can be detected; called from several places in the stream.
    def _maybe_add_error_listener(self):
        # This method is part of an optimization: to detect a connection that
        # is closed when we're not actively reading or writing, we must listen
        # for read events.  However, it is inefficient to do this when the
        # connection is first established because we are going to read or write
        # immediately anyway.  Instead, we insert checks at various times to
        # see if the connection is idle and add the read listener then.
        if self._pending_callbacks != 0:
            return
        if self._state is None or self._state == ioloop.IOLoop.ERROR:
            if self.closed():
                self._maybe_run_close_callback()
            elif (self._read_buffer_size == 0 and
                  self._close_callback is not None):
                self._add_io_state(ioloop.IOLoop.READ)

    # Take loc bytes out of the read buffer and return them.
    def _consume(self, loc):
        # Consume loc bytes from the read buffer and return them
        if loc == 0:
            return b""
        assert loc <= self._read_buffer_size
        # Slice the bytearray buffer into bytes, without intermediate copying
        # memoryview gives byte-level access to objects that support the
        # buffer protocol; slicing the view does not copy, so the only copy
        # made here is the final tobytes().
        b = (memoryview(self._read_buffer)
             [self._read_buffer_pos:self._read_buffer_pos + loc]
             ).tobytes()
        # Advance the read position and shrink the unconsumed size.
        self._read_buffer_pos += loc
        self._read_buffer_size -= loc
        # Amortized O(1) shrink
        # (this heuristic is implemented natively in Python 3.4+
        #  but is replicated here for Python 2)
        # When the read position exceeds the unconsumed size:
        # + 1. delete the already-consumed prefix of the buffer;
        # + 2. reset the read position to zero.
        if self._read_buffer_pos > self._read_buffer_size:
            del self._read_buffer[:self._read_buffer_pos]
            self._read_buffer_pos = 0
        return b

    # Add `state` to the set of events the IOLoop handler listens for.
    def _add_io_state(self, state):
        """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.

        Implementation notes: Reads and writes have a fast path and a
        slow path.  The fast path reads synchronously from socket
        buffers, while the slow path uses `_add_io_state` to schedule
        an IOLoop callback.  Note that in both cases, the callback is
        run asynchronously with `_run_callback`.

        To detect closed connections, we must have called
        `_add_io_state` at some point, but we want to delay this as
        much as possible so we don't have to set an `IOLoop.ERROR`
        listener that will be overwritten by the next slow-path
        operation.  As long as there are callbacks scheduled for
        fast-path ops, those callbacks may do more reads.
        If a sequence of fast-path ops do not end in a slow-path op,
        (e.g. for an @asynchronous long-poll request), we must add
        the error handler.  This is done in `_run_callback` and `write`
        (since the write callback is optional so we can have a
        fast-path write with no `_run_callback`)
        """
        if self.closed():
            # connection has been closed, so there can be no future events
            return
        if self._state is None:
            # First registration: install the handler, always including ERROR.
            self._state = ioloop.IOLoop.ERROR | state
            with stack_context.NullContext():
                self.io_loop.add_handler(
                    self.fileno(), self._handle_events, self._state)
        elif not self._state & state:
            # Handler already installed: only widen the event mask.
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)

    # Try to read the wanted number of bytes from the fd.
    # + If the data read so far satisfies the pending read, return the
    #   position; otherwise return None.
    def _read_to_buffer_loop(self):
        # This method is called from _handle_read and _try_inline_read.
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            ...
            else:
                target_bytes = 0
            next_find_pos = 0
            # Pretend to have a pending callback so that an EOF in
            # _read_to_buffer doesn't trigger an immediate close
            # callback.  At the end of this method we'll either
            # establish a real pending callback via
            # _read_from_buffer or run the close callback.
            #
            # We need two try statements here so that
            # pending_callbacks is decremented before the `except`
            # clause below (which calls `close` and does need to
            # trigger the callback)
            self._pending_callbacks += 1
            while not self.closed():
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object's buffer select() and friends
                # can't see it; the only way to find out if it's there is to
                # try to read it.
                # Stop as soon as the fd has nothing more to read.
                if self._read_to_buffer() == 0:
                    break

                self._run_streaming_callback()

                # If we've read all the bytes we can use, break out of
                # this loop.  We can't just call read_from_buffer here
                # because of subtle interactions with the
                # pending_callback and error_listener mechanisms.
                #
                # If we've reached target_bytes, we know we're done.
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break

                # Otherwise, we need to call the more expensive find_read_pos.
                # It's inefficient to do this on every read, so instead
                # do it on the first read and whenever the read buffer
                # size has doubled.
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1

    # IOLoop handler for this stream's fd: dispatch the events, then
    # recompute which events to keep listening for.
    def _handle_events(self, fd, events):
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            ...
            if events & self.io_loop.READ:
                self._handle_read()
            ...
            # After the read/write events have been handled, the set of
            # events being listened for is recomputed:
            # + listen for READ only while a read is pending;
            # + listen for WRITE only while a write is pending;
            # + if the connection is idle (no pending read or write, and an
            #   empty read buffer), listen for READ anyway, so that a closed
            #   connection can be detected;
            # + finally, update the handler if the set changed.
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                # If the connection is idle, listen for reads too so
                # we can tell if the connection is closed.  If there is
                # data in the read buffer we won't run the close callback
                # yet anyway, so we don't need to listen in this case.
                state |= self.io_loop.READ
            if state != self._state:
                assert self._state is not None, \
                    "shouldn't happen: _handle_events without self._state"
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=True)
        except Exception:
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close(exc_info=True)
            raise

    def close(self, exc_info=False):
        """Close this stream.

        If ``exc_info`` is true, set the ``error`` attribute to the current
        exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
        use that instead of `sys.exc_info`).
        """
        if not self.closed():
            if exc_info:
                if not isinstance(exc_info, tuple):
                    exc_info = sys.exc_info()
                if any(exc_info):
                    self.error = exc_info[1]
            if self._read_until_close:
                # Hand over whatever is still buffered to the pending read.
                if (self._streaming_callback is not None and
                        self._read_buffer_size):
                    self._run_read_callback(self._read_buffer_size, True)
                self._read_until_close = False
                self._run_read_callback(self._read_buffer_size, False)
            if self._state is not None:
                # Stop listening for events on the fd before closing it.
                self.io_loop.remove_handler(self.fileno())
                self._state = None
            self.close_fd()
            self._closed = True
        self._maybe_run_close_callback()

    # Once the stream is closed and no callbacks are pending: fail all
    # outstanding futures, schedule the close callback and drop references.
    def _maybe_run_close_callback(self):
        # If there are pending callbacks, don't run the close callback
        # until they're done (see _maybe_add_error_listener)
        if self.closed() and self._pending_callbacks == 0:
            futures = []
            if self._read_future is not None:
                futures.append(self._read_future)
                self._read_future = None
            futures += [future for _, future in self._write_futures]
            self._write_futures.clear()
            if self._connect_future is not None:
                futures.append(self._connect_future)
                self._connect_future = None
            if self._ssl_connect_future is not None:
                futures.append(self._ssl_connect_future)
                self._ssl_connect_future = None
            for future in futures:
                future.set_exception(StreamClosedError(real_error=self.error))
            if self._close_callback is not None:
                cb = self._close_callback
                self._close_callback = None
                self._run_callback(cb)
            # Delete any unfinished callbacks to break up reference cycles.
            self._read_callback = self._write_callback = None
            # Clear the buffers so they can be cleared immediately even
            # if the IOStream object is kept alive by a reference cycle.
            # TODO: Clear the read buffer too; it currently breaks some tests.
            self._write_buffer = None
            self._write_buffer_size = 0

    # Handle a READ event: pull data from the fd and, if the pending read is
    # now satisfied, complete it from the buffer.
    def _handle_read(self):
        try:
            pos = self._read_to_buffer_loop()
        except UnsatisfiableReadError:
            raise
        except Exception as e:
            gen_log.warning("error on read: %s" % e)
            self.close(exc_info=True)
            return
        if pos is not None:
            self._read_from_buffer(pos)
            return
        else:
            self._maybe_run_close_callback()

    # Read from the fd and append the data to the end of the read buffer.
    # + Returns the number of bytes actually read, or 0 if nothing was read.
    # + On error, closes the fd and raises.
    def _read_to_buffer(self):
        """Reads from the socket and appends the result to the read buffer.

        Returns the number of bytes read.  Returns 0 if there is nothing
        to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
        error closes the socket and raises an exception.
        """
        while True:
            try:
                chunk = self.read_from_fd()
            except (socket.error, IOError, OSError) as e:
                if errno_from_exception(e) == errno.EINTR:
                    # Interrupted: retry the read.
                    continue
                # ssl.SSLError is a subclass of socket.error
                if self._is_connreset(e):
                    # Treat ECONNRESET as a connection close rather than
                    # an error to minimize log spam  (the exception will
                    # be available on self.error for apps that care).
                    self.close(exc_info=True)
                    # Note: this path returns None rather than 0.
                    return
                self.close(exc_info=True)
                raise
            break
        if chunk is None:
            return 0
        self._read_buffer += chunk
        self._read_buffer_size += len(chunk)
        if self._read_buffer_size > self.max_buffer_size:
            gen_log.error("Reached maximum read buffer size")
            self.close()
            raise StreamBufferFullError("Reached maximum read buffer size")
        return len(chunk)
这个类中的方法,比较散乱,下面总结一下:
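read_bytes(num_bytes)的大致执行流程是:
1. read_bytes() 先调用 _set_read_callback() 登记 callback(或创建并返回 Future),并把 num_bytes 记录到 self._read_bytes,然后调用 _try_inline_read()。
2. _try_inline_read() 通过 _find_read_pos() 检查读缓冲区:如果缓冲区中已有不少于 num_bytes 个字节,则直接调用 _read_from_buffer() 完成本次读操作。
3. 否则调用 _read_to_buffer_loop(),循环调用 _read_to_buffer() 把 fd 上可读的数据追加到读缓冲区;如果数据已经足够,同样调用 _read_from_buffer() 完成读操作。
4. 如果数据仍然不够,则通过 _add_io_state(IOLoop.READ) 关注 fd 上的读事件;之后 fd 可读时,IOLoop 依次触发 _handle_events() → _handle_read() → _read_to_buffer_loop(),数据足够后再调用 _read_from_buffer()。
5. _read_from_buffer() 调用 _run_read_callback(),由 _consume() 从读缓冲区取出数据:没有 callback 时用数据填充 Future;有 callback 时通过 _run_callback() 在下一次 IOLoop 迭代中调用它。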
server.py:
"""Demo TCP server for IOStream.read_bytes().

Wire format: every message is a 4-byte big-endian unsigned length ("!I")
followed by that many payload bytes.  The server answers each message with
b"server is good" and stops serving a connection after the payload b"break".

The print calls use a single pre-formatted argument so that the script
produces the same output on Python 2 and also runs on Python 3.
"""
import errno
import functools
import signal
import socket
import struct

import tornado.gen as gen
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, StreamClosedError


@gen.coroutine
def handle_connection(connection):
    """Serve one accepted connection until b"break" arrives or the peer leaves.

    Args:
        connection: the accepted socket; it is switched to non-blocking mode
            and wrapped in an IOStream.
    """
    connection.setblocking(False)
    stream = IOStream(connection)
    payload = None
    try:
        while payload != b"break":
            # Fixed-size header first, then exactly as many bytes as it says.
            header = yield stream.read_bytes(4)
            payload_length = struct.unpack("!I", header)[0]
            print("payload_length is: %s" % payload_length)
            payload = yield stream.read_bytes(payload_length)
            print("payload is: %s" % payload)
            yield stream.write(b"server is good")
    except StreamClosedError:
        # The peer closed the connection in the middle of the conversation
        # (for example right after sending its last message, without waiting
        # for the reply).  There is nothing left to read or write.
        pass
    finally:
        # Always release the stream, including when a read or write failed.
        stream.close()


def on_connection_ready(server_socket, fd, events):
    """IOLoop handler for the listening socket: accept one connection.

    Args:
        server_socket: the non-blocking listening socket.
        fd: file descriptor reported by the IOLoop (unused).
        events: event mask reported by the IOLoop (unused).

    Raises:
        socket.error: if accept() fails for a reason other than
            "no connection is ready right now".
    """
    try:
        connection, addr = server_socket.accept()
    except socket.error as ex:
        if ex.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
            raise
        return
    handle_connection(connection)


def handle_signal():
    """Stop the current IOLoop when SIGINT (Ctrl-C) is received."""

    def request_stop(signum, frame):
        # A signal handler may interrupt the loop at an arbitrary point, so
        # do not call stop() from here; hand it to the loop through the one
        # scheduling method that is meant to be called from signal handlers.
        io_loop = IOLoop.current()
        io_loop.add_callback_from_signal(io_loop.stop)

    signal.signal(signal.SIGINT, request_stop)


def main():
    """Listen on 0.0.0.0:9099 and serve connections until SIGINT."""
    handle_signal()
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_socket.bind(("0.0.0.0", 9099))
        server_socket.setblocking(False)
        server_socket.listen(5)
        io_loop = IOLoop.current()
        io_loop.add_handler(
            server_socket.fileno(),
            functools.partial(on_connection_ready, server_socket),
            IOLoop.READ)
        io_loop.start()
    finally:
        # Release the listening socket when the loop stops or setup fails.
        server_socket.close()


if __name__ == "__main__":
    main()
client.py:
"""Demo TCP client for the length-prefixed echo-style server on port 9099.

Sends 13 messages, printing the server's reply after each one, then sends
b"break" to end the conversation.  Every message is a 4-byte big-endian
unsigned length ("!I") followed by the payload bytes.
"""
import socket
import struct


def send_message(sock, payload):
    """Send one length-prefixed message.

    sendall() is used instead of send() because send() may transmit only part
    of the data, which would corrupt the length-prefixed framing.

    Args:
        sock: a connected socket.
        payload: the message body as bytes.
    """
    sock.sendall(struct.pack("!I", len(payload)))
    sock.sendall(payload)


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock.connect(("127.0.0.1", 9099))
    for i in range(13):
        # Encode explicitly so the payload is bytes on Python 2 and Python 3.
        message = ("hello world, %d." % i).encode("ascii")
        send_message(sock, message)
        print(sock.recv(1024))
    send_message(sock, b"break")
finally:
    # Close the socket even if connecting or sending failed.
    sock.close()