目录


概述[TOC]

tornado.iostream模块提供了向非阻塞文件或socket写数据 或 从它们读数据的工具类:

本文只解析BaseIOStream提供的read_bytes()接口。


BaseIOStream的read_bytes接口[TOC]

源代码在:http://www.tornadoweb.org/en/stable/_modules/tornado/iostream.html#BaseIOStream
下面是相关的主要代码:

class BaseIOStream(object):
    def __init__(self, io_loop=None, max_buffer_size=None,
                 read_chunk_size=None, max_write_buffer_size=None):
        self.io_loop = io_loop or ioloop.IOLoop.current()

        self.max_buffer_size = max_buffer_size or 104857600

        # 每次<fd>.read调用最多读取的字节数
        # A chunk size that is too close to max_buffer_size can cause
        # spurious failures.
        self.read_chunk_size = min(read_chunk_size or 65536,
                                   self.max_buffer_size // 2)
        ...

        # 读缓冲区:读缓冲区中的数据分为已经被消费 + 尚未被消费的。
        self._read_buffer = bytearray()
        # + 读指针指向第一个尚未被消费的字节。
        # + 随着缓冲区中的数据被消费,读指针会右移。
        # + 当读指针大于缓冲区大小时,缓冲区会向右收缩,释放空间。
        self._read_buffer_pos = 0
        # 读缓冲区的大小(特指未被消费的那部分缓冲区的大小)
        self._read_buffer_size = 0
        ...

        # read_bytes()方法的第一个参数
        self._read_bytes = None
        ...

        # read callback
        self._read_callback = None
        # read future
        self._read_future = None
        ...

        # 关注的事件
        self._state = None
        ...

    # 异步的读取指定数量的字节。
    # + 如果指定了callback,那么当读取到指定数量的数据之后,
    # + + 会使用数据作为第一个参数调用这个回调函数;
    # + 如果没有指定callback,则返回一个Future对象。
    # 本文只解析streaming_callbackpartial为默认值的情况。
    def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
                   partial=False):
        """Asynchronously read a number of bytes.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the final result will be empty.
        Otherwise, the result is all the data that was read.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        If ``partial`` is true, the callback is run as soon as we have
        any bytes to return (but never more than ``num_bytes``)

        .. versionchanged:: 4.0
            Added the ``partial`` argument.  The callback argument is now
            optional and a `.Future` will be returned if it is omitted.
        """
        future = self._set_read_callback(callback)

        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._read_partial = partial
        self._streaming_callback = stack_context.wrap(streaming_callback)

        try:
            self._try_inline_read()
        except:
            if future is not None:
                future.add_done_callback(lambda f: f.exception())
            raise
        return future

    # 如果callback为None,则返回一个Future对象,
    # + 当读操作完成时,会将数据或异常信息填充到该对象中;
    # 否则,将其设置为read callback,当读操作完成之后,会调用该回调函数。
    def _set_read_callback(self, callback):
        # 如果read callback 和 read future 都不为None,
        # + 说明已经有一个读操作正在执行,抛出错误。
        assert self._read_callback is None, "Already reading"
        assert self._read_future is None, "Already reading"

        if callback is not None:
            self._read_callback = stack_context.wrap(callback)
        else:
            self._read_future = TracebackFuture()
        return self._read_future

    # 尝试从读缓冲区,完成当前的读操作。
    # + 如果读操作能够被满足,则在下一次IOLoop迭代时,执行read callback;
    # + 否则,将文件描述符添加到IOLoop上,并且关注其上的读操作。
    def _try_inline_read(self):
        """Attempt to complete the current read operation from buffered data.

        If the read can be completed without blocking, schedules the
        read callback on the next IOLoop iteration; otherwise starts
        listening for reads on the socket.
        """
        # See if we've already got the data from a previous read
        self._run_streaming_callback()

        # 1,尝试从读缓冲区完成当前挂起的读操作:
        pos = self._find_read_pos()
        if pos is not None:
            # 1.1,如果可以完成,那么从read buffer读取数据,
            # + 之后,使用数据调用read callback,或填充read future
            self._read_from_buffer(pos)
            return

        self._check_closed()
        try:
            # 2,尝试从fd上读取能够完成当前读操作的数据
            pos = self._read_to_buffer_loop()
        except Exception:
            # If there was an in _read_to_buffer, we called close() already,
            # but couldn't run the close callback because of _pending_callbacks.
            # Before we escape from this function, run the close callback if
            # applicable.
            self._maybe_run_close_callback()
            raise
        # 2.1,如果能够从fd上读取到能完成当前读操作的数据,那么从read buffer读取数据
        if pos is not None:
            self._read_from_buffer(pos)
            return
        # We couldn't satisfy the read inline, so either close the stream
        # or listen for new data.
        # 3,否则将fd添加到IOLoop,并关注其上的读操作
        if self.closed():
            self._maybe_run_close_callback()
        else:
            self._add_io_state(ioloop.IOLoop.READ)

    # 尝试从读缓冲区中找到满足
    # + 当前挂起的读请求的位置,如果当前读请求能够被满足,则返回这个位置,否则返回None。
    # + + 比如,当前的读请求是read_bytes(num_bytes, partial=False),那么当读缓冲区中有 
    # + + + 大于等于 num_bytes 个字节时,则返回num_bytes
    # + + + 否则,返回None
    def _find_read_pos(self):
        """Attempts to find a position in the read buffer that satisfies
        the currently-pending read.

        Returns a position in the buffer if the current read can be satisfied,
        or None if it cannot.
        """


        if (self._read_bytes is not None and
            (self._read_buffer_size >= self._read_bytes or
             (self._read_partial and self._read_buffer_size > 0))):
            num_bytes = min(self._read_bytes, self._read_buffer_size)
            return num_bytes
        ...
        return None

    # 从读缓冲区中,完成当前挂起的读请求。
    def _read_from_buffer(self, pos):
        """Attempts to complete the currently-pending read from the buffer.

        The argument is either a position in the read buffer or None,
        as returned by _find_read_pos.
        """
        self._read_bytes = self._read_delimiter = self._read_regex = None
        self._read_partial = False
        self._run_read_callback(pos, False)

    def _run_read_callback(self, size, streaming):
        ...
        else:
            callback = self._read_callback
            self._read_callback = self._streaming_callback = None
            if self._read_future is not None:
                assert callback is None
                # 如果没有设置read callback,
                # + 则将数据保存到read future
                future = self._read_future
                self._read_future = None
                future.set_result(self._consume(size))
        if callback is not None:
            assert (self._read_future is None) or streaming
            # 如果设置了read callback,
            # + 那么则在下一次IOLoop迭代时,调度它
            self._run_callback(callback, self._consume(size))
        else:
            # If we scheduled a callback, we will add the error listener
            # afterwards.  If we didn't, we have to do it now.
            self._maybe_add_error_listener()

    def _run_callback(self, callback, *args):
        def wrapper():
            self._pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception:
                app_log.error("Uncaught exception, closing connection.",
                              exc_info=True)
                # Close the socket on an uncaught exception from a user callback
                # (It would eventually get closed when the socket object is
                # gc'd, but we don't want to rely on gc happening before we
                # run out of file descriptors)
                self.close(exc_info=True)
                # Re-raise the exception so that IOLoop.handle_callback_exception
                # can see it and log the error
                raise
            finally:
                self._maybe_add_error_listener()
        # We schedule callbacks to be run on the next IOLoop iteration
        # rather than running them directly for several reasons:
        # * Prevents unbounded stack growth when a callback calls an
        #   IOLoop operation that immediately runs another callback
        # * Provides a predictable execution context for e.g.
        #   non-reentrant mutexes
        # * Ensures that the try/except in wrapper() is run outside
        #   of the application's StackContexts
        with stack_context.NullContext():
            # stack_context was already captured in callback, we don't need to
            # capture it again for IOStream's wrapper.  This is especially
            # important if the callback was pre-wrapped before entry to
            # IOStream (as in HTTPConnection._header_callback), as we could
            # capture and leak the wrong context here.
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)

    # 该方法用于检测:一个没有活跃读、写请求的连接是否被关闭,
    # + 为此,必须监听读事件。然而,在连接刚建立的时候,执行这个操作是无用的,
    # + 因为这种情况下,我们会立刻进行读写。在IOStream中,很多地方
    # + 都插入了这个检查,当连接空闲时,那么就监听其上的读事件。
    def _maybe_add_error_listener(self):
        # This method is part of an optimization: to detect a connection that
        # is closed when we're not actively reading or writing, we must listen
        # for read events.  However, it is inefficient to do this when the
        # connection is first established because we are going to read or write
        # immediately anyway.  Instead, we insert checks at various times to
        # see if the connection is idle and add the read listener then.
        if self._pending_callbacks != 0:
            return
        if self._state is None or self._state == ioloop.IOLoop.ERROR:
            if self.closed():
                self._maybe_run_close_callback()
            elif (self._read_buffer_size == 0 and
                  self._close_callback is not None):
                self._add_io_state(ioloop.IOLoop.READ)

    # 从read buffer上读取loc个字节,并返回。
    def _consume(self, loc):
        # Consume loc bytes from the read buffer and return them
        if loc == 0:
            return b""
        assert loc <= self._read_buffer_size

        # Slice the bytearray buffer into bytes, without intermediate copying
        # 这里用到了memoryview,
        # + memoryview为支持buffer protocol的对象,提供了基于字节的访问接口。
        # + 使用memoryview不会发生内存拷贝。
        b = (memoryview(self._read_buffer)
             [self._read_buffer_pos:self._read_buffer_pos + loc]
             ).tobytes()
        # 移动读指针 和 修改缓冲区大小
        self._read_buffer_pos += loc
        self._read_buffer_size -= loc

        # Amortized O(1) shrink
        # (this heuristic is implemented natively in Python 3.4+
        #  but is replicated here for Python 2)
        # 当 读指针 大于 缓冲区大小 的时候,
        # + 会对缓冲区进行收缩:
        # + + 1,删除已经被消费内容的缓冲区
        # + + 2,将读指针归零
        if self._read_buffer_pos > self._read_buffer_size:
            del self._read_buffer[:self._read_buffer_pos]
            self._read_buffer_pos = 0
        return b

    # 向事件处理函数添加`state`
    def _add_io_state(self, state):
        """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.

        Implementation notes: Reads and writes have a fast path and a
        slow path.  The fast path reads synchronously from socket
        buffers, while the slow path uses `_add_io_state` to schedule
        an IOLoop callback.  Note that in both cases, the callback is
        run asynchronously with `_run_callback`.

        To detect closed connections, we must have called
        `_add_io_state` at some point, but we want to delay this as
        much as possible so we don't have to set an `IOLoop.ERROR`
        listener that will be overwritten by the next slow-path
        operation.  As long as there are callbacks scheduled for
        fast-path ops, those callbacks may do more reads.
        If a sequence of fast-path ops do not end in a slow-path op,
        (e.g. for an @asynchronous long-poll request), we must add
        the error handler.  This is done in `_run_callback` and `write`
        (since the write callback is optional so we can have a
        fast-path write with no `_run_callback`)
        """
        if self.closed():
            # connection has been closed, so there can be no future events
            return
        if self._state is None:
            self._state = ioloop.IOLoop.ERROR | state
            with stack_context.NullContext():
                self.io_loop.add_handler(
                    self.fileno(), self._handle_events, self._state)
        elif not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)

    # 尝试从fd上读取期望数量的字节,
    # + 如果读取到的数据,能够满足当前的读操作,则返回位置;否则返回None。
    def _read_to_buffer_loop(self):
        # This method is called from _handle_read and _try_inline_read.
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            ...
            else:
                target_bytes = 0
            next_find_pos = 0
            # Pretend to have a pending callback so that an EOF in
            # _read_to_buffer doesn't trigger an immediate close
            # callback.  At the end of this method we'll either
            # establish a real pending callback via
            # _read_from_buffer or run the close callback.
            #
            # We need two try statements here so that
            # pending_callbacks is decremented before the `except`
            # clause below (which calls `close` and does need to
            # trigger the callback)
            self._pending_callbacks += 1
            while not self.closed():
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object's buffer select() and friends
                # can't see it; the only way to find out if it's there is to
                # try to read it.
                # 如果fd的输入缓冲区为空,则退出
                if self._read_to_buffer() == 0:
                    break

                self._run_streaming_callback()

                # If we've read all the bytes we can use, break out of
                # this loop.  We can't just call read_from_buffer here
                # because of subtle interactions with the
                # pending_callback and error_listener mechanisms.
                #
                # If we've reached target_bytes, we know we're done.
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break

                # Otherwise, we need to call the more expensive find_read_pos.
                # It's inefficient to do this on every read, so instead
                # do it on the first read and whenever the read buffer
                # size has doubled.
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1

    def _handle_events(self, fd, events):
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            ...
            if events & self.io_loop.READ:
                self._handle_read()
            ...

            # 在处理完读写事件之后,
            # + 会更改关注的事件。
            # 如果当前没有读操作,那么则不再关注读事件,否则关注;
            # 如果当前没有写操作,那么则不再关注写事件,否则关注;
            # 如果当前连接空闲(也就是没有读写操作,并且读缓冲区为空),
            # + 则关注读操作(目的是检测连接是否断开),否则不关注;
            # 最后,更新关注的事件
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                # If the connection is idle, listen for reads too so
                # we can tell if the connection is closed.  If there is
                # data in the read buffer we won't run the close callback
                # yet anyway, so we don't need to listen in this case.
                state |= self.io_loop.READ
            if state != self._state:
                assert self._state is not None, \
                    "shouldn't happen: _handle_events without self._state"
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)

        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=True)
        except Exception:
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close(exc_info=True)
            raise

    def close(self, exc_info=False):
        """Close this stream.

        If ``exc_info`` is true, set the ``error`` attribute to the current
        exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
        use that instead of `sys.exc_info`).
        """
        if not self.closed():
            if exc_info:
                if not isinstance(exc_info, tuple):
                    exc_info = sys.exc_info()
                if any(exc_info):
                    self.error = exc_info[1]
            if self._read_until_close:
                if (self._streaming_callback is not None and
                        self._read_buffer_size):
                    self._run_read_callback(self._read_buffer_size, True)
                self._read_until_close = False
                self._run_read_callback(self._read_buffer_size, False)
            if self._state is not None:
                self.io_loop.remove_handler(self.fileno())
                self._state = None
            self.close_fd()
            self._closed = True
        self._maybe_run_close_callback()

    def _maybe_run_close_callback(self):
        # If there are pending callbacks, don't run the close callback
        # until they're done (see _maybe_add_error_handler)
        if self.closed() and self._pending_callbacks == 0:
            futures = []
            if self._read_future is not None:
                futures.append(self._read_future)
                self._read_future = None
            futures += [future for _, future in self._write_futures]
            self._write_futures.clear()
            if self._connect_future is not None:
                futures.append(self._connect_future)
                self._connect_future = None
            if self._ssl_connect_future is not None:
                futures.append(self._ssl_connect_future)
                self._ssl_connect_future = None
            for future in futures:
                future.set_exception(StreamClosedError(real_error=self.error))
            if self._close_callback is not None:
                cb = self._close_callback
                self._close_callback = None
                self._run_callback(cb)
            # Delete any unfinished callbacks to break up reference cycles.
            self._read_callback = self._write_callback = None
            # Clear the buffers so they can be cleared immediately even
            # if the IOStream object is kept alive by a reference cycle.
            # TODO: Clear the read buffer too; it currently breaks some tests.
            self._write_buffer = None
            self._write_buffer_size = 0

    def _handle_read(self):
        try:
            pos = self._read_to_buffer_loop()
        except UnsatisfiableReadError:
            raise
        except Exception as e:
            gen_log.warning("error on read: %s" % e)
            self.close(exc_info=True)
            return
        if pos is not None:
            self._read_from_buffer(pos)
            return
        else:
            self._maybe_run_close_callback()

    # 从fd上读取数据,并追加到读缓冲区的末尾。
    # + 该方法返回实际读取的自节数。如果没有读到任何数据,则返回0。
    # + 出错时,关闭fd,并抛出异常。
    def _read_to_buffer(self):
        """Reads from the socket and appends the result to the read buffer.

        Returns the number of bytes read.  Returns 0 if there is nothing
        to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
        error closes the socket and raises an exception.
        """
        while True:
            try:
                chunk = self.read_from_fd()
            except (socket.error, IOError, OSError) as e:
                if errno_from_exception(e) == errno.EINTR:
                    continue
                # ssl.SSLError is a subclass of socket.error
                if self._is_connreset(e):
                    # Treat ECONNRESET as a connection close rather than
                    # an error to minimize log spam  (the exception will
                    # be available on self.error for apps that care).
                    self.close(exc_info=True)
                    return
                self.close(exc_info=True)
                raise
            break
        if chunk is None:
            return 0
        self._read_buffer += chunk
        self._read_buffer_size += len(chunk)
        if self._read_buffer_size > self.max_buffer_size:
            gen_log.error("Reached maximum read buffer size")
            self.close()
            raise StreamBufferFullError("Reached maximum read buffer size")
        return len(chunk)

