Redis 应用层协议解析以及在 Python 客户端中的实现

栏目: Python · 发布时间: 5年前

Redis 应用层协议解析以及在  <a href='https://www.codercto.com/topics/20097.html'>Python</a>  客户端中的实现

安禹

安禹,网易游戏高级运维工程师,主要工作方向为网易游戏 Redis Saas 的开发与运维,也关注 Python 和 Rust 的最新进展。怕什么真理无穷,进一寸有进一寸的欢喜。

Redis 的应用层协议设计

Redis 的通信协议设计得非常简单,具体可以参考 Redis Protocol specification ,简称 RESP,在此进行一个大致的介绍。

RESP 本身并没有专门的字段标记整个请求的报文长度,它的设计思路整体针对于 命令管道(Pipeline)的需求,可以很方便地将多条命令封装在一次 tcp 报文发送中,比如:当我们发送一个 ` GET A ` 命令,对应的报文如下:

*2\r\n$3\r\nGET\r\n$1\r\nA\r\n

而如果通过 Pipeline 发送 ` GET A ` `GET B` 两条命令时,并不需要什么额外处理,仅仅是将两条命令按顺序发送:

 *2\r\n$3\r\nGET\r\n$1\r\nA\r\n*2\r\n$3\r\nGET\r\n$1\r\nB\r\n

接下来我们进行一个较为详细完整的解析

RESP 规定了五种数据类型

- 简单字符串(Simple Strings):以 ` + ` 作为开头,一般用于简单的字符串回复,比如 ` set A B ` 类命令的返回报文 中的 ` OK ` ,就是封装在简单字符串类型中。

- 错误(Errors):以 ` - ` 作为开头,用于返回错误信息,比如输入了一条不存在的命令,redis 服务会返回 ` ERR unknown command 'xx' ` ,这条 错误信息就封装在错误类型报文中。

- 整数(Integers) :以 ` : ` 开头,用 于返回整数结果 ,比如 ` LLEN ` 令,当我们用它统计某个列表长度时,返回的数字就封装在整数类型中。

- 二进制安全字符串(Bulk Strings): ` $ ` 为开头,用于承载携带数据,是最重要最常用的类型,当你向 Redis 发送命令时,命令中的字符串会被封装在二进制安全字符串,比如开篇的例 子中 ` GET ` 就被封装成了 ` $3\r\nGET\r\n ` 这样一个二进制安全字符串报文,而一个正常 GET 命令的返回报文同样是 一个二进制安全字符串。

- 数组(Arrays):以 ` * ` 开头,同样是最重要最常用的类型,开篇的例子中 ` GET A ` 命令中的两个字符 ` GET ` ` A ` 分别被封装成了 ` $3\r\nGET\r\n ` ` $1\r\nA\r\n ` ,然后被进一步封装成了一个数组类型 ` *2\r\n... ` ,我们对 Redis 所有发送的命令都会被这样封装, 先是子字符串被封装成二进制安全字符串,然后二进制安全字符串被封装成数组发往服务端。

以下是更为详细的示例:

简单字符串(Simple Strings)

简单字符串的回复通常是固定的,可以类似的理解为静态字符串,这种通常表达一种确定的、可预期的结果,比如最常见的就 ` OK ` 事务中返回的 `QUEUED`    

127.0.0.1:6379> set a b    

OK    

127.0.0.1:6379> 

` OK ` 个字符串不会有任何改变,也不需要携带可变的信息,它仅仅是标识这个操作成功了,不会包含其他任何可变的数据。它 ` + ` 为开头, ` \r\n ` 结尾,比 ` OK ` 报文就 ` +OK\r\n `

错误(Errors)

错误与简单字符串非常相似,不同的是它以 ` - ` 为开头,其他并没有什么不同,它仅仅是显示一个错误信息,而这个信息在协议上并没有什么强制的规范,可以写入任意字符串信息,当然错误字符串中是不能写 ` \r\n `

