内容简介:RPC 框架在微服务中重要的一部分,熟悉和了解其原理是非常有必要的。Go 语言中源码自带实现了 RPC 功能,虽然官方已经宣布不再更新,但是因它实现简单,代码量不大,很多地方值得学习和借鉴,是阅读 RPC 源码的一个非常好的开始。源码地址:先来看看调用的官方例子:
RPC 框架在微服务中重要的一部分,熟悉和了解其原理是非常有必要的。Go 语言中源码自带实现了 RPC 功能,虽然官方已经宣布不再更新,但是因它实现简单,代码量不大,很多地方值得学习和借鉴,是阅读 RPC 源码的一个非常好的开始。
源码地址: github.com/golang/go/t…
1. 基本使用
先来看看调用的官方例子:
- 服务器部分代码:
// content of server.go package main import( "net" "net/rpc" "net/http" "errors" "log" ) type Args struct { A, B int } type Quotient struct { Quo, Rem int } type Arith int func (t *Arith) Multiply(args *Args, reply *int) error { *reply = args.A * args.B return nil } func (t *Arith) Divide(args *Args, quo *Quotient) error { if args.B == 0 { return errors.New("divide by zero") } quo.Quo = args.A / args.B quo.Rem = args.A % args.B return nil } func listenTCP(addr string) (net.Listener, string) { l, e := net.Listen("tcp", addr) if e != nil { log.Fatalf("net.Listen tcp :0: %v", e) } return l, l.Addr().String() } func main() { rpc.Register(new(Arith)) //注册服务 var l net.Listener tcpAddr := "127.0.0.1:8080" l, serverAddr := listenTCP(tcpAddr) //监听TCP连接 log.Println("RPC server listening on", serverAddr) go rpc.Accept(l) rpc.HandleHTTP() //监听HTTP连接 httpAddr := "127.0.0.1:8081" l, serverAddr = listenTCP(httpAddr) log.Println("RPC server listening on", serverAddr) go http.Serve(l, nil) select{} } 复制代码
rpc调用的功能就是Arith实现了一个Multiply和Divide方法。 看main函数,rpc实现了一个注册 rpc.Register(new(Arith))
方法,然后启动监听 listenTCP(tcpAddr)
,这个是通过net包中的Listen方法,监听的对象可以是TCP连接 rpc.Accept(l)
,也可以试HTTP连接 http.Serve(l, nil)
,这个是借助net/http包启动HTTPServer.
- 客户端部分代码
// content of client.go package main import( "net/rpc" "log" "fmt" ) type Args struct { A, B int } type Quotient struct { Quo, Rem int } func main() { client, err := rpc.DialHTTP("tcp", "127.0.0.1:8081") if err != nil { log.Fatal("dialing:", err) } // Synchronous call args := &Args{7,8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("arith error:", err) } fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply) // Asynchronous call clientTCP, err := rpc.Dial("tcp", "127.0.0.1:8080") if err != nil { log.Fatal("dialing:", err) } quotient := new(Quotient) divCall := clientTCP.Go("Arith.Divide", args, quotient, nil) replyCall := <-divCall.Done // will be equal to divCall if replyCall.Error != nil { fmt.Println(replyCall.Error) } else { fmt.Printf("Arith: %d/%d=%d...%d\n", args.A, args.B, quotient.Quo, quotient.Rem) } 复制代码
客户端代码rpc 提供了两个方法 rpc.DialHTTP
和 rpc.Dial
分别提供监听 HTTP 和 Tcp 连接。然后通过 Call
或者 Go
来调用服务器的方法,二者的区别是一个是同步调用, Go
是异步调用。
运行结果:
// server.go ➜ server ./serve 2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8080 2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8081 复制代码
// client.go ➜ client ./client Arith: 7*8=56 Arith: 7/8=0...7 复制代码
2.client.go 源码分析
先来看看客户端的源码,先上一张图了解一下客户端代码的主要逻辑:
-
Dial
andDialHTTP
// Dial connects to an RPC server at the specified network address. func Dial(network, address string) (*Client, error) { conn, err := net.Dial(network, address) if err != nil { return nil, err } return NewClient(conn), nil } 复制代码
Dial
建立在 net.Dial 上,返回一个client对象, DialHTTP
跟 Dial
类似,只不过多了一些HTTP的处理,最终都是返回 NewClient(conn)。
-
NewClient
func NewClient(conn io.ReadWriteCloser) *Client { encBuf := bufio.NewWriter(conn) client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} return NewClientWithCodec(client) } // NewClientWithCodec is like NewClient but uses the specified // codec to encode requests and decode responses. func NewClientWithCodec(codec ClientCodec) *Client { client := &Client{ codec: codec, pending: make(map[uint64]*Call), } go client.input() return client } 复制代码
NewClient
里做了2件事,第一件事是生成client结构体对象,包括序列化方式,初始化其中对象等等,Go Rpc默认采用的是gob序列化,但也可以用json或者protobuf。第二件事是启动一个goroutine协程,调用了 input
方法,这个client的核心部分,下面再讲。
-
Call
andGo
上面例子中,生成client对象后,会显式的调用Call
或Go
,表示同步调用和异步调用。下面来看看源码:
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { call := new(Call) call.ServiceMethod = serviceMethod call.Args = args call.Reply = reply if done == nil { done = make(chan *Call, 10) // buffered. } else { if cap(done) == 0 { log.Panic("rpc: done channel is unbuffered") } } call.Done = done client.send(call) return call } // Call invokes the named function, waits for it to complete, and returns its error status. func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done return call.Error } 复制代码
可以看到, client.Call
方法其实也是调用 client.Go
,只不过通过 chan
进行阻塞。
生成一个Call的结构体,将服务器的调用方法、参数、返回参数,调用结束标记进行组装,然后调用 client.send
的方法,将call结构体发给服务器。服务器拿到这些参数后,会通过反射出具体的方法,然后执行对应的函数。 下面是 Call
结构体的定义:
// Call type Call struct { ServiceMethod string // The name of the service and method to call. 服务方法名 Args interface{} // The argument to the function (*struct). 请求参数 Reply interface{} // The reply from the function (*struct). 返回参数 Error error // After completion, the error status. 错误状态 Done chan *Call // Strobes when call is complete. } 复制代码
-
client.send
func (client *Client) send(call *Call) { client.reqMutex.Lock() defer client.reqMutex.Unlock() // Register this call. client.mutex.Lock() if client.shutdown || client.closing { client.mutex.Unlock() call.Error = ErrShutdown call.done() return } seq := client.seq client.seq++ client.pending[seq] = call client.mutex.Unlock() // Encode and send the request. client.request.Seq = seq client.request.ServiceMethod = call.ServiceMethod err := client.codec.WriteRequest(&client.request, call.Args) if err != nil { client.mutex.Lock() call = client.pending[seq] delete(client.pending, seq) client.mutex.Unlock() if call != nil { call.Error = err call.done() } } } 复制代码
send 方法是将刚才的 call
结构体中的信息发给服务器,首先数据不是直接发给服务器的,而是将请求参数和服务器的方法先赋值给client结构体中的Request结构体,同时在赋值的过程需要加锁。然后再调用Gob的WriteRequest方法,将数据刷到缓存区。
-
client.input
client.send
方法是将数据发给Server,而input则相反,获取Server的返回结果Response给客户端。
func (client *Client) input() { var err error var response Response for err == nil { response = Response{} err = client.codec.ReadResponseHeader(&response) if err != nil { break } seq := response.Seq client.mutex.Lock() call := client.pending[seq] delete(client.pending, seq) client.mutex.Unlock() switch { case call == nil: err = client.codec.ReadResponseBody(nil) .... } call.done() } } ... } } 复制代码
主要逻辑是不断循环读取TCP上的流,把Header解析成Response对象,以及将Body解析到call.Reply对象,解析完后触发call中的done函数。这样客户端就可以拿到Reply对象就是服务器返回的结果,可以打印获取其中的值。
总结:
描述完这几个方法,在回头看开始的 client.go
的流程图就清晰了,可以说是分两条线,一条线显示的调用发送请求数据,另外一条线则起协程获取服务器的返回数据。
3. server.go 源码分析
话不多说,先来一张图了解一下大概:
整体分三部分,第一部分注册服务器定义的方法,第二部分监听客户端的请求,解析获取到客户端的请求参数。第三部分拿到请求参数执行服务器的调用函数,将返回结果返回给客户端。
整个过程其实可以对比是一次socket的调用过程。
-
register
首先来看一下server的结构体:
type methodType struct { sync.Mutex // protects counters method reflect.Method ArgType reflect.Type ReplyType reflect.Type numCalls uint } type service struct { name string // name of service rcvr reflect.Value // receiver of methods for the service typ reflect.Type // type of the receiver method map[string]*methodType // registered methods } type Server struct { serviceMap sync.Map // map[string]*service reqLock sync.Mutex // protects freeReq freeReq *Request respLock sync.Mutex // protects freeResp freeResp *Response } 复制代码
看英语注释就比起清楚具体是做什么的,Server存储服务器的service以及其请求的Request和Response,这二个就是跟客户端约定的协议,如下:
type Request struct { ServiceMethod string // format: "Service.Method" Seq uint64 // sequence number chosen by client next *Request // for free list in Server } type Response struct { ServiceMethod string // echoes that of the Request Seq uint64 // echoes that of the request Error string // error, if any. next *Response // for free list in Server } 复制代码
service 存储服务器需要注册的方法,methodType就是具体方法的属性。
所以要想客户端进行远程调用服务器的方法,前提是在调用之前,服务器的方法均已加载在Server结构体中,所以需要服务器显示的调用register方法,下面看一下里面核心的代码:
func (server *Server) register(rcvr interface{}, name string, useName bool) error { s := new(service) s.typ = reflect.TypeOf(rcvr) s.rcvr = reflect.ValueOf(rcvr) sname := reflect.Indirect(s.rcvr).Type().Name() ... s.name = sname // Install the methods s.method = suitableMethods(s.typ, true) ... if _, dup := server.serviceMap.LoadOrStore(sname, s); dup { ... } ... } func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType { methods := make(map[string]*methodType) for m := 0; m < typ.NumMethod(); m++ { method := typ.Method(m) mtype := method.Type mname := method.Name argType := mtype.In(1) ... replyType := mtype.In(2) ... methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType} } return methods } 复制代码
这段代码就是通过反射把结构体实现的方法的一些属性获取到,包括本身可执行的方法对象、名称、请求参数、返回参数。
最终存储到server的serviceMap中。 客户端调用服务器的方法的结构为 struct.method,这样只需要按 . 进行分割,拿到struct名称和method名称则可以通过再serviceMap获取到方法,执行获得结果。
注册完方法后,接下来就是监听客户端的请求了。
-
Accept
先来看看 Accept
的代码:
func (server *Server) Accept(lis net.Listener) { for { conn, err := lis.Accept() if err != nil { log.Print("rpc.Serve: accept:", err.Error()) return } go server.ServeConn(conn) 复制代码
通过 net 包中的监听tcp端口,然后起了一个协程,来看看这个协程里做了什么?
func (server *Server) ServeConn(conn io.ReadWriteCloser) { buf := bufio.NewWriter(conn) srv := &gobServerCodec{ rwc: conn, dec: gob.NewDecoder(conn), enc: gob.NewEncoder(buf), encBuf: buf, } server.ServeCodec(srv) } func (server *Server) ServeCodec(codec ServerCodec) { sending := new(sync.Mutex) wg := new(sync.WaitGroup) for { service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) ... go service.call(server, sending, wg, mtype, req, argv, replyv, codec) } ... } 复制代码
这段也好理解,ServeConn 将gob序列化的方法和conn保存到gobServerCodec结构体,然后调用了server.ServeCodec方法,这个方式做的事情就是将客户端传过来的包解析序列化解析,将请求参数,待返回的变量,以及是调服务器哪个方法,这些均在上面的 server.readRequest方法处理。
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) { service, mtype, req, keepReading, err = server.readRequestHeader(codec) ... } func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) { // Grab the request header. req = server.getRequest() ... dot := strings.LastIndex(req.ServiceMethod, ".") ... serviceName := req.ServiceMethod[:dot] methodName := req.ServiceMethod[dot+1:] // Look up the request. svci, ok := server.serviceMap.Load(serviceName) ... svc = svci.(*service) mtype = svc.method[methodName] ... } return } 复制代码
核心的功能再 readRequestHeader 中,做的一件事就是将客户端传过来的 struct.method,按 . 进行分割,然后拿到serviceName和methodName,然后再去server.serviceMap中拿到具体的服务和方法执行对象。
拿到之后,会起一个协程,调service.call方法,这里面做的事情就是执行服务器服务的方法,拿到返回结果,再调用WriteReponse,将数据写回去。然后客户端的 input 方法循环获取结果。这样形成闭环。
下面看看service.call方法:
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) { ... function := mtype.method.Func returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) ... server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) ... } 复制代码
实现的功能跟上面分析的一样,通过mtype拿到函数对象,然后调用反射的Call方法执行得到结果,最后调用server.sendResponse发送发回结果。
看到这里再回过来看上面画的Server代码流程图,就非常清晰了。
Go Rpc源码解读就到这里。
4. 总结
Go RPC源码目前官方已经没有维护,官方推荐使用grpc,下一篇计划分析grpc的源码。
下面总结一下优缺点:
- 优点:
- 代码精简,可扩展性高。
- 缺点:
- 同步调用时,通过chan阻塞异步的 Go 方法,并没有处理超时,这样如果超时将导致大量的协程无法释放。
- 可能存在内存泄漏的情况,因为客户端的请求数据在Server结构体中,如果Server端不返回则不会清理其中的数据,客户端的Go函数退出并不会清理其中的内容,所以Server结构体会一直存储,从而内存泄漏。
目前开源的RPC框架已经不是像这种简单的网络调用了,还会包括很多服务治理的功能,比如服务注册与发现、限流熔断、监控等等。这个以后接触新的rpc再分享,最终达到可以自己完整写一个rpc框架的目的。
更多Rpc相关文章和讨论,请关注公众号:『 天澄技术杂谈 』
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Phoenix解读 | Phoenix源码解读之索引
- Phoenix解读 | Phoenix源码解读之SQL
- Redux 源码解读 —— 从源码开始学 Redux
- AQS源码详细解读
- SDWebImage源码解读《一》
- MJExtension源码解读
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Effective JavaScript
David Herman / Addison-Wesley Professional / 2012-12-6 / USD 39.99
"It's uncommon to have a programming language wonk who can speak in such comfortable and friendly language as David does. His walk through the syntax and semantics of JavaScript is both charming and h......一起来看看 《Effective JavaScript》 这本书的介绍吧!