目录
Two Foundamental building blocks for services under heavy load:
-
Cache;
-
Message queue.
本文不是对于上述两个技术提供一个技术手册(对于详细的技术手册网络上可以找到)。而是介绍它们分别解决了什么问题,在python中如何使用该技术。因为归根到底开发者面对的最大的挑战,除了终身学习外,就是认出问题已经找出已有的解决方案。
1 Cache
Cache: a hardware or software component that stores data so future requests for that data can be served faster。
1.1 Using Memcached
Memcached:通过内存中的键值来缓存数据库的请求,减轻数据库频繁读取的压力。
使用Memcached的步骤非常简单:
-
安装Memcached(Mac下terminal:brew install memcached);启动Memcached daemon(Mac下terminal:memcached);Python3中安装memcache:pip3 install python3-memcached。
-
Memcached daemon配置IP和port,发送给Memcached client。
-
Memcached client现在可以通过类似dict的操作来进行读取。
1
2
3
4
5
import memcache
mc = memcache.Client(['127.0.0.1:11211'])
mc.set('user:18','Simple is better than complex!')
print(mc.get('user:18'))
需要注意几点:
-
memcache是存储在内存里,因此一旦断电,里面的数据就消失了。
-
如果memcache client读取一个数据,若memcache server里有,则直接读取;若没有,则从数据库读取且缓存在memcahe server里以供下次缓存读取。
-
memcache server可以有多个memcache实例,它们之间是独立的,通过memcahe server来管理。
-
memcache client读取一对键值有“2阶段哈希”,首先“键->memcache实例地址”,然后“memcache实例->数据”。
我们下面用一段代码来感受下Memcached的作用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import memcache,random,timeit,time
def fetch_square(mc,n):
data = mc.get('sq:{}'.format(n))
if data is None:
time.sleep(0.001)
data = n*n
mc.set('sq:{}'.format(n),data)
return data
def main():
mc = memcache.Client(['127.0.0.1:11211'])
def make_request():
fetch_square(mc, random.randint(0, 5000))
print('Ten successive runs:')
for i in range(1,11):
print('{:.2f}s '.format(timeit.timeit(make_request,number=2000)), end ='')
if __name__== "__main__":
main()
#output:
# Ten successive runs:
# 4.60s 3.13s 2.40s 1.68s 1.30s 1.08s 0.84s 0.75s 0.66s 0.62s
可以看到读取的时间越来越少,因为被Memcache实例缓存起来了。
1.2 Hashing and Sharding
对于前面提到的二阶哈希,它的技术关键是sharding。Sharding(分区)被广泛地用于数据库存储来将可用存储空间分成低耦合的多个shard(碎片)。
Sharding(分区): a type of database partitioning that separates very large databases the into smaller, faster, more easily managed parts called data shards. The word shard means a small part of a whole.
我们下面用一个简单的例子来说明不同哈希函数对不同服务器选择的分布。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import hashlib
def alpha_shard(word):
"""Do a poor job of assigning data to servers by using first letters."""
if word[0] < 'g': # abcdef
return 'server0'
elif word[0] < 'n': # ghijklm
return 'server1'
elif word[0] < 't': # nopqrs
return 'server2'
else: # tuvwxyz
return 'server3'
def hash_shard(word):
'''Assign data to servers using Python's built-in hash() function.'''
return 'server{}'.format(hash(word)%4)
def md5_shard(word):
"""Assign data to servers using a public hash algorithm."""
data = word.encode('utf-8')
return 'server{}'.format(hashlib.md5(data).digest()[-1] % 4)
def main():
words = open('words.txt').read().split()
for function in alpha_shard, hash_shard, md5_shard:
shardingDict = {"server0": 0, "server1": 1, "server2": 2, "server3": 3}
for word in words:
server = function(word)
shardingDict[server] +=1
print("function: {}".format(function.__name__))
for k,v in shardingDict.items():
print('{}: {:.0f}% '.format(k, 100*v/len(words)))
print('')
if __name__== "__main__":
main()
# output:
# function: alpha_shard
# server1: 20%
# server2: 32%
# server0: 33%
# server3: 15%
#
# function: hash_shard
# server1: 25%
# server2: 25%
# server0: 25%
# server3: 25%
#
# function: md5_shard
# server1: 25%
# server2: 25%
# server0: 25%
# server3: 25%
可用看到如果只用头字母来决定存储到哪个服务器,则服务器的负载分布和字典里的单词的分布有很大关系;利用哈希函数可用抹除掉字母本身的pattern,产生更均匀的分布,无论是用python 内建的hash函数还是md5哈希算法。
2 Message Queues
Message Queues:是app之间通信的一层协议,建立在Transport Layer和Application Layer之间,为了协调各个app之间的消息传输,便于将1个大型app解耦成几个独立的sub app,增加整个app的扩展性,移植性和容错率。
2.1 Using Message Queues in Python
ZeroMQ:所有网络交互所使用的API实际上是Berkeley套接字(BSD)。这个源自1980年代早期的协议是TCP/IP协议的最原始实现。而且可以说,在当今各操作系统中,它是受到最广泛支持的API,也是这些操作系统的核心组件之一。人们对BSD套接字的了解较多的是点对点的连接。点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等。一旦你解决了以上所有问题,你就进入应用协议层(如HTTP)的世界了,这里需要的是组帧、缓存和处理逻辑等。换言之,编写高性能网络协议的应用程序一点儿也不复杂。如果我们能对各种套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,这不是件很好的事情吗?这正是ZeroMQ(ØMQ/ZMQ)网络库的由来:“它提供一些跨多种传输协议(如进程内通讯、IPC、TCP和广播)的套接字供你使用。你可使用多种方式实现N对N的套接字连接,譬如:扇出、发布订阅、任务分发以及请求响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import random, threading, time, zmq
B = 32 # number of bits of precision in each random integer
def ones_and_zeros(digits):
"""Express `n` in at least `d` binary digits, with no special prefix."""
return bin(random.getrandbits(digits)).lstrip('0b').zfill(digits)
def bitsource(zcontext, url):
"""Produce random points in the unit square."""
zsock = zcontext.socket(zmq.PUB)
zsock.bind(url)
while True:
zsock.send_string(ones_and_zeros(B * 2))
time.sleep(0.01)
def always_yes(zcontext, in_url, out_url):
"""Coordinates in the lower-left quadrant are inside the unit circle."""
isock = zcontext.socket(zmq.SUB)
isock.connect(in_url)
isock.setsockopt(zmq.SUBSCRIBE, b'00')
osock = zcontext.socket(zmq.PUSH)
osock.connect(out_url)
while True:
isock.recv_string()
osock.send_string('Y')
def judge(zcontext, in_url, pythagoras_url, out_url):
"""Determine whether each input coordinate is inside the unit circle."""
isock = zcontext.socket(zmq.SUB)
isock.connect(in_url)
for prefix in b'01', b'10', b'11':
isock.setsockopt(zmq.SUBSCRIBE, prefix)
psock = zcontext.socket(zmq.REQ)
psock.connect(pythagoras_url)
osock = zcontext.socket(zmq.PUSH)
osock.connect(out_url)
unit = 2 ** (B * 2)
while True:
bits = isock.recv_string()
n, m = int(bits[::2], 2), int(bits[1::2], 2)
psock.send_json((n, m))
sumsquares = psock.recv_json()
osock.send_string('Y' if sumsquares < unit else 'N')
def pythagoras(zcontext, url):
"""Return the sum-of-squares of number sequences."""
zsock = zcontext.socket(zmq.REP)
zsock.bind(url)
while True:
numbers = zsock.recv_json()
zsock.send_json(sum(n * n for n in numbers))
def tally(zcontext, url):
"""Tally how many points fall within the unit circle, and print pi."""
zsock = zcontext.socket(zmq.PULL)
zsock.bind(url)
p=q= 0
while True:
decision = zsock.recv_string()
q += 1
if decision == 'Y':
p += 4
print(decision, p / q)
def start_thread(function, *args):
thread = threading.Thread(target=function, args=args)
thread.daemon = True # so you can easily Ctrl-C the whole program
thread.start()
def main(zcontext):
pubsub = 'tcp://127.0.0.1:6700'
reqrep = 'tcp://127.0.0.1:6701'
pushpull = 'tcp://127.0.0.1:6702'
start_thread(bitsource, zcontext, pubsub)
start_thread(always_yes, zcontext, pubsub, pushpull)
start_thread(judge, zcontext, pubsub, reqrep, pushpull)
start_thread(pythagoras, zcontext, reqrep)
start_thread(tally, zcontext, pushpull)
time.sleep(30)
if __name__ == '__main__':
main(zmq.Context())
#output: 3.1484257871064467