比如命令不存在的报 ` ERR unknown command 'tt' ` 装结果就是

`-ERR unknown command 'tt'\r\n`

整数(Integers)

整数类型也很简单,和前两种不同的是,它是可以携带数据的,类型为有符号64位整数,用于一些返回整数类型的命令,目前文档显示,会返回整数的有以下这些命令,

- SETNX

- DEL

- EXISTS

- INCR

- INCRBY

- DECR

- DECRBY

- DBSIZE

- LASTSAVE

- RENAMENX

- MOVE

- LLEN

- SADD

- SREM

- SISMEMBER

- SCARD

当然 Redis 的官方文档一直都不是很靠谱,RESP很久没更新了,目前来看至少用于 Stream 功能的 XLEN 命令和用于 HyperLog 功能的 PFCOUNT命令也是要返回整数类型。

这种数据的封装也很简单,使 `:` 作为开头, `\r\n` 作为结尾,中间为要填充的数字,整数类型可以用来标识布尔类型,比如在 EXISTS 命令中, `:1\r\n` 表示 true, `:0\r\n` 表示 fals e

因为整数类型使用64位有符号整数,所以也可以表示负数,比如对一个不存在的 key 使用 TTL 命令时,会返回 -2

10.200.27.30:6379> EXISTS AAA    

(integer) 0   # key AAA 不存在    

10.200.27.30:6379> TTL AAA    

(integer) -2  # 对 AAA 使用 TTL 命令返回 -2

二进制安全字符串(Bulk Strings)

二进制安全字符串使用如下方法进行编码:

1 . 用 `$` 字符作为开头,后接实际字符串的字节数,再添加 `\r\n` 来表示数据长度。

2. 要发送的实际字符串数据。

3. 再添加一个 `\r\n` 作为结尾

4. 单个二进制安全字符串数据字节数不超过 512MB

举例,要封装 `Hello,world` 字符串,字符串字节数为 11,所以使用 `$11\r\n` 作为开头,封装结果如下:    

$11\r\nHello,world\r\n

空字符串可以使用如下表示:  

$0\r\n\r\n

二进制安全字符串还可用来表示 NULL    

$-1\r\n

比如当我们尝试 GET 一个不存在的 key 时, 就会返回 `$-1\r\n` 以下我们使用原 生 socket 和 Redis 服务端交互:

$ python

Python 2.7.9 (default, Mar  1 2015, 12:57:24) 
[GCC 4.9.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import socket
>>> res = socket.getaddrinfo("127.0.0.1", 6379, 0, socket.SOCK_STREAM)

>>> res

[(2, 1, 6, '', ('127.0.0.1', 6379))]
>>> family, socktype, proto, canonname, socket_address = res[0]
>>> sock = socket.socket(family, socktype, proto)
>>> sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
>>> sock.connect(socket_address)

>>> # 发送 EXISTS AAA 命令

>>> sock.sendall(b"*2\r\n$6\r\nEXISTS\r\n$3\r\nAAA\r\n")
>>> sock.recv(512)

':0\r\n'

>>> # 返回整数类型 0,表示 false,即 key AAA 不存在  

>>> # 发送 GET AAA 命令   

>>> sock.sendall(b"*2\r\n$3\r\nGET\r\n$3\r\nAAA\r\n")
>>> sock.recv(512)

'$-1\r\n'

>>> # 返回一个长度为 -1 的二进制安全字符串,表示 key AAA 对应的 value 不存在

>>> # 对不存在的 key 使用 TTL

>>> sock.sendall(b"*2\r\n$3\r\nTTL\r\n$3\r\nAAA\r\n")
>>> sock.recv(512)

':-2\r\n'

>>> # 返回整数 -2

数组(Arrays)

数组类型用于发送 Redis 命令,也同样用于一些命令的返回,它使用 `*` 作为开头,后接数据元素个数,再接 `\r\n` ,之 后即可放入对应元素,元素可以是任意类型,当然也可以是一个数组,数组可以包含另一个数组。

比如包含两个整形 1 的数组

*2\r\n:1\r\n:1\r\n

当需要一个 NULL 数组时,处理方式与二进制安全字符串类似

*-1\r\n

比如 BLPOP 命令超时后,redis 服务端就会返回一个 NULL 数组,即 `*-1\r\n`

协议上对 Pipeline 的实现

如文章开头所说,RESP 设计思路一开始就充分考虑了 Pipeline 的需求,这是因为内存速度远高于网络 IO,还能大大降低 IO 的读写次数,使用 Pipeline 是挖掘 Redis 性能最具有性价比的方法。协议上对于 Pipeline 的实现也非常直接了当。

实现的方式就是将多条报文直接连在一起发送,没有其他任何额外信息发送。

还是用原生 socket 发送报文来举例:

>>> CMD_1 = b"*2\r\n$3\r\nGET\r\n$3\r\nAAA\r\n" # 命令1 GET AAA

>>> CMD_2 = b"*2\r\n$3\r\nGET\r\n$3\r\nAAB\r\n" # 命令2 GET AAB   

>>> sock.sendall(CMD_1 + CMD_2) # 直接将两条报文一起发送                      

>>> sock.recv(512)

'$-1\r\n$-1\r\n' # 可见两条命令的回复也被一起发回

协议上对 Pipeline 的实现就是这么简单。

redis-py 对于协议的实现

redis-py 对于协议解析的实现

redis-py是目前使用最多的 Python 语言下的 Redis 客户端 工具 库,它对于 Redis 协议解析在 redis/connection.py文件的 HiredisParser 和 PythonParser 对象(3.0.1 版本)中实现。

程序会根据当前包的安装情况,如果发现安装了 0.1.3 版本以上的 hiredis-py,就会 import hiredis 进行网络 IO 读取和报文解析。

hiredis-py是 Redis 官方提供的 Python 语言 Redis 客户端驱动,底层使用 C 编写,理论上拥有更好的性能,但是也要注意,如果使用 Pypy 的话,可能会出现对于 hiredis-py的兼容性问题。

我们在此主要看使用 Python 编写的 PythonParser 对象的实现,主要代码在 PythonParser 的 read_response 方法,代码很短,为了方便展示,剔除了一些类型检查和错误处理的代码:

def read_response(self):    

# 从 socket buffer 对象中读取服务端回复,读到 \r\n 为止    

response = self._buffer.readline()    

# 如果读取的内容为 空字节串 则说明连接已经断开    

if not response:        

raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)    

# 取开头的一字节,确定报文类型    

byte, response = byte_to_chr(response[0]), response[1:]    

# 如果报文类型未知报协议错误    

if byte not in ('-', '+', ':', '$', '*'):        

raise InvalidResponse("Protocol Error: %s, %s" %                                                       (str(byte), str(response)))    

# 处理错误类型报文    

if byte == '-':        

response = nativestr(response)        

# 处理一些常见错误报文,注意这只是处理一些约定俗成的错误内容,和协议规范并无关系        

# 具体可参见 BaseParser 对象        

error = self.parse_error(response)        

return error    

# 处理简单字符串    

elif byte == '+':        

pass    

# 处理整形类型    

elif byte == ':':        

# 将回复转成 64 位长整形        

response = long(response)    

# 处理二进制安全字符串    

elif byte == '$':        

# 获取二进制字符串长度        

length = int(response)        

# 如果长度为 -1,说明是个 NULL,直接返回 None        

if length == -1:            

return None        

# 读取对应长度的报文        

response = self._buffer.read(length)    

# 处理数组    

elif byte == '*':        

length = int(response)        

# 处理空数组的情况        

if length == -1:            

return None        

# 循环递归获取数组中元素,如果是 Python3,这里的 xrange 实际上被定位到了 range 函数           response = [self.read_response() for i in xrange(length)]    

# 将回复报文转码为 str    

if isinstance(response, bytes):        

response = self.encoder.decode(response)    

return response

redis-py 对于协议报文构造的实现

redis-py对协议报文的构造完全由 Python 编写,主要代码在 Connection 对象 的 pack_command 方法,代码同样不长,因为只需要处理二进制安全字符串和数组两种,代码更简单

def pack_command(self, *args):    

output = []    

# 这一部份主要是为了兼容类似于 `config get XXXX` 的命令    

# 因为实现中将 `config get` 作为一个命令,但是在封装成二进制安全字符串时    

# 依然要作为两个字符串,所以在此进行分割,Token 对象只是一个缓存    

command = args[0]    

if ' ' in command:        

args = tuple(Token.get_token(s)                    

for s in command.split()) + args[1:]    

else:        

args = (Token.get_token(command),) + args[1:]    

# SYM_STAR = b'*', SYM_DOLLAR = b'$', SYM_CRLF = b'\r\n', SYM_EMPTY = b''    

# 构造数组头部,比如 `*2\r\n`    buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))    

# 代码对这个值写死为 6000    buffer_cutoff = self._buffer_cutoff    

# 对各参数进行编码,编码细节可见 Encoder 对象    for arg in imap(self.encoder.encode, args):        

# 为避免单个命令总的字节数过长,导致生成一个极长的字符串,        

# 当 buff 字节数或 arg 总数超过 6000 时,将其分块在列表的多个字符串中        

# 不过还是有些局限,比如尝试 GET 一个名称极长的 key 时,最终的结果        

# 不会把 key 名分割         

if len(buff) > buffer_cutoff or len(arg) > buffer_cutoff:            

buff = SYM_EMPTY.join(                

(buff, SYM_DOLLAR, str(len(arg)).encode(), SYM_CRLF))

output.append(buff)            

output.append(arg)            

buff = SYM_CRLF        

else:            

buff = SYM_EMPTY.join(                

(buff, SYM_DOLLAR, str(len(arg)).encode(), SYM_CRLF, arg, SYM_CRLF))  

output.append(buff)    

return output

使用 pack_command 构造报文:

$ python
Python 2.7.9 (default, Mar  1 2015, 12:57:24) 
[GCC 4.9.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from redis.connection import Connection
>>> conn = Connection()
>>> conn.pack_command("GET", "AAA")['*2\r\n$3\r\nGET\r\n$3\r\nAAA\r\n']

redis-py-cluster 与 Pipeline

redis-py-cluster 对于 Redis Cluster Pipeline 实现

和单分片的 Redis 服务相比,redis-cluster 有着不少的局限,主要体现在跨分片数据计算,比如 SDIFF、BRPOPLPUSH 这种需要以多个 key 为参数的命令在使用上限制很大,还有就是在事务和 Pipeline 上的差别。

在 Redis 中,事务和 Pipeline 是充分解耦合的,但很多实现确实会把两者结合使用,比如 redis-py中,Pipeline 就默认开启了事务(详见 client.pyRedis 对象的 pipeline 方法)。

但在 redis-py-cluster (3.0.1 版本)的 Pipeline 实现中,完全停止了对事务的支持,甚至尝试通过调用 StrictClusterPipeline 对象实例的 multi 方法时,会直接抛出 ` RedisClusterException("method multi() is not implemented") `

在此我们可以解析 redis-py-cluster 对于 Pipeline 的实现方法,来查看它的底层原理。主要实现在 [ rediscluster/pipeline.py 的 StrictClusterPipeline 对象 send _cluster_ commands 方法,感谢这个库的作者 Grokzen ,这个函数的注释比代码行数还多。

def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):  

# StrictClusterPipeline 对象将 pipeline 中的命令集合存在一个列表中,    

# 作为 stack 参数传入    

# 其实这一行我没太看懂,看起来是进行了排序,但是 position 参数默认是 None    

# 也没有对这个参数进行修改的代码,此处存疑    

attempt = sorted(stack, key=lambda x: x.position)    

# 用于将命令分类,key 为节点名,value 为要执行的命令列表    

nodes = {}    

# 对命令进行分类,逐个判断对应节点,并存储至 nodes    

for c in attempt:        

# 获取参数所在的 slot,对于含有多个 key 的命令,取第一个 key        

slot = self._determine_slot(*c.args)        

# 获取 slot 对应的节点        

node = self.connection_pool.get_node_by_slot(slot)          

# 此处给 node 对象加一个 name 参数,参数值为 “节点ip:port”,其实应该在node对象             # 创建时就应该有这个参数了,原注释也说这是一个“小小的 hack”        self.connection_pool.nodes.set_node_name(node)        

# 这个 node_name 就是上一行运行的结果,否则不会有 `name` 这个 key        

node_name = node['name']        

# 此处将命令逐个append 到 nodes 中对应节点的 value 中        

# 并对每个会涉及的节点创建一个连接        

if node_name not in nodes:            

nodes[node_name]=NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))        

nodes[node_name].append(c)    

# 取出命令,逐个节点发送所有命令

node_commands = nodes.values()

for n in node_commands:

n.write()    

# 逐个节点等待命令返回,其实这一部分有很大优化空间

# 理论上使用 select 可以提升不少性能,当节点很多时,

# 这样收发的效率其实很低

for n in node_commands:

n.read()    

# 释放连接    

for n in nodes.values():        

self.connection_pool.release(n.connection)    

# 当出现错误进行重试    

attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)

if attempt and allow_redirections:        

# 原注释中,作者认为出现了错误需要重试时,应该将正确性提升为最优先要求,        

#为了重试可以牺牲一些性能

self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))

for c in attempt:

try:                

# 逐个命令逐个节点进行收发,不再一口气发送接收所有命令和回复

c.result = super(StrictClusterPipeline, self).execute_command(*c.args, **c.options)            

except RedisError as e:                

c.result = e    

# 其实这个 sorted 好像依然没什么必要= =    

# 将结果依照命令顺序 排序 放进结果列表

response = [c.result for c in sorted(stack, key=lambda x: x.position)]

if raise_on_error:        

# 如果重试后依然有错误,将第一个错误转码抛出

self.raise_first_error(stack)

return response

redis-py-cluster 在 Pipeline 上的局限性

从实现来看,redis-py-cluster 在 Pipeline 的实现上彻底抛弃了对事务的直接支持,当然如果一定要用事务的话,比如说可以确定事务中操作的 key 都在一个 slot 中(对同个 key 多次操作或者使用自定义 tag),还是可以直接使 ` execute_command("MULTI") ` ` execute_command("EXEC") ` 来通过命令进行对于单个 slot 的事务操作。

目前来看最主要的局限性还是在于操作多个分片时的数据安全问题,从实现上来看,当使用 Pipeline 进行的操作涉及多个节点的话,有可能在某些节点成功但是在某些节点失败,对于数据的安全性可能是一个很大的隐患,当出现了这种情况,命令的再次重试也是一个比较麻烦的问题,redis-py-cluster 实现的重试可能并不能满足所有需求。

次要的问题就是性能上并不能发挥出 Redis Cluster 最大性能,因为实现的收发逻辑比较简陋,再加上出错时比较低效的重试方式,Pipeline 中命令涉及的节点越多,Pipeline 对性能的提升就有可能越不明显。这些都是在使用 redis-py-cluster 时应该注意的问题。

RESP3 :redis 团队画的大饼

如前文所示,RESP2 非常的简单,有些地方甚至是有点简陋甚至混乱,用 RESP3 文档中原话来说

RESP3 abandons the confusing wording of the second version of RESP

