内容简介:RPC是远程过程调用(Remote Procedure Call)的简称,通过RPC我们可以像调用本地方法一样调用位于其他位置的函数。大家更常见的可能是HTTP API调用,简单来对比的话,RPC比起HTTP调用封装更完善,调用者不必手动处理序列化和反序列化,使用成本更低一些(虽然学习成本可能会更高)。出于学习目的,这次的目标是使用go语言来实现一个自己的RPC。在现实世界里,对于一个RPC工具,除了方法调用以外,人们更看重的是其他功能比如服务发现、负载均衡、熔断降级之类的功能,这里暂时不会涉及,而是仅关注
RPC是远程过程调用(Remote Procedure Call)的简称,通过RPC我们可以像调用本地方法一样调用位于其他位置的函数。大家更常见的可能是HTTP API调用,简单来对比的话,RPC比起HTTP调用封装更完善,调用者不必手动处理序列化和反序列化,使用成本更低一些(虽然学习成本可能会更高)。
出于学习目的,这次的目标是使用 go 语言来实现一个自己的RPC。在现实世界里,对于一个RPC工具,除了方法调用以外,人们更看重的是其他功能比如服务发现、负载均衡、熔断降级之类的功能,这里暂时不会涉及,而是仅关注实现一个可以工作的方法调用。
在之前的文章里大致了解了go语言自带的rpc框架,其中就提到go rpc预留了codec接口,可以让用户在go rpc使用自己的序列化协议,这次就尝试实现一个自己的codec来实现自己的RPC。
准备工作
序列化协议
要实现一个RPC,基本的元素大概有这几个:序列化协议、网络模型和线程模型。而go rpc里的codec基本上实现的就是序列化协议。
本来想着用比较熟悉的thrift协议,但是使用thrift本身实现了RPC流程,所以它并不是一个单纯的序列化协议,它的序列化逻辑可能无法和go rpc很好的契合,再加上还需要书写IDL定义,增加复杂度。本来就是为了熟悉go,所以这里先从简单的开始,于是选择messagepack作为序列化协议。
messagepack是一个比较轻量级的序列化协议,它的逻辑和json类似,但是使用的是二进制形式,所以比json序列化更快,序列化后产生的数据也更小,基本上可以认为是一个二进制版本的json。
创建类定义
要实现自己的codec,需要分别实现go rpc中提供个两个接口:ServerCodec和ClientCodec,很明显他们分别表示服务端和客户端的逻辑,两个接口的定义具体如下:
type ServerCodec interface { ReadRequestHeader(*Request) error ReadRequestBody(interface{}) error WriteResponse(*Response, interface{}) error Close() error } type ClientCodec interface { WriteRequest(*Request, interface{}) error ReadResponseHeader(*Response) error ReadResponseBody(interface{}) error Close() error } 复制代码
可以看到,go rpc将一次请求/响应抽象成了header+body的形式,读取数据时分为读取head和读取body,写入数据时只需写入body部分,go rpc会替我们加上head部分。 接下来我们定义两个结构,用来表示一次请求/响应的完整数据:
type MsgpackReq struct { rpc.Request //head Arg interface{} //body } type MsgpackResp struct { rpc.Response //head Reply interface{} //body } 复制代码
这里的msgpackReq和msgpackResp直接内嵌了go rpc里自带的Request和Response,自带的Request和Response定义了序号、方法名等信息。
接下来就是自定义Codec的声明:
type MessagePackServerCodec struct { rwc io.ReadWriteCloser //用于读写数据,实际是一个网络连接 req MsgpackReq //用于缓存解析到的请求 closed bool //标识codec是否关闭 } type MessagePackClientCodec struct { rwc io.ReadWriteCloser resp MsgpackResp //用于缓存解析到的请求 closed bool } func NewServerCodec(conn net.Conn) *MessagePackServerCodec { return &MessagePackServerCodec{conn, MsgpackReq{}, false} } func NewClientCodec(conn net.Conn) *MessagePackClientCodec { return &MessagePackClientCodec{conn, MsgpackResp{}, false} } 复制代码
在之前的文章里提到了,codec需要包含一个数据源用于读写数据,这里直接将网路连接传递进去。
实现Codec方法
实现思路
接下来是具体的方法实现,处于简单起见,这里将反序列化部分的两步合并为一步,在读取head部分时就将所有的数据解析好并缓存起来,读取body时直接返回缓存的结果。具体的思路就是:
- 客户端在发送请求时,将数据包装成一个MsgpackReq,然后用messagepack序列化并发送出去
- 服务端在读取请求head部分时,将收到的数据用messagepack反序列化成一个MsgpackReq,并将得到的结果缓存起来
- 服务端在读取请求body部分时,从缓存的MsgpackReq中获取到Arg字段并返回
- 服务端在发送响应时,将数据包装成一个MsgpackResp,然后用messagepack序列化并发送出去
- 客户端在读取响应head部分时,将收到的数据用messagepack反序列化成一个MsgpackResp,并将得到的结果缓存起来
- 客户端在读取响应body部分时,从缓存的MsgpackResp中获取到Reply或者Error字段并返回
Client实现
这里直接上代码:
func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error { //先判断codec是否已经关闭,如果是则直接返回 if c.closed { return nil } //将r和arg组装成一个MsgpackReq并序列化 request := &MsgpackReq{*r, arg} reqData, err := msgpack.Marshal(request) if err != nil { panic(err) return err } //先发送数据长度 head := make([]byte, 4) binary.BigEndian.PutUint32(head, uint32(len(reqData))) _, err = c.rwc.Write(head) //再将序列化产生的数据发送出去 _, err = c.rwc.Write(reqData) return err } func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error { //先判断codec是否已经关闭,如果是则直接返回 if c.closed { return nil } //读取数据 data, err := readData(c.rwc) if err != nil { //client一旦初始化就会开始轮询数据,所以要处理连接close的情况 if strings.Contains(err.Error(), "use of closed network connection") { return nil } panic(err) //简单起见,出现异常直接panic } //将读取到的数据反序列化成一个MsgpackResp var response MsgpackResp err = msgpack.Unmarshal(data, &response) if err != nil { panic(err) //简单起见,出现异常直接panic } //根据读取到的数据设置request的各个属性 r.ServiceMethod = response.ServiceMethod r.Seq = response.Seq //同时将读取到的数据缓存起来 c.resp = response return nil } func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error { //这里直接用缓存的数据返回即可 if "" != c.resp.Error {//如果返回的是异常 return errors.New(c.resp.Error) } if reply != nil { //正常返回,通过反射将结果设置到reply变量,因为reply一定是指针类型,所以不必检查CanSet reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(c.resp.Reply)) } return nil } func (c *MessagePackClientCodec) Close() error { c.closed = true //关闭时将closed设置为true if c.rwc != nil { return c.rwc.Close() } return nil } 复制代码
以上就是client部分的实现,值得注意的有几点:
- 读写数据前,需要检查codec是否已经关闭了
- 读写数据时需要处理拆包粘包(通过readData函数处理)
Server实现
同样直接上代码:
func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error { //先判断codec是否已经关闭,如果是则直接返回 if c.closed { return nil } //将r和reply组装成一个MsgpackResp并序列化 response := &MsgpackResp{*r, reply} respData, err := msgpack.Marshal(response) if err != nil { panic(err) return err } head := make([]byte, 4) binary.BigEndian.PutUint32(head, uint32(len(respData))) _, err = c.rwc.Write(head) //将序列化产生的数据发送出去 _, err = c.rwc.Write(respData) return err } func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error { //先判断codec是否已经关闭,如果是则直接返回 if c.closed { return nil } //读取数据 data, err := readData(c.rwc) if err != nil { //这里不能直接panic,需要处理EOF和reset的情况 if err == io.EOF { return err } if strings.Contains(err.Error(), "connection reset by peer") { return err } panic(err) //其他异常直接panic } //将读取到的数据反序列化成一个MsgpackReq var request MsgpackReq err = msgpack.Unmarshal(data, &request) if err != nil { panic(err) //简单起见,出现异常直接panic } //根据读取到的数据设置request的各个属性 r.ServiceMethod = request.ServiceMethod r.Seq = request.Seq //同时将解析到的数据缓存起来 c.req = request return nil } func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error { if arg != nil { //参数不为nil,通过反射将结果设置到arg变量 reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.req.Arg)) } return nil } func (c *MessagePackServerCodec) Close() error { c.closed = true if c.rwc != nil { return c.rwc.Close() } return nil } 复制代码
实际上server端的实现几乎和client端逻辑的一样,只是request和response的角色不同而已。其中有几点需要注意:
- server端读取数据时需要处理EOF和连接reset的情况
- server在返回数据时没有显式处理接口产生的error,只是将reply传递了回去,这是因为error在rpc.Request里存着,不用codec处理
处理拆包粘包
具体思路参考 go语言处理TCP拆包/粘包 ,这里附上readData的实现:
func readData(conn io.ReadWriteCloser) (data []byte, returnError error) { const HeadSize = 4 //设定长度部分占4个字节 headBuf := bytes.NewBuffer(make([]byte, 0, HeadSize)) headData := make([]byte, HeadSize) for { readSize, err := conn.Read(headData) if err != nil { returnError = err return } headBuf.Write(headData[0:readSize]) if headBuf.Len() == HeadSize { break } else { headData = make([]byte, HeadSize-readSize) } } bodyLen := int(binary.BigEndian.Uint32(headBuf.Bytes())) bodyBuf := bytes.NewBuffer(make([]byte, 0, bodyLen)) bodyData := make([]byte, bodyLen) for { readSize, err := conn.Read(bodyData) if err != nil { returnError = err return } bodyBuf.Write(bodyData[0:readSize]) if bodyBuf.Len() == bodyLen { break } else { bodyData = make([]byte, bodyLen-readSize) } } data = bodyBuf.Bytes() returnError = nil return } 复制代码
测试代码
接下来我们通过简单的Echo调用测试一下我们的codec:
//声明接口类 type EchoService struct {} //定义方法Echo func (service *EchoService) Echo(arg string, result *string) error { *result = arg return nil } //服务端启动逻辑 func RegisterAndServeOnTcp() { err := rpc.Register(&EchoService{})//注册并不是注册方法,而是注册EchoService的一个实例 if err != nil { log.Fatal("error registering", err) return } tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234") if err != nil { log.Fatal("error resolving tcp", err) } listener, err := net.ListenTCP("tcp", tcpAddr) for { conn, err := listener.Accept() if err != nil { log.Fatal("error accepting", err) } else { //这里先通过NewServerCodec获得一个实例,然后调用rpc.ServeCodec来启动服务 rpc.ServeCodec(msgpk.NewServerCodec(conn)) } } } //客户端调用逻辑 func CallEcho(method string, arg interface{}) (result interface{}, err error) { var client *rpc.Client conn, err := net.Dial("tcp", ":1234") client = rpc.NewClientWithCodec(msgpk.NewClientCodec(conn)) defer func() { conn.Close() client.Close() }() if err != nil { return "", err } err = client.Call(method, arg, &result) //通过类型加方法名指定要调用的方法 if err != nil { return "", err } return result, err } //main函数 func main() { go server.RegisterAndServeOnTcp() //先启动服务端 time.Sleep(1e9) wg := new(sync.WaitGroup) //waitGroup用于阻塞主线程防止提前退出 callTimes := 10 wg.Add(callTimes) for i := 0; i < callTimes; i++ { go func() { //使用hello world加一个随机数作为参数 argString := "hello world "+strconv.Itoa(rand.Int()) resultString, err := client.Echo(argString) if err != nil { log.Fatal("error calling:", err) } if resultString != argString { fmt.Println("error") } else { fmt.Printf("echo:%s\n", resultString) } wg.Done() }() } wg.Wait() } 复制代码
上面的例子里首先通过go server.RegisterAndServeOnTcp()启动了服务端,然后同时启动了10个go routine来发起请求,客户端在收到响应之后会打印对应的结果。最后执行main函数,控制台会输出结果(后面的随机数可能会不同):
echo:hello world 8674665223082153551 echo:hello world 6129484611666145821 echo:hello world 5577006791947779410 echo:hello world 605394647632969758 echo:hello world 4037200794235010051 echo:hello world 3916589616287113937 echo:hello world 894385949183117216 echo:hello world 1443635317331776148 echo:hello world 2775422040480279449 echo:hello world 6334824724549167320 复制代码
结语
到这里,一个简单的自定义的go语言rpc就已经完成了,虽然自定义部分只有序列化协议部分而已,比如线程模型仍是go rpc自带的逻辑,除此之外也没有前言里提到的各种高级功能。后续再考虑尝试用go语言从零开始实现一个RPC吧。
以上所述就是小编给大家介绍的《go语言实现自己的RPC:go rpc codec》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
GitHub入门与实践
[日] 大塚弘记 / 支鹏浩、刘斌 / 人民邮电出版社 / 2015-7 / 39.00元
本书从Git的基本知识和操作方法入手,详细介绍了GitHub的各种功能,GitHub与其他工具或服务的协作,使用GitHub的开发流程以及如何将GitHub引入到企业中。在讲解GitHub的代表功能Pull Request时,本书专门搭建了供各位读者实践的仓库,邀请各位读者进行Pull Request并共同维护。一起来看看 《GitHub入门与实践》 这本书的介绍吧!