目录
- 1 Depolyment
- 2 Server Code
- 3 总结
- 4 参考资料
1 Depolyment
Deployment: running server code, including start, log, alert and restart after failure.
Deployment有2种:
-
一个server对应多个client。一旦该server fail,则client就连不到server。
-
多个server对应多个client。(1)DNS 返回多个IP;(2) load balance 有多个server 并行。现代网络大公司通常是结合两种,返回地理上离client距离最近的load balance的IP。
2 Server Code
server分三种模式处理client的request,即response mode有3种:
-
单线程;
-
多线程;
-
异步。
2.1 A Simple Protocol
这里我们自己设计了一个简单的protocol,client问一次,服务器回答一次。问题和回答存储在aphorisms
里。;例如client问b’Beautiful is better than?’,服务器答b’ugly.’。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#zen_utils.py
import socket,time,argparse
aphorisms = {b'Beautiful is better than?': b'ugly.',
b'Explicit is better than?': b'implicit.',
b'Simple is better than?': b'complex.'}
def get_answer(question):
"""Return the string response to a particular Zen-of-Python aphorism"""
time.sleep(0.0)
return aphorisms.get(question,b'Error: unknown aphorism')
def parse_command_line(description):
'''Parse command line and return a socket address.'''
parser = argparse.ArgumentParser(description=description)
parser.add_argument('host',help='the server interface')
parser.add_argument('-p',help='the port number',type=int,default=1060)
args = parser.parse_args()
address = (args.host,args.p)
return address
def create_srv_socket(address):
"""Build and reuturn a listener server socket."""
listener = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
listener.bind(address)
print('server binding at: {}'.format(address))
listener.listen(1)
return listener
def accept_connection_forever(listener):
"""Forever accepting client socket."""
while True:
sock, address = listener.accept()
print('client binding from: {}'.format(address))
handle_conversation(sock,address)
def handle_conversation(sock,address):
"""Converse with a client over `sock` until they are done talking."""
try:
while True:
handle_requests(sock)
except EOFError:
print('client binding fron: {} is closed'.format(address))
except Exception as e:
print('client {} error: {}'.format(address,e))
finally:
sock.close()
def handle_requests(sock):
"""Recieve a single client request on `sock` and send the answer."""
question = recv_until(sock, b'?')
answer = get_answer(question)
sock.sendall(answer)
def recv_until(sock,suffix):
"""Receive bytes over socket `sock` until we receive the 'suffix'."""
data = b''
data += sock.recv(4096)
if not data:
raise EOFError('socket closed')
while not data.endswith(suffix):
more = sock.recv(4096)
if not more:
raise IOError('received {!r}, then socket closed'.format(more))
data += more
return data
zen_util.py是一个自定义的库,提取了一些方法,用于后面不同的服务器,避免代码重复,提高复用。代码比较简单,主要是将server socket的listen和accept里的几个步骤分别出来。需要注意的是最后一个方法recv_until(sock,suffix)
,这是一个framing的方法。具体来说,该方法同时使用于client和server接收的数据的framing,对于client sendall(data)
是以b'?'
结尾,例如b’Beautiful is better than?’;而对于server sendall(data)
是以b'.'
结尾,例如b’ugly.’。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#client.py
import zen_utils,socket,random,argparse
def client(address, cause_error=False):
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.connect(address)
print('client binding at: {}, to: {}'.format(sock.getsockname(),sock.getpeername()))
aphorisms = list(zen_utils.aphorisms)
if cause_error:
sock.sendall(aphorisms[0][:-1])
return
for aphorism in random.sample(aphorisms,3):
sock.sendall(aphorism)
print(aphorism,zen_utils.recv_until(sock,b'.'))
sock.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='client')
parser.add_argument('host',help='hostname or IP')
parser.add_argument('-p',metavar='port',type=int,default=1060,help='the port number')
args = parser.parse_args()
address = (args.host,args.p)
client(address)
client.py有3个循环,每个循环里一次sendall()
,一次recv_until()
。
2.2 A Single-Threaded Server
1
2
3
4
5
6
7
#srv_single.py
import zen_utils
if __name__=='__main__':
address = zen_utils.parse_command_line("single-threaded server")
listener = zen_utils.create_srv_socket(address)
zen_utils.accept_connection_forever(listener)
srv_single.py是一个简单的server,永远在监听client的socket;每一个socket里监听该client的请求并回应,如果client没有更多请求,该socket则关闭以相应下一个socket。
在这个单线程server里,额外的client socket在下面两种情况下都连接不进来(假定serverlisten(1)
):
-
当前client的socket还在连接;
-
server有failure。
再进一步看看该server的CPU利用效率(用27.6 trace)
1
$ python3.4 -m trace -tg --ignore-dir=/usr srv_single.py ''
用trace我们可以获得以下几个信息:
-
server从接收client第一个request到接收第4个空请求共用时0.13s(0.0s+0.05s+0.04s+0.04s)。
-
server第一个recv立马返回,没有时间,原因是accept()的时候,TCP3次握手已经结束。在这期间,client的第三个ack包括了第一个sendall的数据。
-
这个过程中CPU的闲置率是7%(0.1/0.14)。
单线程server很容易受到故意攻击,只要client一直不断socket,那么别的client就连不进来。聪明一点的server可能会设置timeout,但是道高一尺魔高一丈,client只需要在timeout前断开,发送足够频繁的socket连接,就可以霸占这条server的socket。
CPU 7%的闲置率是单线程server的主要弊端,如何提高CPU的利用率使得server在等待某个client的request的时候可以同时处理其他client的信息呢。这就要用到下面两种方法:
-
多线程。
-
异步服务器设计,在单线程里处理多个client。
2.3 Threaded and Multiprocess Servers
2.3.1 threading module
多线程实现上面代码你可能会想到需要一个master thread来listen()
,然后用多个线程来accept()
。幸运的是OS使得socket编程简单很多,允许每一个thread包括listen()
和accept()
。换句话说,将整个server整体打包,在不同thread上运行在同一个端口就可以。请看
1
2
3
4
5
6
7
8
9
10
11
12
13
#srv_threaded.py
import zen_utils
from threading import Thread
def start_threads(listener,workers=4):
t = (listener,)
for i in range(workers):
Thread(target=zen_utils.accept_connection_forever,args=t).start()
if __name__=='__main__':
address = zen_utils.parse_command_line("single-threaded server")
listener = zen_utils.create_srv_socket(address)
start_threads(listener)
整个时候如果你将zen_util.py里的get_answer()
的延时设置为10,然后开启srv_threaded.py(),接着开启多个client.py,你可以看到每个client得到的响应是并行的。
从threading.Thread转换到multiprocessing.Process可以从多线程转成多进程。优点是进程之间的数据是独立的,不会相互影响;缺点是增加了系统的开销。
2.3.2 socketserver module
上面的multithreaded的server模式是如此流行以致于python有一个标准库 21.21 socketserver专门封装了这种模式。socketserveer和srv_threaded.py不同的地方有一下两点:
-
将server pattern换成 handler pattern。srv_threaded.py是用listen()和accept()这种socket编程的模式打包起来,运行在不同thread;而socketserver打破了这种模式,用handler来表示server里每一个thread中的socket accept后的sock要处理的回调函数。
-
socketserver控制不了thread个数,但是srv_threaded.py可以。因此socketserver容易受到攻击使系统瘫痪,不建议在具体软件产品中使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import zen_utils
from socketserver import BaseRequestHandler, TCPServer, ThreadingMixIn
class ZenHandler(BaseRequestHandler):
def handle(self):
zen_utils.handle_conversation(self.request,self.client_address)
class ZenServer(ThreadingMixIn,TCPServer):
allow_reuse_address = 1
if __name__=='__main__':
address = zen_utils.parse_command_line("single-threaded server")
server = ZenServer(address, ZenHandler)
server.serve_forever()
2.4 Async Servers
我们退后一步,从一个大的框架下来讨论什么是Async。
I/O 4种模型:Linux的I/O分为两个维度,
- 如何等待消息,主动询问叫同步,被动等通知叫异步;
- 等待过程中能否分心,不能叫阻塞,可以叫非阻塞。
两两相乘我们可以得到下图4种I/O模型。
I/O多路复用,即异步阻塞,有3种方式select,poll,epoll(本质一样)。
它们的异同点见下图。
2.4.1 select module(select,poll,epoll)
下面是利用select模块里的poll()方法来异步创建server的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#srv_async_poll.py
import zen_utils, select
def all_events_forever(poll_object):
while True:
for fd,event in poll_object.poll():
yield fd,event #返回generator避免下面主for循环中的代码缩进太深,不方面阅读。
def serve(listener):
sockets = {listener.fileno(): listener}
addresses = {}
bytes_received = {}
bytes_to_send = {}
poll_object = select.poll()
poll_object.register(listener,select.POLLIN)
for fd, event in all_events_forever(poll_object):
sock = sockets[fd]
# Socket closed: remove it from our data structures.
if event & (select.POLLHUP|select.POLLERR|select.POLLNVAL):
address = addresses.pop(sock)
rb = bytes_received.pop(sock,b'')
wb = bytes_to_send.pop(sock,b'')
if rb:
print('Client {} sent {} but then closed'.format(address,rb))
elif wb:
print('Client {} closed before we send {}'.format(address,wb))
else:
print('Client {} closed socket normally'.format(address))
poll_object.unregister(fd)
del sockets[fd]
# New socket: add it to our structures.
elif sock is listener:
sock, address = sock.accept()
print('Accepted connection from {}'.format(address))
sock.setblocking(False)
sockets[sock.fileno()] = sock
addresses[sock] = address
poll_object.register(sock,select.POLLIN)
# Incoming data: keep receiving until we see the suffix.
elif event & select.POLLIN:
more = sock.recv(4096)
if not more:
sock.close()
continue
data = bytes_received.pop(sock,b'') + more
if data.endswith(b'?'):
bytes_to_send[sock] = zen_utils.get_answer(data)
poll_object.modify(sock, select.POLLOUT)
else:
bytes_received[sock] = data
# Socket ready to send: keep sending until all bytes are delivered.
elif event & select.POLLOUT:
data = bytes_to_send.pop(sock)
n = sock.send(data)
if n < len(data):
bytes_to_send[sock] = data[n:]
else:
poll_object.modify(sock,select.POLLIN)
if __name__=='__main__':
address = zen_utils.parse_command_line(('low-level async server'))
listener = zen_utils.create_srv_socket(address)
serve(listener)
理解如何利用socket的状态的改变来引导发送和接收逻辑是这代码的关键。但是我们发现业务逻辑代码占所有代码比例很小,大部分代码都是设置poll_object。
2.4.2 asyncio module
asyncio就是一个建立在select,poll,epoll上的设置好了主要循环而将业务逻辑暴露给开发者的模块。它有两种使用方法。
2.4.2.1 Callback-Style asyncio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import zen_utils, asyncio
class ZenServer(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.data = b''
print('Accpeted connection form {}'.format(self.address))
def data_received(self, data):
self.data += data
if self.data.endswith(b'?'):
answer = zen_utils.get_answer(self.data)
self.transport.write(answer)
self.data = b''
def connection_lost(self, exc):
if exc:
print('Client {} sent {} but then closed'.format(self.address,exc))
elif self.data:
print('Client {} send {} but then closed'.format(self.address,self.data))
else:
print('Client {} closed socket'.format(self.address))
if __name__=='__main__':
address = zen_utils.parse_command_line(('asyncio server using callbacks'))
loop = asyncio.get_event_loop()
coro = loop.create_server(ZenServer,*address)
server = loop.run_until_complete(coro)
print('Listening at {}'.format(address))
try:
loop.run_forever()
finally:
server.close()
print(type(server))
print(type(coro))
loop.close()
Callback主要被封装在Protocol里面的3个方法。Protocol将socket封装起来,通过Transport来传收数据。
2.4.2.2 Coroutine-Style asyncio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#srv_async_asyncio_coroutine.py
import zen_utils, asyncio
@asyncio.coroutine
def handle_conversation(reader,writer):
address = writer.get_extra_info('peername')
print('Accepted connection form {}'.format(address))
while True:
data = b''
while not data.endswith(b'?'):
more_data = yield from reader.read(4096)
if not more_data:
if data:
print('Client {} send {!r} but then closed',format(address,data))
else:
print('Client {} closed socket normally'.format(address))
return
data += more_data
answer = zen_utils.get_answer(data)
writer.write(answer)
if __name__=='__main__':
address = zen_utils.parse_command_line(('asyncio server using callbacks'))
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_conversation,*address)
server = loop.run_until_complete(coro)
print('Listening at {}'.format(address))
try:
loop.run_forever()
finally:
server.close()
loop.close()
coroutine的用法暂时不表。
2.4.3 asyncore & asynchat module
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#srv_async_asyncore.py
import zen_utils, asyncore,asynchat
class ZenRequestHandler(asynchat.async_chat):
def __init__(self,sock):
asynchat.async_chat.__init__(self,sock)
self.set_terminator(b'?')
self.data = b''
def collect_incoming_data(self, more_data):
self.data += more_data
def found_terminator(self):
answer = zen_utils.get_answer(self.data+b'?')
self.push(answer)
self.initiate_send()
self.data = b''
class Zenserver(asyncore.dispatcher):
def handle_accept(self):
sock,address = self.accept()
ZenRequestHandler(sock)
if __name__=='__main__':
address = zen_utils.parse_command_line(('legacy "asyncore" server'))
listener = zen_utils.create_srv_socket(address)
server = Zenserver(listener)
server.accepting = True
asyncore.loop()
2.5 The Best of Both Worlds
我们将上面的多线程(进程)和异步总结如下。
-
可异步服务器可以在单线程上完成同时对多个client socket的响应,但是当它达到它的效能极限或者有异常时,就block或者退出了;
-
而多线程(多进程)服务器可以在多个线程(进程)中对多个client socket进行相应,同时避免单个线程block或者退出对整体server的影响。
-
因此可以将两者结合起来,将发挥最大的性能,即用多线程(进程)跑异步服务器。
2.6 Running under inetd
略。