下面是用于测试的多线程TCP服务器(比较简易,未处理粘包问题),其并发模型是:每个连接由一个单独的线程处理。该程序接受两个命令行参数:要监听的IP地址、端口:
xxxxxxxxxx
# threaded_server.py
import sys
import socket
import threading
import logging
LOGGER = logging.getLogger(__name__)
class ThreadedServer(object):
def __init__(self, handler, address, port, backlog=5):
self._handler = handler
self._address = address
self._port = port
self._backlog = backlog
def run(self):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
server_socket.bind((self._address, self._port))
server_socket.listen(self._backlog)
LOGGER.info("listening on %s:%d" % (self._address, self._port))
try:
while True:
client_connection, client_address = server_socket.accept()
LOGGER.info("accept connection from %s" % (client_address, ))
t = threading.Thread(target=self._handler,
args=(client_connection, client_address))
t.setDaemon(True)
t.start()
finally:
server_socket.close()
def handler(conn, addr):
try:
while True:
data = conn.recv(1024)
if data == "":
break
LOGGER.info("receive %s from %s" % (data, addr))
conn.sendall("you said: %s" % data)
finally:
conn.close()
LOGGER.info("disconnect from %s" % (addr, ))
if __name__ == "__main__":
if len(sys.argv) < 3:
raise RuntimeError("missing server address and/or port")
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S')
address = sys.argv[1]
port = int(sys.argv[2])
server = ThreadedServer(handler, address, port)
server.run()
下面是用于向测试服务器发送请求的测试客户端,该程序需要四个命令行参数:客户端IP、客户端端口、服务端IP、服务端端口:
xxxxxxxxxx
# client.py
import sys
import socket
import time
def request(client_address, client_port, server_address, server_port):
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
client_sock.bind((client_address, client_port))
client_sock.connect((server_address, server_port))
try:
for _ in range(100):
client_sock.sendall("I am client")
data = client_sock.recv(1024)
print("receive: %s" % data)
time.sleep(1)
finally:
print("close client socket")
client_sock.close()
if __name__ == "__main__":
if len(sys.argv) < 5:
raise RuntimeError("missing addresses or ports")
client_address = sys.argv[1]
client_port = int(sys.argv[2])
server_address = sys.argv[3]
server_port = int(sys.argv[4])
request(client_address, client_port, server_address, server_port)
Linux manual中的描述如下:
xxxxxxxxxx
SO_REUSEADDR
Indicates that the rules used in validating addresses supplied
in a bind(2) call should allow reuse of local addresses. For
AF_INET sockets this means that a socket may bind, except when
there is an active listening socket bound to the address.
When the listening socket is bound to INADDR_ANY with a spe‐
cific port then it is not possible to bind to this port for
any local address. Argument is an integer boolean flag.
翻译成中文如下:
xSO_REUSEADDR
表明:bind(2)调用提供的、用于验证地址的规则允许重用本地地址。
对于AF_INET套接字,设置该选项意味着:对于一个地址,除了存在被绑定到它的监听套接字的情况,
任何套接字都可以绑定它。如果使用某个端口将某个监听套接字绑定到INADDR_ANY,
那么对于任何本地地址,不能将任何套接字绑定到该端口。参数是一个整型布尔标记。
也就是:
在没设置SO_REUSEADDR选项的情况下,不同的socket不能绑定相同的地址。
在设置SO_REUSEADDR选项的情况下,对于一个地址,一旦存在绑定它的监听socket,那么其它socket不能再绑定它;否则,可以绑定。
并且,INADDR_ANY(表示所有本地地址)与所有本地地址冲突。
下面使用[准备工作]中的两个测试程序,进行验证:
启动两个服务器,分别监听127.0.0.1:2345和127.0.0.1:2346:
打开两个终端,分别执行:
xxxxxxxxxx
python threaded_server.py 127.0.0.1 2345
和
xxxxxxxxxx
python threaded_server.py 127.0.0.1 2346
启动两个客户端,都绑定127.0.0.1:3456,分别连接到127.0.0.1:2345和127.0.0.1:2346:
打开两个终端,分别执行:
xxxxxxxxxx
python client.py 127.0.0.1 3456 127.0.0.1 2345
和
xxxxxxxxxx
python client.py 127.0.0.1 3456 127.0.0.1 2346
执行第二个命令时,会出现类似下面的错误:
xxxxxxxxxx
Traceback (most recent call last):
File "client.py", line 30, in <module>
request(client_address, client_port, server_address, server_port)
File "client.py", line 8, in request
client_sock.bind((client_address, client_port))
File "/usr/lib64/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
socket.error: [Errno 98] Address already in use
这次实验验证了没设置SO_REUSEADDR选项的情况
将client.py
文件中的request()
函数的第二行取消注释,也就是给client socket设置SO_REUSEADDR选项。然后按照步骤2,重新执行测试
通过netstat命令,也可以看出,两个客户端socket绑定了相同的地址:
xxxxxxxxxx
netstat -nat | grep 127.0.0.1
tcp 0 0 127.0.0.1:2345 0.0.0.0:* LISTEN
tcp 0 0 127.0.0.1:2346 0.0.0.0:* LISTEN
tcp 0 0 127.0.0.1:3456 127.0.0.1:2346 ESTABLISHED
tcp 0 0 127.0.0.1:3456 127.0.0.1:2345 ESTABLISHED
tcp 0 0 127.0.0.1:2345 127.0.0.1:3456 ESTABLISHED
tcp 0 0 127.0.0.1:2346 127.0.0.1:3456 ESTABLISHED
这次实验验证了设置SO_REUSEADDR选项的情况
需要特别强调的是:
Linux manual中的描述如下:
xSO_REUSEPORT (since Linux 3.9)
Permits multiple AF_INET or AF_INET6 sockets to be bound to an
identical socket address. This option must be set on each
socket (including the first socket) prior to calling bind(2)
on the socket. To prevent port hijacking, all of the pro‐
cesses binding to the same address must have the same effec‐
tive UID. This option can be employed with both TCP and UDP
sockets.
For TCP sockets, this option allows accept(2) load distribu‐
tion in a multi-threaded server to be improved by using a dis‐
tinct listener socket for each thread. This provides improved
load distribution as compared to traditional techniques such
using a single accept(2)ing thread that distributes connec‐
tions, or having multiple threads that compete to accept(2)
from the same socket.
For UDP sockets, the use of this option can provide better
distribution of incoming datagrams to multiple processes (or
threads) as compared to the traditional technique of having
multiple processes compete to receive datagrams on the same
socket.
翻译成中文如下:
xxxxxxxxxx
SO_REUSEPORT (since Linux 3.9)
该允许将多个AF_INET或AF_INET6套接字绑定到相同的地址上。必须在调用bind(2)之前,
在每个套接字(包括第一个套接字)上设置该选项。为了防止端口劫持,绑定到相同地址的
所有进程必须拥有相同的UID。该选项既能被应用于TCP套接字,也能被应用于UDP套接字。
对于TCP套接字,在多线程服务器中,该选项允许通过每个线程使用不同的监听套接字的方式,
改善accept(2)负载分布。与诸如:使用单个accept(2)线程分发连接、多个线程在相同的
套接字上竞争accept(2)之类的传统技术相比,该选项有助于改善负载分布。
对于UDP套接字,与多个进程在相同的套接字上竞争接收数据报这种传统技术相比,
使用该选项可以更好地将接收到的数据报分发给多个进程(或线程)。
需要特别强调的是:
下面使用[准备工作]中的两个测试程序,进行验证:
将threaded_server.py
文件中的ThreadedServer
类的run()
方法的第二行取消注释,也就是为server socket设置SO_REUSEPORT选项。然后启动两个服务器,都监听127.0.0.1:2345:
打开两个终端,都执行:
xxxxxxxxxx
python threaded_server.py 127.0.0.1 2345
启动若干个客户端,绑定127.0.0.1:0(绑定0端口时,操作系统会随机分配一个空闲端口),并连接到127.0.0.1:2345:
打开若干个终端,都执行:
xxxxxxxxxx
python client.py 127.0.0.1 0 127.0.0.1 2345
本文中的代码在以下环境下测试通过: