目录


1 Depolyment

Deployment: running server code, including start, log, alert and restart after failure.

Deployment有2种:

  1. 一个server对应多个client。一旦该server fail,则client就连不到server。

  2. 多个server对应多个client。(1)DNS 返回多个IP;(2) load balance 有多个server 并行。现代网络大公司通常是结合两种,返回地理上离client距离最近的load balance的IP。

2 Server Code

server分三种模式处理client的request,即response mode有3种:

  1. 单线程;

  2. 多线程;

  3. 异步。

2.1 A Simple Protocol

这里我们自己设计了一个简单的protocol,client问一次,服务器回答一次。问题和回答存储在aphorisms里。;例如client问b’Beautiful is better than?’,服务器答b’ugly.’。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#zen_utils.py
import socket,time,argparse

aphorisms = {b'Beautiful is better than?': b'ugly.',
             b'Explicit is better than?': b'implicit.',
             b'Simple is better than?': b'complex.'}


def get_answer(question):
    """Return the string response to a particular Zen-of-Python aphorism"""
    time.sleep(0.0)
    return aphorisms.get(question,b'Error: unknown aphorism')


def parse_command_line(description):
    '''Parse command line and return a socket address.'''
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('host',help='the server interface')
    parser.add_argument('-p',help='the port number',type=int,default=1060)
    args = parser.parse_args()
    address = (args.host,args.p)
    return address


def create_srv_socket(address):
    """Build and reuturn a listener server socket."""
    listener = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    listener.bind(address)
    print('server binding at: {}'.format(address))
    listener.listen(1)
    return listener

def accept_connection_forever(listener):
    """Forever accepting client socket."""
    while True:
        sock, address = listener.accept()
        print('client binding from: {}'.format(address))
        handle_conversation(sock,address)


def handle_conversation(sock,address):
    """Converse with a client over `sock` until they are done talking."""
    try:
        while True:
            handle_requests(sock)
    except EOFError:
        print('client binding fron: {} is closed'.format(address))
    except Exception as e:
        print('client {} error: {}'.format(address,e))
    finally:
        sock.close()

def handle_requests(sock):
    """Recieve a single client request on `sock` and send the answer."""
    question = recv_until(sock, b'?')
    answer = get_answer(question)
    sock.sendall(answer)

def recv_until(sock,suffix):
    """Receive bytes over socket `sock` until we receive the 'suffix'."""
    data = b''
    data += sock.recv(4096)
    if not data:
        raise EOFError('socket closed')
    while not data.endswith(suffix):
        more = sock.recv(4096)
        if not more:
            raise IOError('received {!r}, then socket closed'.format(more))
        data += more
    return data

zen_util.py是一个自定义的库,提取了一些方法,用于后面不同的服务器,避免代码重复,提高复用。代码比较简单,主要是将server socket的listen和accept里的几个步骤分别出来。需要注意的是最后一个方法recv_until(sock,suffix),这是一个framing的方法。具体来说,该方法同时使用于client和server接收的数据的framing,对于client sendall(data)是以b'?'结尾,例如b’Beautiful is better than?’;而对于server sendall(data)是以b'.'结尾,例如b’ugly.’。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#client.py
import zen_utils,socket,random,argparse

def client(address, cause_error=False):
    sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

    sock.connect(address)
    print('client binding at: {}, to: {}'.format(sock.getsockname(),sock.getpeername()))
    aphorisms = list(zen_utils.aphorisms)
    if cause_error:
        sock.sendall(aphorisms[0][:-1])
        return

    for aphorism in random.sample(aphorisms,3):
        sock.sendall(aphorism)
        print(aphorism,zen_utils.recv_until(sock,b'.'))

    sock.close()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='client')
    parser.add_argument('host',help='hostname or IP')
    parser.add_argument('-p',metavar='port',type=int,default=1060,help='the port number')
    args = parser.parse_args()
    address = (args.host,args.p)
    client(address)

client.py有3个循环,每个循环里一次sendall(),一次recv_until()

2.2 A Single-Threaded Server

1
2
3
4
5
6
7
#srv_single.py
import zen_utils

if __name__=='__main__':
    address = zen_utils.parse_command_line("single-threaded server")
    listener = zen_utils.create_srv_socket(address)
    zen_utils.accept_connection_forever(listener)

srv_single.py是一个简单的server,永远在监听client的socket;每一个socket里监听该client的请求并回应,如果client没有更多请求,该socket则关闭以相应下一个socket。

在这个单线程server里,额外的client socket在下面两种情况下都连接不进来(假定serverlisten(1)):

  1. 当前client的socket还在连接;

  2. server有failure。

再进一步看看该server的CPU利用效率(用27.6 trace)

1
$ python3.4 -m trace -tg --ignore-dir=/usr srv_single.py ''

Network Data & Error Summary

用trace我们可以获得以下几个信息:

  1. server从接收client第一个request到接收第4个空请求共用时0.13s(0.0s+0.05s+0.04s+0.04s)。

  2. server第一个recv立马返回,没有时间,原因是accept()的时候,TCP3次握手已经结束。在这期间,client的第三个ack包括了第一个sendall的数据。

  3. 这个过程中CPU的闲置率是7%(0.1/0.14)。

单线程server很容易受到故意攻击,只要client一直不断socket,那么别的client就连不进来。聪明一点的server可能会设置timeout,但是道高一尺魔高一丈,client只需要在timeout前断开,发送足够频繁的socket连接,就可以霸占这条server的socket。

CPU 7%的闲置率是单线程server的主要弊端,如何提高CPU的利用率使得server在等待某个client的request的时候可以同时处理其他client的信息呢。这就要用到下面两种方法:

  1. 多线程。

  2. 异步服务器设计,在单线程里处理多个client。

2.3 Threaded and Multiprocess Servers

2.3.1 threading module

多线程实现上面代码你可能会想到需要一个master thread来listen(),然后用多个线程来accept()。幸运的是OS使得socket编程简单很多,允许每一个thread包括listen()accept()。换句话说,将整个server整体打包,在不同thread上运行在同一个端口就可以。请看

1
2
3
4
5
6
7
8
9
10
11
12
13
#srv_threaded.py
import zen_utils
from threading import Thread

def start_threads(listener,workers=4):
    t = (listener,)
    for i in range(workers):
        Thread(target=zen_utils.accept_connection_forever,args=t).start()

if __name__=='__main__':
    address = zen_utils.parse_command_line("single-threaded server")
    listener = zen_utils.create_srv_socket(address)
    start_threads(listener)

整个时候如果你将zen_util.py里的get_answer()的延时设置为10,然后开启srv_threaded.py(),接着开启多个client.py,你可以看到每个client得到的响应是并行的。

从threading.Thread转换到multiprocessing.Process可以从多线程转成多进程。优点是进程之间的数据是独立的,不会相互影响;缺点是增加了系统的开销。

2.3.2 socketserver module

上面的multithreaded的server模式是如此流行以致于python有一个标准库 21.21 socketserver专门封装了这种模式。socketserveer和srv_threaded.py不同的地方有一下两点:

  1. 将server pattern换成 handler pattern。srv_threaded.py是用listen()和accept()这种socket编程的模式打包起来,运行在不同thread;而socketserver打破了这种模式,用handler来表示server里每一个thread中的socket accept后的sock要处理的回调函数。

  2. socketserver控制不了thread个数,但是srv_threaded.py可以。因此socketserver容易受到攻击使系统瘫痪,不建议在具体软件产品中使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import zen_utils
from socketserver import BaseRequestHandler, TCPServer, ThreadingMixIn

class ZenHandler(BaseRequestHandler):
    def handle(self):
        zen_utils.handle_conversation(self.request,self.client_address)

class ZenServer(ThreadingMixIn,TCPServer):
    allow_reuse_address = 1

if __name__=='__main__':
    address = zen_utils.parse_command_line("single-threaded server")
    server = ZenServer(address, ZenHandler)
    server.serve_forever()

2.4 Async Servers

我们退后一步,从一个大的框架下来讨论什么是Async。

I/O 4种模型:Linux的I/O分为两个维度,

  1. 如何等待消息,主动询问叫同步,被动等通知叫异步
  2. 等待过程中能否分心,不能叫阻塞,可以叫非阻塞

两两相乘我们可以得到下图4种I/O模型。

Network Data & Error Summary

I/O多路复用,即异步阻塞,有3种方式select,poll,epoll(本质一样)。

它们的异同点见下图。

Network Data & Error Summary

2.4.1 select module(select,poll,epoll)

下面是利用select模块里的poll()方法来异步创建server的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#srv_async_poll.py
import zen_utils, select

def all_events_forever(poll_object):
    while True:
        for fd,event in poll_object.poll():
            yield fd,event     #返回generator避免下面主for循环中的代码缩进太深,不方面阅读。


def serve(listener):
    sockets = {listener.fileno(): listener}
    addresses = {}
    bytes_received = {}
    bytes_to_send = {}

    poll_object = select.poll()
    poll_object.register(listener,select.POLLIN)

    for fd, event in all_events_forever(poll_object):
        sock = sockets[fd]

        # Socket closed: remove it from our data structures.

        if event & (select.POLLHUP|select.POLLERR|select.POLLNVAL):
            address = addresses.pop(sock)
            rb = bytes_received.pop(sock,b'')
            wb = bytes_to_send.pop(sock,b'')

            if rb:
                print('Client {} sent {} but then closed'.format(address,rb))
            elif wb:
                print('Client {} closed before we send {}'.format(address,wb))
            else:
                print('Client {} closed socket normally'.format(address))
            poll_object.unregister(fd)
            del sockets[fd]

        # New socket: add it to our structures.

        elif sock is listener:
            sock, address = sock.accept()
            print('Accepted connection from {}'.format(address))
            sock.setblocking(False)
            sockets[sock.fileno()] = sock
            addresses[sock] = address
            poll_object.register(sock,select.POLLIN)

        # Incoming data: keep receiving until we see the suffix.

        elif event & select.POLLIN:
            more = sock.recv(4096)
            if not more:
                sock.close()
                continue
            data = bytes_received.pop(sock,b'') + more
            if data.endswith(b'?'):
                bytes_to_send[sock] = zen_utils.get_answer(data)
                poll_object.modify(sock, select.POLLOUT)
            else:
                bytes_received[sock] = data

        # Socket ready to send: keep sending until all bytes are delivered.

        elif event & select.POLLOUT:
            data = bytes_to_send.pop(sock)
            n = sock.send(data)
            if n < len(data):
                bytes_to_send[sock] = data[n:]
            else:
                poll_object.modify(sock,select.POLLIN)


if __name__=='__main__':
    address = zen_utils.parse_command_line(('low-level async server'))
    listener = zen_utils.create_srv_socket(address)
    serve(listener)

理解如何利用socket的状态的改变来引导发送和接收逻辑是这代码的关键。但是我们发现业务逻辑代码占所有代码比例很小,大部分代码都是设置poll_object。

2.4.2 asyncio module

asyncio就是一个建立在select,poll,epoll上的设置好了主要循环而将业务逻辑暴露给开发者的模块。它有两种使用方法。

2.4.2.1 Callback-Style asyncio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import zen_utils, asyncio

class ZenServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.data = b''
        print('Accpeted connection form {}'.format(self.address))

    def data_received(self, data):
        self.data += data
        if self.data.endswith(b'?'):
            answer = zen_utils.get_answer(self.data)
            self.transport.write(answer)
            self.data = b''

    def connection_lost(self, exc):
        if exc:
            print('Client {} sent {} but then closed'.format(self.address,exc))
        elif self.data:
            print('Client {} send {} but then closed'.format(self.address,self.data))
        else:
            print('Client {} closed socket'.format(self.address))


if __name__=='__main__':
    address = zen_utils.parse_command_line(('asyncio server using callbacks'))
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ZenServer,*address)
    server = loop.run_until_complete(coro)
    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        print(type(server))
        print(type(coro))
        loop.close()

Callback主要被封装在Protocol里面的3个方法。Protocol将socket封装起来,通过Transport来传收数据。

2.4.2.2 Coroutine-Style asyncio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#srv_async_asyncio_coroutine.py
import zen_utils, asyncio

@asyncio.coroutine
def handle_conversation(reader,writer):
    address = writer.get_extra_info('peername')
    print('Accepted connection form {}'.format(address))
    while True:
        data = b''
        while not data.endswith(b'?'):
            more_data = yield from reader.read(4096)
            if not more_data:
                if data:
                    print('Client {} send {!r} but then closed',format(address,data))
                else:
                    print('Client {} closed socket normally'.format(address))
                return
            data += more_data
        answer = zen_utils.get_answer(data)
        writer.write(answer)


if __name__=='__main__':
    address = zen_utils.parse_command_line(('asyncio server using callbacks'))
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_conversation,*address)
    server = loop.run_until_complete(coro)
    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

coroutine的用法暂时不表。

2.4.3 asyncore & asynchat module

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#srv_async_asyncore.py
import zen_utils, asyncore,asynchat

class ZenRequestHandler(asynchat.async_chat):

    def __init__(self,sock):
        asynchat.async_chat.__init__(self,sock)
        self.set_terminator(b'?')
        self.data = b''

    def collect_incoming_data(self, more_data):
        self.data += more_data

    def found_terminator(self):
        answer = zen_utils.get_answer(self.data+b'?')
        self.push(answer)
        self.initiate_send()
        self.data = b''


class Zenserver(asyncore.dispatcher):
    def handle_accept(self):
        sock,address = self.accept()
        ZenRequestHandler(sock)

if __name__=='__main__':
    address = zen_utils.parse_command_line(('legacy "asyncore" server'))
    listener = zen_utils.create_srv_socket(address)
    server = Zenserver(listener)
    server.accepting = True
    asyncore.loop()

2.5 The Best of Both Worlds

我们将上面的多线程(进程)和异步总结如下。

  1. 可异步服务器可以在单线程上完成同时对多个client socket的响应,但是当它达到它的效能极限或者有异常时,就block或者退出了;

  2. 而多线程(多进程)服务器可以在多个线程(进程)中对多个client socket进行相应,同时避免单个线程block或者退出对整体server的影响。

  3. 因此可以将两者结合起来,将发挥最大的性能,即用多线程(进程)跑异步服务器。

2.6 Running under inetd

略。

3 总结

服务器架构 summary.png

4 参考资料


Share Post

Twitter Google+

Shunmian

The only programmers in a position to see all the differences in power between the various languages are those who understand the most powerful one.