这个类中的方法,比较散乱,下面总结一下:
read_bytes(num_bytes)的大致执行流程是:

  1. 如果read buffer有num_bytes个字节,goto 步骤4
  2. fd的缓冲区读数据到read buffer,一直到:
  3. fd注册到IOLoop,并关注其上的读事件,每次IOLoop迭代的时候,如果fd上有读事件发生,则执行步骤2
  4. 从read buffer上消费num_bytes个字节,

例子[TOC]

server.py:

import socket
import errno
import functools
import struct
import signal

from tornado.ioloop import IOLoop
import tornado.gen as gen
from tornado.iostream import IOStream

@gen.coroutine
def handle_connection(connection):
    connection.setblocking(False)
    stream = IOStream(connection)
    payload = None

    while payload != "break":
        payload_length = yield stream.read_bytes(4)
        payload_length = struct.unpack("!I", payload_length)[0]
        print "payload_length is:", payload_length
        payload = yield stream.read_bytes(payload_length)
        print "payload is:", payload
        yield stream.write("server is good")
    stream.close()

def on_connection_ready(server_socket, fd, events):
    try:
        connection, addr = server_socket.accept()
    except socket.error as ex:
        if ex.args[0] not in [errno.EAGAIN, errno.EWOULDBLOCK]:
            raise
        return

    handle_connection(connection)


def handle_signal():
    signal.signal(signal.SIGINT, lambda sn, fo: IOLoop.current().stop())

def main():
    handle_signal()

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(("0.0.0.0", 9099))
    server_socket.setblocking(False)
    server_socket.listen(5)

    IOLoop.current().add_handler(server_socket.fileno(),
        functools.partial(on_connection_ready, server_socket),
        IOLoop.READ)
    IOLoop.current().start()

if __name__ == "__main__":
    main()

client.py:

import socket
import struct

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1", 9099))

for i in range(13):
    str = "hello world, %d." % i
    sock.send(struct.pack("!I", len(str)))
    sock.send(str)
    print sock.recv(1024)
sock.send(struct.pack("!I", len("break")))
sock.send("break")
sock.close()