Python SocketServer Source Code Reading

Basic Socket Programming

Examples

Socket Server:

import socket
import sys


def test_server():
    # create a socket object
    s = socket.socket()
    print "Socket successfully created"

    # reserve a port on your computer; in our
    # case it is 12345, but it can be anything
    port = 12345

    # Next, bind to the port.
    # We have not typed any IP in the IP field;
    # instead we have passed an empty string.
    # This makes the server listen to requests
    # coming from other computers on the network
    s.bind(('', port))
    print "socket bound to %s" % (port)

    # put the socket into listening mode
    s.listen(5)
    print "socket is listening"

    # a forever loop until we interrupt it or
    # an error occurs
    while True:
        # Establish connection with client.
        c, addr = s.accept()
        print 'Got connection from', addr

        # send a thank you message to the client.
        c.sendall('Thank you for connecting')
        # Close the connection with the client
        c.close()

test_server()

Output:

Socket successfully created
socket bound to 12345
socket is listening

Got connection from ('127.0.0.1', 57326)

For the server:

  • create a socket: socket.socket()
  • bind to an address and port: s.bind(('', port))
  • listen on the port: s.listen(5)
  • accept: accept a connection
  • recv, send: receive and send data
  • close: close the connection

Socket Client

import socket

def test_client():
    s = socket.socket()
    # Define the port on which you want to connect
    port = 12345
    # connect to the server on local computer
    s.connect(('127.0.0.1', port))
    # receive data from the server
    print s.recv(1024)
    # close the connection
    s.close()

test_client()

Output:

Thank you for connecting

For the client:

  • create a socket
  • connect: establish a connection with the server
  • recv, send: receive and send data
  • close: close the connection
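To see the server and client steps above work together, here is a minimal sketch that runs both in a single Python 3 process (unlike the Python 2 code above, data is bytes here). The server side runs in a background thread, and binding to port 0 instead of the fixed 12345 is an assumption made for the demo: it lets the OS pick a free port. The client keeps calling recv() until it returns empty bytes, which is how TCP signals that the peer closed the connection.

```python
import socket
import threading

def run_server(listener):
    # accept: wait for one client, send the greeting, then close that connection
    conn, addr = listener.accept()
    with conn:
        conn.sendall(b"Thank you for connecting")

def run_client(port):
    # create + connect, then recv until the server closes, then close
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(("127.0.0.1", port))
        chunks = []
        while True:
            data = s.recv(1024)
            if not data:  # empty bytes: the server closed its end
                break
            chunks.append(data)
        return b"".join(chunks)

# create + bind + listen; port 0 asks the OS for any free port
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(5)
port = listener.getsockname()[1]

t = threading.Thread(target=run_server, args=(listener,))
t.start()
reply = run_client(port)
t.join()
listener.close()
print(reply.decode())
```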

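Details of each step

Establishing a connection

For a client program, setting up a socket takes two steps:

  • create the actual socket
  • connect it to the remote server

When creating the socket, you tell the system two things:

  • The communication type (address family), i.e. which protocol carries the data, such as IPv4, IPv6, or AFP (Apple file sharing). Most of the time this is Internet communication over IPv4, so the address family is AF_INET.
  • The protocol family (socket type), usually SOCK_STREAM for TCP or SOCK_DGRAM for UDP.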
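The two choices can be seen on the socket objects themselves. In this small Python 3 sketch, every socket reports its address family and socket type through the family and type attributes. It assumes Python 3.7 or later, where type no longer carries extra platform flags. A socket created with no arguments falls back to AF_INET and SOCK_STREAM.

```python
import socket

# socket.socket() with no arguments means AF_INET + SOCK_STREAM
default = socket.socket()
# TCP over IPv4: address family AF_INET, socket type SOCK_STREAM
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# UDP over IPv4: same address family, socket type SOCK_DGRAM
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

kinds = []
for s in (default, tcp, udp):
    kinds.append((s.family, s.type))  # record the two settings, then release the socket
    s.close()
print(kinds)
```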

To connect the socket, pass a tuple containing the remote host's IP (or hostname) and the remote port.

Here is a complete example:

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('www.google.com', 80))

A UDP example:

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
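A UDP socket needs no connect(). Each datagram is addressed individually with sendto(), and recvfrom() returns the payload together with the sender's address, which is what the receiver uses to reply. Here is a minimal Python 3 sketch of a loopback echo. Running both ends in one process and letting the OS choose the port are assumptions made for the demo.

```python
import socket

# "server" side: a UDP socket bound to a free loopback port
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.settimeout(2.0)
server.bind(("127.0.0.1", 0))
server_addr = server.getsockname()

# client side: no connect(); the destination goes with each datagram
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.settimeout(2.0)
client.sendto(b"hello", server_addr)

# recvfrom returns (payload, sender address); reply to that address
data, client_addr = server.recvfrom(1024)
server.sendto(data.upper(), client_addr)

reply, _ = client.recvfrom(1024)
client.close()
server.close()
print(reply)
```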

Looking up the port automatically and getting information from the socket

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
port = socket.getservbyname('http', 'tcp')
s.connect(('www.google.com', port))
print s.getsockname()  # our own IP and port
print s.getpeername()  # the remote machine's IP and port
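getservbyname() looks the name up in the system's services database (for example /etc/services), so its result depends on the machine. This Python 3 sketch skips it and connects over loopback instead, which is an assumption for the demo. It shows what getsockname() and getpeername() return on the two ends of one connection: each side's local address is the other side's peer address.

```python
import socket

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
server_addr = listener.getsockname()

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(server_addr)
conn, _ = listener.accept()

local = client.getsockname()          # our own (IP, port); the OS picked the port
peer = client.getpeername()           # the remote (IP, port) we connected to
accepted_peer = conn.getpeername()    # the server's view of the client
print(local, peer)

for s in (conn, client, listener):
    s.close()
```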

Socket Option

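A socket has many options you can set. One interesting option is SO_REUSEADDR. Normally, after a server process exits, the operating system keeps its port reserved for a few minutes (old connections linger in the TIME_WAIT state), which prevents other processes from taking the port before the timeout expires. If SO_REUSEADDR is set to true, a new server socket can bind to that port right after the old server socket is closed or the server process exits, which makes debugging much easier. Set it with setsockopt() before calling bind():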

s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
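Here is a short Python 3 sketch that sets the option and reads it back with getsockopt(). The returned value is an integer whose exact nonzero value can differ between platforms, so the check compares it against zero. In SocketServer, TCPServer exposes the same switch as its allow_reuse_address class attribute, which server_bind() honors.

```python
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# must happen before bind() to affect how the port is claimed
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# getsockopt reads the option back; any nonzero value means "on"
enabled = s.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) != 0
s.bind(("127.0.0.1", 0))
s.listen(5)
s.close()
print(enabled)
```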

Programming with SocketServer

SocketServer is a Python standard library module that wraps the low-level socket API and simplifies writing network servers.

Source code: https://hg.python.org/cpython/file/2.7/Lib/SocketServer.py
Documentation: https://docs.python.org/2/library/socketserver.html

Examples

Server

import SocketServer

class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The request handler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print "{} wrote:".format(self.client_address[0])
        print self.data
        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())


if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    # Create the server, binding to localhost on port 9999
    server = SocketServer.TCPServer((HOST, PORT), MyTCPHandler)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()

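SocketServer.BaseRequestHandler calls setup() first, then handle(), and finally finish(). Because finish() sits in a finally clause, it runs even if handle() raises: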

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled. The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method. To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server. Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variariables.
    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

So all you need to do is:

  • write a class that inherits from SocketServer.BaseRequestHandler
  • override handle() with your own logic
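Because BaseRequestHandler does all its work inside __init__, the setup/handle/finish order can be observed without a real connection. In this Python 3 sketch (the module is named socketserver there), the hypothetical TracingHandler records each call, and handle() raises on purpose to show that finish() still runs. Passing None for request and server only works because none of these methods touch them.

```python
import socketserver  # Python 3 name of the Python 2 SocketServer module

calls = []

class TracingHandler(socketserver.BaseRequestHandler):
    def setup(self):
        calls.append("setup")

    def handle(self):
        calls.append("handle")
        raise RuntimeError("boom")  # simulate a failing handler

    def finish(self):
        calls.append("finish")

# Constructing the handler runs setup(), handle(), and finish() in turn.
try:
    TracingHandler(request=None, client_address=("127.0.0.1", 0), server=None)
except RuntimeError:
    pass  # the error propagates only after finish() has run

print(calls)
```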

Client

import socket
import sys

HOST, PORT = "localhost", 9999
data = " ".join(sys.argv[1:])

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
    # Connect to server and send data
    sock.connect((HOST, PORT))
    sock.sendall(data + "\n")

    # Receive data from the server and shut down
    received = sock.recv(1024)
finally:
    sock.close()

print "Sent: {}".format(data)
print "Received: {}".format(received)
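The server and client examples above can be run together in one process. This sketch ports the same upper-casing handler to Python 3 (socketserver, bytes). It runs serve_forever() in a background thread and binds to port 0 rather than 9999, which is an assumption for the demo. server.shutdown() has to be called from a thread other than the one running serve_forever(), which is why the loop gets its own thread.

```python
import socket
import socketserver
import threading

class UpperHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # self.request is the connected TCP socket
        data = self.request.recv(1024).strip()
        self.request.sendall(data.upper())

# port 0 lets the OS choose; server_address holds the actual address
server = socketserver.TCPServer(("127.0.0.1", 0), UpperHandler)
host, port = server.server_address

t = threading.Thread(target=server.serve_forever)
t.daemon = True
t.start()

with socket.create_connection((host, port), timeout=5) as sock:
    sock.sendall(b"hello world\n")
    received = sock.recv(1024)

server.shutdown()      # ask serve_forever() to stop and wait until it has
server.server_close()  # close the listening socket
print(received)
```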

Classes and inheritance

+------------+
| BaseServer |
+------------+
      |
      v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
      |
      v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+

The request handlers are StreamRequestHandler and DatagramRequestHandler, and both inherit from BaseRequestHandler.
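Both hierarchies can be checked directly with issubclass() and __mro__. This is a Python 3 sketch. The Unix* classes exist only on platforms with Unix domain sockets, so the hasattr() guard skips them elsewhere.

```python
import socketserver

# the server hierarchy from the diagram
assert issubclass(socketserver.TCPServer, socketserver.BaseServer)
assert issubclass(socketserver.UDPServer, socketserver.TCPServer)
if hasattr(socketserver, "UnixStreamServer"):  # Unix-only classes
    assert issubclass(socketserver.UnixStreamServer, socketserver.TCPServer)
    assert issubclass(socketserver.UnixDatagramServer, socketserver.UDPServer)

# the handler hierarchy: print each class's method resolution order
for cls in (socketserver.StreamRequestHandler, socketserver.DatagramRequestHandler):
    print(cls.__name__, [base.__name__ for base in cls.__mro__])
```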

StreamRequestHandler

class StreamRequestHandler(BaseRequestHandler):

    """Define self.rfile and self.wfile for stream sockets."""

    # Default buffer sizes for rfile, wfile.
    # We default rfile to buffered because otherwise it could be
    # really slow for large data (a getc() call per byte); we make
    # wfile unbuffered because (a) often after a write() we want to
    # read and we need to flush the line; (b) big writes to unbuffered
    # files are typically optimized by stdio even when big reads
    # aren't.
    rbufsize = -1
    wbufsize = 0

    # A timeout to apply to the request socket, if not None.
    timeout = None

    # Disable nagle algorithm for this socket, if True.
    # Use only when wbufsize != 0, to avoid small packets.
    disable_nagle_algorithm = False

    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # A final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()

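The class first defines rbufsize and wbufsize, the read and write buffer sizes. rbufsize = -1 gives rfile the default buffering, while wbufsize = 0 makes wfile unbuffered, for the reasons given in the comment.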
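To see rfile and wfile in use, here is a minimal Python 3 sketch of a line-oriented echo handler built on StreamRequestHandler. setup() has already wrapped the socket in file objects, so handle() only iterates over self.rfile and writes to self.wfile. The handler name LineEchoHandler and the two-line exchange are hypothetical. The handler returns when the client closes its side and the read loop reaches end of file.

```python
import socket
import socketserver
import threading

class LineEchoHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # rfile is buffered (rbufsize = -1), so reading line by line is cheap
        for line in self.rfile:
            # wfile is unbuffered by default (wbufsize = 0): each write goes out now
            self.wfile.write(line.upper())

server = socketserver.TCPServer(("127.0.0.1", 0), LineEchoHandler)
t = threading.Thread(target=server.serve_forever, daemon=True)
t.start()

with socket.create_connection(server.server_address, timeout=5) as sock:
    f = sock.makefile("rwb")
    f.write(b"first\nsecond\n")
    f.flush()
    replies = [f.readline(), f.readline()]
    f.close()  # together with the socket close, this sends EOF to the handler

server.shutdown()
server.server_close()
print(replies)
```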

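disable_nagle_algorithm controls whether the Nagle algorithm is turned off for the connection. TCP offers a data-stream interface that lets an application push data of any size into the TCP stack, even a single byte at a time. But every TCP segment carries at least 40 bytes of flags and headers, so if TCP sends many packets that each hold only a little data, network performance degrades badly. The Nagle algorithm tries to bundle a large amount of TCP data together before sending a packet, which improves network efficiency. The downside is that a small HTTP message may not fill a packet and can be delayed while TCP waits for extra data that never arrives. Some highly interactive applications cannot tolerate this delay. The algorithm is enabled by default (disable_nagle_algorithm = False).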

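HTTP applications often set TCP_NODELAY in their own stack to disable the Nagle algorithm and improve performance. If you do this, make sure you write data to TCP in large chunks, so you do not produce a flood of small packets. This is why the comment in StreamRequestHandler says to use disable_nagle_algorithm only when wbufsize != 0.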
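Here is a small Python 3 sketch of both ways to flip the switch. First it sets TCP_NODELAY on a raw socket and reads it back. Then it shows the SocketServer route through the class attribute. Pairing that attribute with a buffered wfile (wbufsize = -1) follows the source comment about avoiding small packets. The handler name LowLatencyHandler is hypothetical, and the handler is only defined here, not run.

```python
import socket
import socketserver

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# TCP_NODELAY = 1 turns the Nagle algorithm off for this socket
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
nodelay = s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0
s.close()

class LowLatencyHandler(socketserver.StreamRequestHandler):
    disable_nagle_algorithm = True  # setup() will call setsockopt(TCP_NODELAY)
    wbufsize = -1                   # buffer writes; finish() flushes them in one go

    def handle(self):
        self.wfile.write(self.rfile.readline())

print(nodelay)
```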

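self.request is the request object returned by self.get_request() in TCPServer or UDPServer. For TCPServer it is the connected socket returned by accept(); for UDPServer it is a (data, socket) pair holding the received datagram and the server's socket.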
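The UDP case is the one that surprises people, so here is a minimal Python 3 sketch of a handler that unpacks the (data, socket) pair and replies with sendto(). The handler name UDPUpperHandler and the loopback setup are assumptions for the demo.

```python
import socket
import socketserver
import threading

class UDPUpperHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # For UDPServer, get_request() has already read the datagram, so
        # self.request is (data, server_socket) rather than a connected socket.
        data, sock = self.request
        sock.sendto(data.upper(), self.client_address)

server = socketserver.UDPServer(("127.0.0.1", 0), UDPUpperHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
    client.settimeout(5)
    client.sendto(b"ping", server.server_address)
    reply, _ = client.recvfrom(1024)

server.shutdown()
server.server_close()
print(reply)
```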

TCPServer

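Here is part of the core code. Strictly speaking, serve_forever() is defined in BaseServer, which TCPServer inherits, and _eintr_retry() is a module-level helper: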

# In BaseServer.__init__():
self.__is_shut_down = threading.Event()

# Module-level helper:
def _eintr_retry(func, *args):
    """restart a system call interrupted by EINTR"""
    while True:
        try:
            return func(*args)
        except (OSError, select.error) as e:
            if e.args[0] != errno.EINTR:
                raise


# BaseServer method:
def serve_forever(self, poll_interval=0.5):
    """Handle one request at a time until shutdown.

    Polls for shutdown every poll_interval seconds. Ignores
    self.timeout. If you need to do periodic tasks, do them in
    another thread.
    """
    self.__is_shut_down.clear()
    try:
        while not self.__shutdown_request:
            # XXX: Consider using another file descriptor or
            # connecting to the socket to wake this up instead of
            # polling. Polling reduces our responsiveness to a
            # shutdown request and wastes cpu at all other times.
            r, w, e = _eintr_retry(select.select, [self], [], [],
                                   poll_interval)
            if self in r:
                self._handle_request_noblock()
    finally:
        self.__shutdown_request = False
        self.__is_shut_down.set()

todo:

  • threading.Event
  • select
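The two TODO items are exactly what make serve_forever() stoppable. select() with a timeout wakes the loop every poll_interval seconds so it can check the shutdown flag, and BaseServer.shutdown() sets __shutdown_request to True and then waits on the __is_shut_down Event until the loop's finally clause sets it. Below is a toy Python 3 reimplementation of that pattern, not the library code. The names serve_until, stop, and stopped are hypothetical, and it uses a second Event where BaseServer uses a plain boolean flag.

```python
import select
import socket
import threading

def serve_until(listener, stop, stopped, poll_interval=0.1):
    """Toy serve_forever(): poll with select(), re-check the stop flag each round."""
    stopped.clear()
    try:
        while not stop.is_set():
            # wait at most poll_interval seconds for the listener to become readable
            r, _, _ = select.select([listener], [], [], poll_interval)
            if listener in r:
                conn, _ = listener.accept()  # readable listener = pending connection
                with conn:
                    conn.sendall(b"hi")
    finally:
        stopped.set()  # like __is_shut_down.set(): tell waiters the loop is done

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(5)

stop, stopped = threading.Event(), threading.Event()
t = threading.Thread(target=serve_until, args=(listener, stop, stopped))
t.start()

with socket.create_connection(listener.getsockname(), timeout=5) as c:
    greeting = c.recv(1024)

stop.set()                          # the shutdown() side: request the stop...
finished = stopped.wait(timeout=5)  # ...then block until the loop confirms it
t.join()
listener.close()
print(greeting, finished)
```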

MixIn

For Mixins, maemual recommends Lai Yonghao's (赖勇浩) "Mixin 扫盲班" (a Mixin primer), which explains some of the key issues in a fairly accessible way.
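In SocketServer, the mixins are ThreadingMixIn and ForkingMixIn. ThreadingMixIn overrides process_request() so that each request is handled in a new thread, and the module already ships ThreadingTCPServer as exactly this ThreadingMixIn + TCPServer combination. This Python 3 sketch builds the same class by hand to show why the mixin has to come first in the base list: that order lets its process_request() take precedence over TCPServer's. The handler reports its thread name so the test can confirm that it ran outside the serve_forever() thread. The names ThreadNameHandler and ThreadedTCPServer are hypothetical.

```python
import socket
import socketserver
import threading

class ThreadNameHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # reply with the name of the thread running this handler
        self.request.sendall(threading.current_thread().name.encode())

# Mixin first, so ThreadingMixIn.process_request() wins in the MRO.
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    daemon_threads = True  # handler threads won't block interpreter exit

server = ThreadedTCPServer(("127.0.0.1", 0), ThreadNameHandler)
loop = threading.Thread(target=server.serve_forever, daemon=True)
loop.start()

with socket.create_connection(server.server_address, timeout=5) as sock:
    handler_thread = sock.recv(1024).decode()

server.shutdown()
server.server_close()
print(handler_thread, "!=", loop.name)
```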


About the header image

Taken in Seoul, South Korea.
