python grpc bidirectional stream 的坑

栏目: 服务器 · 发布时间: 6年前

内容简介:gRPC 除了最常见的 Unary Call,也提供了三种 stream 模式,今天我们就来试试其中的 Bidirectional Stream……首先定义一下接口协议:然后生成对应的 pb 文件:

gRPC 除了最常见的 Unary Call,也提供了三种 stream 模式,今天我们就来试试其中的 Bidirectional Stream……

Proto

首先定义一下接口协议:

syntax = "proto3";
 
package schema;
 
service Gateway {
    rpc Call(stream Request) returns (stream Response){}
}
 
message Request {
    int64 num = 1;
}
 
message Response {
    int64 num = 1;
}

然后生成对应的 pb 文件:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. schema.proto

这样子就生成了 schema_pb2.py 和 schema_pb2_grpc.py 两个文件。

Server

接着我们来写一个简单的Server:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import grpc
import time
import schema_pb2
import schema_pb2_grpc
from concurrent import futures
 
 
class GatewayServer(schema_pb2_grpc.GatewayServicer):
 
    def Call(self, request_iterator, context):
        for req in request_iterator:
            yield schema_pb2.Response(num=req.num+1)
            time.sleep(1)
 
 
def main():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    schema_pb2_grpc.add_GatewayServicer_to_server(GatewayServer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        server.stop(0)
 
 
if __name__ == "__main__":
    main()

功能很简单,每收到一个请求,就对 num+1 后返回。

Client

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
 
import grpc
import Queue
import schema_pb2
import schema_pb2_grpc
 
queue = Queue.Queue()
 
 
def main():
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = schema_pb2_grpc.GatewayStub(channel)
        queue.put(1)
        resp = stub.Call(generate_message())
        for r in resp:
            num = r.num
            queue.put(num)
 
 
def generate_message():
    while True:
        num = queue.get()
        print num
        yield schema_pb2.Request(num=num)
 
 
if __name__ == "__main__":
    main()

在 Call() 中传入一个生成器 generate_message(),它不断地从队列 queue 中读取数据后发送给 Server,而 main() 则将 Server 返回的数据写入到 queue 中。

所以,结合 Server 和 Client 的代码,这就是一个很简单的计数器,不断地进行 +1 的操作。

Exception

正常情况下,这两段示例代码看起来并无异常。然而,在网络或者服务异常的情况下会是怎样子的呢?

我们首先让Server和Client都正常跑起来,然后试试重启Server,结果可以看到Client报错:

Traceback (most recent call last):
  File "client.py", line 31, in <module>
    main()
  File "client.py", line 18, in main
    for r in resp:
  File "/Users/jachua/virtualenvs/pyenv/lib/python2.7/site-packages/grpc/_channel.py", line 367, in next
    return self._next()
  File "/Users/jachua/virtualenvs/pyenv/lib/python2.7/site-packages/grpc/_channel.py", line 358, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
        status = StatusCode.INTERNAL
        details = "Received RST_STREAM with error code 2"
        debug_error_string = "{"created":"@1546759917.268606000","description":"Error received from peer","file":"src/core/lib/surface/call.cc","file_line":1017,"grpc_message":"Received RST_STREAM with error code 2","grpc_status":13}"
>

不同的网络情况下,可能还会出现其他类型的报错,比如:

status = StatusCode.UNKNOWN
details = "Stream closed"

在某些时候,我们可能不希望 Client 就这样子直接退出了,而是能够自动重新连上Server,接着处理数据(比如聊天)。

于是,我们就来简单地修改下 Client 的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
 
import time
import grpc
import Queue
import schema_pb2
import schema_pb2_grpc
 
queue = Queue.Queue()
 
 
def main():
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = schema_pb2_grpc.GatewayStub(channel)
        while True:
            try:
                queue.put(1)
                resp = stub.Call(generate_message())
                for r in resp:
                    num = r.num
                    queue.put(num)
            except grpc.RpcError as e:
                print "stream call err, code: %s, msg: %s" % (e.code(), e.details())
                time.sleep(1)
 
 
def generate_message():
    while True:
        num = queue.get()
        print num
        yield schema_pb2.Request(num=num)
 
 
if __name__ == "__main__":
    main()

简单的添加了一个 while 循环,并捕获 grpc.RpcError 的报错,这样子看起来似乎没什么问题?

接着上一步,我们来测试一下 client,启动正常。然后再重启一下 server,这时候问题来了:

$ python client.py
// 正常输出
1
2
3
// 这时候服务器重启,出现了一个报错
stream call err, code: StatusCode.INTERNAL, msg: Received RST_STREAM with error code 2
// 卡住了,只输出了第一个1
1

而在 server 这一边,我们也没有看到任何后续请求,为啥咧?

问题就出在队列 queue 上!

当 server 重启时,client 报错后重新调用 stub.Call() 会新开启一个线程来执行 generate_message(),这时候就会有两个 generate_message() 的线程同时从 queue 中读取数据。而且,第一个线程把数据从 queue 获取后,由于该线程所属的stream连接已经断开了,并不能把数据发送给 server;而第二个线程虽然连接正常,但却阻塞在 queue.get() 。

因此,generate_message() 中也存在线程泄露的问题。如果我们在代码中用 threading.active_count() 将可以看到线程的数量越来越多。

Fix

弄清楚了上面的原因,我们就可以很容易再次修改 client 了:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
 
import time
import grpc
import Queue
import schema_pb2
import schema_pb2_grpc
 
 
def main():
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = schema_pb2_grpc.GatewayStub(channel)
        while True:
            queue = Queue.Queue()
            queue.put(1)
            try:
                resp = stub.Call(generate_message(queue))
                for r in resp:
                    num = r.num
                    queue.put(num)
            except grpc.RpcError as e:
                print "stream call err, code: %s, msg: %s" % (e.code(), e.details())
            except Exception as e:
                print "unknown err:", e
            finally:
                queue.put("exit")
                time.sleep(1)
 
 
def generate_message(queue):
    while True:
        num = queue.get()
        if num == "exit":
            return
        print num
        yield schema_pb2.Request(num=num)
 
 
if __name__ == "__main__":
    main()

我们在 client 报错后往 queue 中写入了一个 “exit” 标志,让 generage_message() 的线程能够正常退出。

Finally

虽然上面的例子看起来很简单,并且异常似乎也很容易排查。但其实在实际业务中,从 client 到 server 整条链路包含了内网网、负载均衡、反向代理,从表面现象定位到最终的代码问题,却也花费了不少时间。而恰好是这个卡住的问题,也发现了 generage_message() 导致的线程泄露的问题,刚好就一并解决了。


以上所述就是小编给大家介绍的《python grpc bidirectional stream 的坑》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Music Recommendation and Discovery

Music Recommendation and Discovery

Òscar Celma / Springer / 2010-9-7 / USD 49.95

With so much more music available these days, traditional ways of finding music have diminished. Today radio shows are often programmed by large corporations that create playlists drawn from a limited......一起来看看 《Music Recommendation and Discovery》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

html转js在线工具
html转js在线工具

html转js在线工具