比如用 ` $-1\r\n ` 表示 NULL,用 ` *-1\r\n ` 示空列表,错误类型直接是一个字符串,协议上对错误格式没有详细规范这些等等,都在 RESP3 的文档中也有提及,并且作者还列出了更多的缺点。

在此我们没有必要花费太多时间去了解 RESP3 的具体规范,毕竟这份协议还没有一个初具规模的具体实现,但是我们可以通过这份规范来尝试推测一下 Redis 团队将来可能会推出的新的功能。

可能会出现更好用的统计集合相关的命令

RESP3 中添加了浮点型和大数(超过64位)的支持, 形似 `,1.23\r\n` ,我推测可能会出现一些比较好用的数字统计功能,比如对一个 LIST 中的数字求平均数或方差标准差,对一个 Sorted SET 中的数据求平均权重,甚至可能支持浮点型的权重值,浮点型权重值是一个相当棒的功能。

而且 RESP3 中还添加了对空值的实现,形如 `_\r\n` ,那有没有可能可以在 Sorted SET 支持元素的权重值为空?比如对于一个新的元素,它的权重值未知,权重值置为 0 又会污染计算结果,那现在有了标准的空值,是否可以对新元素的权重置空呢?从协议上来说这并不是不可能实现。

可能会出现更为规范的错误类型返回

RESP3 提供了新的错误类型 Blob error,和 RESP2 的错误类型相比,它提供了二进制安全的错误信息,和 Bulk String 非常 像,形如 ` !21\r\nSYNTAX invalid syntax\r\n ` ,这对于 Redis 驱动库是一个非常好的消息,这意味这错误信息更为详细,更为可读,更重要的是更为规范。

让我们看下 redis-py 中对于 RESP2 的错误归类:

EXCEPTION_CLASSES = {    
    'ERR': {
        'max number of clients reached': ConnectionError    },
    'EXECABORT': ExecAbortError,
    'LOADING': BusyLoadingError,
    'NOSCRIPT': NoScriptError,
    'READONLY': ReadOnlyError,
}

可以说是相当简陋了。

相比之下,RESP3 的二进制安全错误类型非常值得期待,这对于将来各种 Redis 库甚至于 Redis Cluster 都有重大意义,Redis Cluster 的实现中大量使用 MOVED 错误,新的错误类型或许意味着更加强大的重定向功能,这也许能催生出更加可用的 Redis 中间件。

Redis 可能会成为更泛用的缓存服务器

在文档的 TODO 中,Redis 团队提出了 一个 `Document streaming of big strings` 这意味着 Redis 将来可能会让字符串类型突破 512MB 的限制,这也使得 Redis 能适用于更多的场景,比如大文件缓存服务器,这对于 CDN 之类的服务可能有很大的作用。

Redis 目前的协议规范和实现都有自己的亮点,也有这样那样的缺点,通信协议作为一个服务基础中的基础,必然还会不断的演变,就算将来 Redis 过时了,RESP 作为一个协议依然可能会被继续广泛使用。

本文只是一个大概的描述,很多细节还需要从源代码中细抠,第四部份则是笔者不负责任的开脑洞,博君一笑尔。

关注我们,获一手游戏运维方案

Redis 应用层协议解析以及在 Python 客户端中的实现


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

国家窃听

国家窃听

真溱 / 中信出版社 / 2015-8 / 48.00元

《国家窃听》以轻松而略带调侃的“冷幽默”风格,讲述了美国情报监视帝国大量不为人知的故事。本书以严谨而专业的视角,将“斯诺登事件”放在21世纪以来美国“全球反恐战争”以及美国情报界几十年发展的大背景下进行考察,揭示出这一事件的内在逻辑和历史必然。作者前期搜集、筛选、整理的一手素材在故事叙述过程中清晰而多层次地呈现,令本书堪称一部非虚构的美国情报界演义。一起来看看 《国家窃听》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具