tornado.iostream模块提供了一组工具类,用于向非阻塞的文件或socket写入数据,以及从中读取数据。
本文只解析BaseIOStream提供的read_bytes()接口。
源代码在:http://www.tornadoweb.org/en/stable/_modules/tornado/iostream.html#BaseIOStream。
下面是相关的主要代码:
class BaseIOStream(object):
# Excerpt of the constructor: only the attributes relevant to read_bytes()
# are shown; every "..." line stands for code omitted from the excerpt.
def __init__(self, io_loop=None, max_buffer_size=None,
read_chunk_size=None, max_write_buffer_size=None):
# Use the supplied IOLoop, or fall back to the current one.
self.io_loop = io_loop or ioloop.IOLoop.current()
# Upper bound on the read buffer size; defaults to 104857600 bytes (100 MiB).
self.max_buffer_size = max_buffer_size or 104857600
# Maximum number of bytes fetched by a single read call on the fd
# (default 65536, capped at half of max_buffer_size).
# A chunk size that is too close to max_buffer_size can cause
# spurious failures.
self.read_chunk_size = min(read_chunk_size or 65536,
self.max_buffer_size // 2)
...
# Read buffer: its contents are split into bytes that have already been
# consumed and bytes that have not been consumed yet.
self._read_buffer = bytearray()
# Read position: index of the first byte that has not been consumed yet.
# + It moves to the right as buffered data is consumed.
# + Once it exceeds the unconsumed size, _consume() drops the consumed
# + + prefix from the buffer and resets the position to 0.
self._read_buffer_pos = 0
# Size of the read buffer (specifically, of the part not yet consumed).
self._read_buffer_size = 0
...
# First argument of the pending read_bytes() call (number of bytes wanted).
self._read_bytes = None
...
# Callback to invoke when the pending read completes.
self._read_callback = None
# Future to resolve when the pending read completes.
self._read_future = None
...
# IOLoop event mask this stream is interested in; None until a handler
# has been registered (see _add_io_state).
self._state = None
...
def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
               partial=False):
    """Asynchronously read a number of bytes.

    If a ``streaming_callback`` is given, it will be called with chunks
    of data as they become available, and the final result will be empty.
    Otherwise, the result is all the data that was read.

    If a callback is given, it will be run with the data as an argument;
    if not, this method returns a `.Future`.

    If ``partial`` is true, the callback is run as soon as we have
    any bytes to return (but never more than ``num_bytes``).

    (This article only walks through the case where ``streaming_callback``
    and ``partial`` keep their default values.)

    :param num_bytes: number of bytes to read; must be an integer.
    :param callback: optional callable invoked with the data read.
    :param streaming_callback: optional callable invoked with chunks.
    :param partial: whether a short read may complete the operation.
    :returns: a `.Future` when ``callback`` is None, otherwise None.
    :raises AssertionError: if ``num_bytes`` is not an integer, or if
        another read is already in progress.

    .. versionchanged:: 4.0
        Added the ``partial`` argument.  The callback argument is now
        optional and a `.Future` will be returned if it is omitted.
    """
    # Validate the argument before registering the read.  Checking it after
    # _set_read_callback() would leave the callback/future installed on
    # failure, so every later read would fail with "Already reading".
    assert isinstance(num_bytes, numbers.Integral)
    future = self._set_read_callback(callback)
    self._read_bytes = num_bytes
    self._read_partial = partial
    self._streaming_callback = stack_context.wrap(streaming_callback)
    try:
        self._try_inline_read()
    except:  # noqa: E722 - intentional catch-all; it always re-raises
        if future is not None:
            # The caller receives the error from the raise below, so mark
            # the future's exception as retrieved (presumably so that it is
            # not reported as a never-examined failure).
            future.add_done_callback(lambda f: f.exception())
        raise
    return future
# 如果callback为None,则返回一个Future对象,
# + 当读操作完成时,会将数据或异常信息填充到该对象中;
# 否则,将其设置为read callback,当读操作完成之后,会调用该回调函数。
def _set_read_callback(self, callback):
# 如果read callback 和 read future 都不为None,
# + 说明已经有一个读操作正在执行,抛出错误。
assert self._read_callback is None, "Already reading"
assert self._read_future is None, "Already reading"
if callback is not None:
self._read_callback = stack_context.wrap(callback)
else:
self._read_future = TracebackFuture()
return self._read_future
# Try to complete the current read operation from the read buffer.
# + If the read can be satisfied, the read callback is scheduled for the
# + + next IOLoop iteration (or the read future is filled in);
# + otherwise the fd is registered with the IOLoop for read events.
def _try_inline_read(self):
"""Attempt to complete the current read operation from buffered data.
If the read can be completed without blocking, schedules the
read callback on the next IOLoop iteration; otherwise starts
listening for reads on the socket.
"""
# See if we've already got the data from a previous read
self._run_streaming_callback()
# 1. Try to satisfy the pending read from data already in the read buffer.
pos = self._find_read_pos()
if pos is not None:
# 1.1 Enough data is buffered: take it out of the read buffer, then
# + invoke the read callback with it or fill in the read future.
self._read_from_buffer(pos)
return
# NOTE(review): _check_closed is not part of this excerpt; presumably it
# raises when the stream has already been closed - confirm.
self._check_closed()
try:
# 2. Try to read enough data from the fd to satisfy the pending read.
pos = self._read_to_buffer_loop()
except Exception:
# If there was an error in _read_to_buffer, we called close() already,
# but couldn't run the close callback because of _pending_callbacks.
# Before we escape from this function, run the close callback if
# applicable.
self._maybe_run_close_callback()
raise
# 2.1 The fd yielded enough data to satisfy the pending read: complete it
# from the read buffer.
if pos is not None:
self._read_from_buffer(pos)
return
# We couldn't satisfy the read inline, so either close the stream
# or listen for new data.
# 3. Otherwise register the fd with the IOLoop and wait for read events.
if self.closed():
self._maybe_run_close_callback()
else:
self._add_io_state(ioloop.IOLoop.READ)
# 尝试从读缓冲区中找到满足
# + 当前挂起的读请求的位置,如果当前读请求能够被满足,则返回这个位置,否则返回None。
# + + 比如,当前的读请求是read_bytes(num_bytes, partial=False),那么当读缓冲区中有
# + + + 大于等于 num_bytes 个字节时,则返回num_bytes
# + + + 否则,返回None
def _find_read_pos(self):
"""Attempts to find a position in the read buffer that satisfies
the currently-pending read.
Returns a position in the buffer if the current read can be satisfied,
or None if it cannot.
"""
if (self._read_bytes is not None and
(self._read_buffer_size >= self._read_bytes or
(self._read_partial and self._read_buffer_size > 0))):
num_bytes = min(self._read_bytes, self._read_buffer_size)
return num_bytes
...
return None
# 从读缓冲区中,完成当前挂起的读请求。
def _read_from_buffer(self, pos):
"""Attempts to complete the currently-pending read from the buffer.
The argument is either a position in the read buffer or None,
as returned by _find_read_pos.
"""
self._read_bytes = self._read_delimiter = self._read_regex = None
self._read_partial = False
self._run_read_callback(pos, False)
# Deliver `size` bytes from the read buffer to whoever is waiting for the
# pending read: the read future, or a callback scheduled on the IOLoop.
def _run_read_callback(self, size, streaming):
# (excerpt) the branch that precedes the `else` below is elided here;
# presumably it handles the streaming case - not shown in this excerpt.
...
else:
callback = self._read_callback
# The pending read is finished: drop both registered callbacks.
self._read_callback = self._streaming_callback = None
if self._read_future is not None:
assert callback is None
# No read callback was registered,
# + so store the consumed data in the read future.
future = self._read_future
self._read_future = None
future.set_result(self._consume(size))
if callback is not None:
assert (self._read_future is None) or streaming
# A read callback was registered,
# + so schedule it (with the consumed data) for the next IOLoop iteration.
self._run_callback(callback, self._consume(size))
else:
# If we scheduled a callback, we will add the error listener
# afterwards. If we didn't, we have to do it now.
self._maybe_add_error_listener()
# Schedule `callback(*args)` on the IOLoop instead of calling it directly.
# `_pending_callbacks` is incremented when a callback is scheduled here and
# decremented when its wrapper starts running.
def _run_callback(self, callback, *args):
def wrapper():
# The callback is about to run, so it no longer counts as pending.
self._pending_callbacks -= 1
try:
return callback(*args)
except Exception:
app_log.error("Uncaught exception, closing connection.",
exc_info=True)
# Close the socket on an uncaught exception from a user callback
# (It would eventually get closed when the socket object is
# gc'd, but we don't want to rely on gc happening before we
# run out of file descriptors)
self.close(exc_info=True)
# Re-raise the exception so that IOLoop.handle_callback_exception
# can see it and log the error
raise
finally:
# Give the stream a chance to start listening for a closed connection.
self._maybe_add_error_listener()
# We schedule callbacks to be run on the next IOLoop iteration
# rather than running them directly for several reasons:
# * Prevents unbounded stack growth when a callback calls an
# IOLoop operation that immediately runs another callback
# * Provides a predictable execution context for e.g.
# non-reentrant mutexes
# * Ensures that the try/except in wrapper() is run outside
# of the application's StackContexts
with stack_context.NullContext():
# stack_context was already captured in callback, we don't need to
# capture it again for IOStream's wrapper. This is especially
# important if the callback was pre-wrapped before entry to
# IOStream (as in HTTPConnection._header_callback), as we could
# capture and leak the wrong context here.
self._pending_callbacks += 1
self.io_loop.add_callback(wrapper)
# 该方法用于检测:一个没有活跃读、写请求的连接是否被关闭,
# + 为此,必须监听读事件。然而,在连接刚建立的时候,执行这个操作是无用的,
# + 因为这种情况下,我们会立刻进行读写。在IOStream中,很多地方
# + 都插入了这个检查,当连接空闲时,那么就监听其上的读事件。
def _maybe_add_error_listener(self):
# This method is part of an optimization: to detect a connection that
# is closed when we're not actively reading or writing, we must listen
# for read events. However, it is inefficient to do this when the
# connection is first established because we are going to read or write
# immediately anyway. Instead, we insert checks at various times to
# see if the connection is idle and add the read listener then.
if self._pending_callbacks != 0:
return
if self._state is None or self._state == ioloop.IOLoop.ERROR:
if self.closed():
self._maybe_run_close_callback()
elif (self._read_buffer_size == 0 and
self._close_callback is not None):
self._add_io_state(ioloop.IOLoop.READ)
# 从read buffer上读取loc个字节,并返回。
def _consume(self, loc):
# Consume loc bytes from the read buffer and return them
if loc == 0:
return b""
assert loc <= self._read_buffer_size
# Slice the bytearray buffer into bytes, without intermediate copying
# 这里用到了memoryview,
# + memoryview为支持buffer protocol的对象,提供了基于字节的访问接口。
# + 使用memoryview不会发生内存拷贝。
b = (memoryview(self._read_buffer)
[self._read_buffer_pos:self._read_buffer_pos + loc]
).tobytes()
# 移动读指针 和 修改缓冲区大小
self._read_buffer_pos += loc
self._read_buffer_size -= loc
# Amortized O(1) shrink
# (this heuristic is implemented natively in Python 3.4+
# but is replicated here for Python 2)
# 当 读指针 大于 缓冲区大小 的时候,
# + 会对缓冲区进行收缩:
# + + 1,删除已经被消费内容的缓冲区
# + + 2,将读指针归零
if self._read_buffer_pos > self._read_buffer_size:
del self._read_buffer[:self._read_buffer_pos]
self._read_buffer_pos = 0
return b
# 向事件处理函数添加`state`
def _add_io_state(self, state):
"""Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
Implementation notes: Reads and writes have a fast path and a
slow path. The fast path reads synchronously from socket
buffers, while the slow path uses `_add_io_state` to schedule
an IOLoop callback. Note that in both cases, the callback is
run asynchronously with `_run_callback`.
To detect closed connections, we must have called
`_add_io_state` at some point, but we want to delay this as
much as possible so we don't have to set an `IOLoop.ERROR`
listener that will be overwritten by the next slow-path
operation. As long as there are callbacks scheduled for
fast-path ops, those callbacks may do more reads.
If a sequence of fast-path ops do not end in a slow-path op,
(e.g. for an @asynchronous long-poll request), we must add
the error handler. This is done in `_run_callback` and `write`
(since the write callback is optional so we can have a
fast-path write with no `_run_callback`)
"""
if self.closed():
# connection has been closed, so there can be no future events
return
if self._state is None:
self._state = ioloop.IOLoop.ERROR | state
with stack_context.NullContext():
self.io_loop.add_handler(
self.fileno(), self._handle_events, self._state)
elif not self._state & state:
self._state = self._state | state
self.io_loop.update_handler(self.fileno(), self._state)
# Try to read the wanted number of bytes from the fd into the read buffer.
# + Returns a position if the data read so far satisfies the pending read,
# + otherwise returns None.
def _read_to_buffer_loop(self):
# This method is called from _handle_read and _try_inline_read.
try:
if self._read_bytes is not None:
target_bytes = self._read_bytes
# (other ways of choosing target_bytes are elided in this excerpt)
...
else:
target_bytes = 0
next_find_pos = 0
# Pretend to have a pending callback so that an EOF in
# _read_to_buffer doesn't trigger an immediate close
# callback. At the end of this method we'll either
# establish a real pending callback via
# _read_from_buffer or run the close callback.
#
# We need two try statements here so that
# pending_callbacks is decremented before the `except`
# clause below (which calls `close` and does need to
# trigger the callback)
self._pending_callbacks += 1
while not self.closed():
# Read from the socket until we get EWOULDBLOCK or equivalent.
# SSL sockets do some internal buffering, and if the data is
# sitting in the SSL object's buffer select() and friends
# can't see it; the only way to find out if it's there is to
# try to read it.
# Stop as soon as the fd has nothing more to read
# (_read_to_buffer returned 0).
if self._read_to_buffer() == 0:
break
self._run_streaming_callback()
# If we've read all the bytes we can use, break out of
# this loop. We can't just call read_from_buffer here
# because of subtle interactions with the
# pending_callback and error_listener mechanisms.
#
# If we've reached target_bytes, we know we're done.
if (target_bytes is not None and
self._read_buffer_size >= target_bytes):
break
# Otherwise, we need to call the more expensive find_read_pos.
# It's inefficient to do this on every read, so instead
# do it on the first read and whenever the read buffer
# size has doubled.
if self._read_buffer_size >= next_find_pos:
pos = self._find_read_pos()
if pos is not None:
return pos
next_find_pos = self._read_buffer_size * 2
return self._find_read_pos()
finally:
# Undo the pretend pending callback added above.
self._pending_callbacks -= 1
# IOLoop handler for this stream's fd (registered in _add_io_state):
# dispatches the reported events and then refreshes the event mask.
def _handle_events(self, fd, events):
if self.closed():
gen_log.warning("Got events for closed stream %s", fd)
return
try:
# (handling of other events is elided in this excerpt)
...
if events & self.io_loop.READ:
self._handle_read()
...
# After the read/write events have been handled, recompute the set of
# events this stream is interested in:
# + listen for READ only while a read is in progress;
# + listen for WRITE only while a write is in progress;
# + if the connection is idle (no read or write in progress and the
# + + read buffer is empty), listen for READ anyway so that a closed
# + + connection can be detected;
# finally, push the new event mask to the IOLoop if it changed.
state = self.io_loop.ERROR
if self.reading():
state |= self.io_loop.READ
if self.writing():
state |= self.io_loop.WRITE
if state == self.io_loop.ERROR and self._read_buffer_size == 0:
# If the connection is idle, listen for reads too so
# we can tell if the connection is closed. If there is
# data in the read buffer we won't run the close callback
# yet anyway, so we don't need to listen in this case.
state |= self.io_loop.READ
if state != self._state:
assert self._state is not None, \
"shouldn't happen: _handle_events without self._state"
self._state = state
self.io_loop.update_handler(self.fileno(), self._state)
except UnsatisfiableReadError as e:
gen_log.info("Unsatisfiable read, closing connection: %s" % e)
self.close(exc_info=True)
except Exception:
# Any other error closes the stream and is propagated to the caller.
gen_log.error("Uncaught exception, closing connection.",
exc_info=True)
self.close(exc_info=True)
raise
# Tear the stream down (only if it is not closed yet) and give the close
# callback a chance to run.
def close(self, exc_info=False):
"""Close this stream.
If ``exc_info`` is true, set the ``error`` attribute to the current
exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
use that instead of `sys.exc_info`).
"""
if not self.closed():
if exc_info:
# A true value that is not a tuple means "use the exception that is
# currently being handled".
if not isinstance(exc_info, tuple):
exc_info = sys.exc_info()
# Only record an error if there actually is an exception.
if any(exc_info):
self.error = exc_info[1]
if self._read_until_close:
# A read-until-close is pending: hand buffered data to the streaming
# callback first (if one is set and data is buffered), then finish
# the read with whatever is left in the buffer.
if (self._streaming_callback is not None and
self._read_buffer_size):
self._run_read_callback(self._read_buffer_size, True)
self._read_until_close = False
self._run_read_callback(self._read_buffer_size, False)
if self._state is not None:
# A handler is registered with the IOLoop: remove it.
self.io_loop.remove_handler(self.fileno())
self._state = None
self.close_fd()
self._closed = True
# Runs the close callback unless callbacks are still pending.
self._maybe_run_close_callback()
def _maybe_run_close_callback(self):
# If there are pending callbacks, don't run the close callback
# until they're done (see _maybe_add_error_handler)
if self.closed() and self._pending_callbacks == 0:
futures = []
if self._read_future is not None:
futures.append(self._read_future)
self._read_future = None
futures += [future for _, future in self._write_futures]
self._write_futures.clear()
if self._connect_future is not None:
futures.append(self._connect_future)
self._connect_future = None
if self._ssl_connect_future is not None:
futures.append(self._ssl_connect_future)
self._ssl_connect_future = None
for future in futures:
future.set_exception(StreamClosedError(real_error=self.error))
if self._close_callback is not None:
cb = self._close_callback
self._close_callback = None
self._run_callback(cb)
# Delete any unfinished callbacks to break up reference cycles.
self._read_callback = self._write_callback = None
# Clear the buffers so they can be cleared immediately even
# if the IOStream object is kept alive by a reference cycle.
# TODO: Clear the read buffer too; it currently breaks some tests.
self._write_buffer = None
self._write_buffer_size = 0
def _handle_read(self):
try:
pos = self._read_to_buffer_loop()
except UnsatisfiableReadError:
raise
except Exception as e:
gen_log.warning("error on read: %s" % e)
self.close(exc_info=True)
return
if pos is not None:
self._read_from_buffer(pos)
return
else:
self._maybe_run_close_callback()
# 从fd上读取数据,并追加到读缓冲区的末尾。
# + 该方法返回实际读取的自节数。如果没有读到任何数据,则返回0。
# + 出错时,关闭fd,并抛出异常。
def _read_to_buffer(self):
"""Reads from the socket and appends the result to the read buffer.
Returns the number of bytes read. Returns 0 if there is nothing
to read (i.e. the read returns EWOULDBLOCK or equivalent). On
error closes the socket and raises an exception.
"""
while True:
try:
chunk = self.read_from_fd()
except (socket.error, IOError, OSError) as e:
if errno_from_exception(e) == errno.EINTR:
continue
# ssl.SSLError is a subclass of socket.error
if self._is_connreset(e):
# Treat ECONNRESET as a connection close rather than
# an error to minimize log spam (the exception will
# be available on self.error for apps that care).
self.close(exc_info=True)
return
self.close(exc_info=True)
raise
break
if chunk is None:
return 0
self._read_buffer += chunk
self._read_buffer_size += len(chunk)
if self._read_buffer_size > self.max_buffer_size:
gen_log.error("Reached maximum read buffer size")
self.close()
raise StreamBufferFullError("Reached maximum read buffer size")
return len(chunk)
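这个类中的方法比较分散,下面总结一下read_bytes(num_bytes)的大致执行流程:
1. read_bytes()首先调用_set_read_callback():如果传入了callback,就把它保存为read callback;否则创建并返回一个Future。
2. read_bytes()记录num_bytes之后调用_try_inline_read():先通过_find_read_pos()检查读缓冲区中的数据是否已经足够,足够则调用_read_from_buffer()完成本次读操作。
3. 如果不够,则调用_read_to_buffer_loop()(内部循环调用_read_to_buffer())把fd上的数据读入读缓冲区;读到足够的数据后,同样调用_read_from_buffer()。
4. 如果仍然不够,则调用_add_io_state(IOLoop.READ)关注fd上的读事件;之后fd可读时,IOLoop会回调_handle_events(),再经由_handle_read()重复第3步。
5. _read_from_buffer()通过_run_read_callback()和_consume()取出数据:如果设置了read callback,就在下一次IOLoop迭代时调用它;否则把数据填充到Future中。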
server.py:
import socket
import errno
import functools
import struct
import signal
from tornado.ioloop import IOLoop
import tornado.gen as gen
from tornado.iostream import IOStream
@gen.coroutine
def handle_connection(connection):
    """Serve one client connection using length-prefixed frames.

    Each frame is a 4-byte big-endian unsigned length followed by that many
    payload bytes.  Every frame is answered with ``b"server is good"``; the
    connection is closed after the payload ``b"break"`` has been answered.

    NOTE(review): the original indentation was lost; this assumes the reply
    is sent once per frame (inside the loop), which is what the accompanying
    client needs in order to make progress - confirm.

    :param connection: the accepted client socket.
    """
    connection.setblocking(False)
    stream = IOStream(connection)
    try:
        payload = None
        # Bytes literals compare correctly on both Python 2 and Python 3;
        # read_bytes() yields bytes, which never equal a Python 3 str.
        while payload != b"break":
            header = yield stream.read_bytes(4)
            payload_length = struct.unpack("!I", header)[0]
            print("payload_length is: %s" % payload_length)
            payload = yield stream.read_bytes(payload_length)
            print("payload is: %s" % payload)
            yield stream.write(b"server is good")
    finally:
        # Always release the stream, including when the peer disconnects
        # mid-frame and a read or write raises.
        stream.close()
def on_connection_ready(server_socket, fd, events):
    """IOLoop handler for the listening socket: accept one connection.

    An accept that fails with EAGAIN/EWOULDBLOCK (nothing to accept right
    now) is ignored; any other socket error is re-raised.  An accepted
    connection is handed to ``handle_connection``.

    :param server_socket: the non-blocking listening socket.
    :param fd: file descriptor reported by the IOLoop (unused).
    :param events: event mask reported by the IOLoop (unused).
    """
    try:
        connection, addr = server_socket.accept()
    except socket.error as ex:
        if ex.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
            return
        raise
    handle_connection(connection)
def handle_signal():
    """Install a SIGINT handler that shuts the IOLoop down.

    The handler does not call ``IOLoop.stop()`` directly: a signal can
    interrupt the loop at an arbitrary point, and
    ``add_callback_from_signal`` is the IOLoop method documented as safe to
    call from a signal handler.  The stop is therefore scheduled to run
    inside the loop.
    """
    def _request_stop(signum, frame):
        loop = IOLoop.current()
        loop.add_callback_from_signal(loop.stop)

    signal.signal(signal.SIGINT, _request_stop)
def main():
    """Run the demo server on 0.0.0.0:9099 until the IOLoop is stopped.

    A non-blocking listening socket is registered with the current IOLoop
    for READ events, and ``on_connection_ready`` accepts a connection each
    time it fires.  When the loop stops (e.g. on SIGINT, see
    ``handle_signal``) the handler is removed and the listening socket is
    closed.
    """
    handle_signal()
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_socket.bind(("0.0.0.0", 9099))
        server_socket.setblocking(False)
        server_socket.listen(5)
        io_loop = IOLoop.current()
        io_loop.add_handler(server_socket.fileno(),
                            functools.partial(on_connection_ready,
                                              server_socket),
                            IOLoop.READ)
        try:
            io_loop.start()
        finally:
            # Unregister before closing so the loop never polls a dead fd.
            io_loop.remove_handler(server_socket.fileno())
    finally:
        server_socket.close()


if __name__ == "__main__":
    main()
client.py:
import socket
import struct
# Demo client: sends 13 length-prefixed greetings, printing the server's
# reply after each one, then sends the "break" frame that tells the server
# to finish.  A frame is a 4-byte big-endian length followed by the payload.
#
# NOTE(review): the original indentation was lost; this assumes the recv()
# belongs to the loop body (one reply per greeting) - confirm.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock.connect(("127.0.0.1", 9099))
    for i in range(13):
        # Encode explicitly so the payload is bytes on Python 2 and 3.
        message = ("hello world, %d." % i).encode("ascii")
        # sendall() keeps writing until everything is sent; a plain send()
        # may transmit only part of the data.
        sock.sendall(struct.pack("!I", len(message)) + message)
        print(sock.recv(1024))
    sock.sendall(struct.pack("!I", len(b"break")) + b"break")
finally:
    # Release the socket even if connecting or sending fails.
    sock.close()