微服务系列(一):Go Rpc 源码解读

栏目: Go · 发布时间: 5年前

内容简介:RPC 框架在微服务中重要的一部分,熟悉和了解其原理是非常有必要的。Go 语言中源码自带实现了 RPC 功能,虽然官方已经宣布不再更新,但是因它实现简单,代码量不大,很多地方值得学习和借鉴,是阅读 RPC 源码的一个非常好的开始。源码地址:先来看看调用的官方例子:

RPC 框架在微服务中重要的一部分,熟悉和了解其原理是非常有必要的。Go 语言中源码自带实现了 RPC 功能,虽然官方已经宣布不再更新,但是因它实现简单,代码量不大,很多地方值得学习和借鉴,是阅读 RPC 源码的一个非常好的开始。

源码地址: github.com/golang/go/t…

1. 基本使用

先来看看调用的官方例子:

  1. 服务器部分代码:
// content of server.go
package main

import(
    "net"
    "net/rpc"
    "net/http"
    "errors"
    "log"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
    *reply = args.A * args.B
    return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
    if args.B == 0 {
        return errors.New("divide by zero")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    return nil
}

func listenTCP(addr string) (net.Listener, string) {
    l, e := net.Listen("tcp", addr)
    if e != nil {
        log.Fatalf("net.Listen tcp :0: %v", e)
    }
    return l, l.Addr().String()
}

func main() {
    rpc.Register(new(Arith)) //注册服务
    var l net.Listener
    tcpAddr := "127.0.0.1:8080"
    l, serverAddr := listenTCP(tcpAddr) //监听TCP连接
    log.Println("RPC server listening on", serverAddr)
    go rpc.Accept(l)

    rpc.HandleHTTP() //监听HTTP连接
    httpAddr := "127.0.0.1:8081"
    l, serverAddr = listenTCP(httpAddr)
    log.Println("RPC server listening on", serverAddr)
    go http.Serve(l, nil)

    select{}
}
复制代码

rpc调用的功能就是Arith实现了一个Multiply和Divide方法。 看main函数,rpc实现了一个注册 rpc.Register(new(Arith)) 方法,然后启动监听 listenTCP(tcpAddr) ,这个是通过net包中的Listen方法,监听的对象可以是TCP连接 rpc.Accept(l) ,也可以试HTTP连接 http.Serve(l, nil) ,这个是借助net/http包启动HTTPServer.

  1. 客户端部分代码
// content of client.go
package main

import(
    "net/rpc"
    "log"
    "fmt"
)

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

func main() {
    client, err := rpc.DialHTTP("tcp", "127.0.0.1:8081")
    if err != nil {
        log.Fatal("dialing:", err)
    }

    // Synchronous call
    args := &Args{7,8}
    var reply int
    err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

    // Asynchronous call
    clientTCP, err := rpc.Dial("tcp", "127.0.0.1:8080")
    if err != nil {
        log.Fatal("dialing:", err)
    }
    
    quotient := new(Quotient)
    divCall := clientTCP.Go("Arith.Divide", args, quotient, nil)
    replyCall := <-divCall.Done    // will be equal to divCall
    if replyCall.Error != nil {
        fmt.Println(replyCall.Error)
    } else {
        fmt.Printf("Arith: %d/%d=%d...%d\n", args.A, args.B, quotient.Quo, quotient.Rem)
    }
复制代码

客户端代码rpc 提供了两个方法 rpc.DialHTTPrpc.Dial 分别提供监听 HTTP 和 Tcp 连接。然后通过 Call 或者 Go 来调用服务器的方法,二者的区别是一个是同步调用, Go 是异步调用。

运行结果:

// server.go
➜  server ./serve
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8080
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8081
复制代码
// client.go
➜  client ./client
Arith: 7*8=56
Arith: 7/8=0...7
复制代码

2.client.go 源码分析

先来看看客户端的源码,先上一张图了解一下客户端代码的主要逻辑:

微服务系列(一):Go Rpc 源码解读
  1. Dial and DialHTTP
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
	conn, err := net.Dial(network, address)
	if err != nil {
		return nil, err
	}
	return NewClient(conn), nil
}
复制代码

Dial 建立在 net.Dial 上,返回一个client对象, DialHTTPDial 类似,只不过多了一些HTTP的处理,最终都是返回 NewClient(conn)。

  1. NewClient
func NewClient(conn io.ReadWriteCloser) *Client {
	encBuf := bufio.NewWriter(conn)
	client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
	return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
	client := &Client{
		codec:   codec,
		pending: make(map[uint64]*Call),
	}
	go client.input()
	return client
}
复制代码

NewClient 里做了2件事,第一件事是生成client结构体对象,包括序列化方式,初始化其中对象等等,Go Rpc默认采用的是gob序列化,但也可以用json或者protobuf。第二件事是启动一个goroutine协程,调用了 input 方法,这个client的核心部分,下面再讲。

  1. Call and Go 上面例子中,生成client对象后,会显式的调用 CallGo ,表示同步调用和异步调用。下面来看看源码:
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	client.send(call)
	return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}
复制代码

可以看到, client.Call 方法其实也是调用 client.Go ,只不过通过 chan 进行阻塞。

生成一个Call的结构体,将服务器的调用方法、参数、返回参数,调用结束标记进行组装,然后调用 client.send 的方法,将call结构体发给服务器。服务器拿到这些参数后,会通过反射出具体的方法,然后执行对应的函数。 下面是 Call 结构体的定义:

// Call 
type Call struct {
	ServiceMethod string      // The name of the service and method to call. 服务方法名
	Args          interface{} // The argument to the function (*struct). 请求参数
	Reply         interface{} // The reply from the function (*struct). 返回参数
	Error         error       // After completion, the error status. 错误状态
	Done          chan *Call  // Strobes when call is complete. 
}
复制代码
  1. client.send
func (client *Client) send(call *Call) {
	client.reqMutex.Lock()
	defer client.reqMutex.Unlock()
	// Register this call.
	client.mutex.Lock()
	if client.shutdown || client.closing {
		client.mutex.Unlock()
		call.Error = ErrShutdown
		call.done()
		return
	}
	seq := client.seq
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()
	// Encode and send the request.
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	err := client.codec.WriteRequest(&client.request, call.Args)
	if err != nil {
		client.mutex.Lock()
		call = client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}
复制代码

send 方法是将刚才的 call 结构体中的信息发给服务器,首先数据不是直接发给服务器的,而是将请求参数和服务器的方法先赋值给client结构体中的Request结构体,同时在赋值的过程需要加锁。然后再调用Gob的WriteRequest方法,将数据刷到缓存区。

  1. client.input client.send 方法是将数据发给Server,而input则相反,获取Server的返回结果Response给客户端。
func (client *Client) input() {
	var err error
	var response Response
	for err == nil {
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		seq := response.Seq
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		case call == nil:
			
			err = client.codec.ReadResponseBody(nil)
			
            ....
			
            }
			call.done()
		}
	}
	
    ...
	
	}
}

复制代码

主要逻辑是不断循环读取TCP上的流,把Header解析成Response对象,以及将Body解析到call.Reply对象,解析完后触发call中的done函数。这样客户端就可以拿到Reply对象就是服务器返回的结果,可以打印获取其中的值。

总结:

描述完这几个方法,在回头看开始的 client.go 的流程图就清晰了,可以说是分两条线,一条线显示的调用发送请求数据,另外一条线则起协程获取服务器的返回数据。

3. server.go 源码分析

话不多说,先来一张图了解一下大概:

微服务系列(一):Go Rpc 源码解读

整体分三部分,第一部分注册服务器定义的方法,第二部分监听客户端的请求,解析获取到客户端的请求参数。第三部分拿到请求参数执行服务器的调用函数,将返回结果返回给客户端。

整个过程其实可以对比是一次socket的调用过程。

  1. register 首先来看一下server的结构体:
type methodType struct {
	sync.Mutex // protects counters
	method     reflect.Method
	ArgType    reflect.Type
	ReplyType  reflect.Type
	numCalls   uint
}

type service struct {
	name   string                 // name of service
	rcvr   reflect.Value          // receiver of methods for the service
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // registered methods
}

type Server struct {
	serviceMap sync.Map   // map[string]*service
	reqLock    sync.Mutex // protects freeReq
	freeReq    *Request
	respLock   sync.Mutex // protects freeResp
	freeResp   *Response
}
复制代码

看英语注释就比起清楚具体是做什么的,Server存储服务器的service以及其请求的Request和Response,这二个就是跟客户端约定的协议,如下:

type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}

type Response struct {
	ServiceMethod string    // echoes that of the Request
	Seq           uint64    // echoes that of the request
	Error         string    // error, if any.
	next          *Response // for free list in Server
}
复制代码

service 存储服务器需要注册的方法,methodType就是具体方法的属性。

所以要想客户端进行远程调用服务器的方法,前提是在调用之前,服务器的方法均已加载在Server结构体中,所以需要服务器显示的调用register方法,下面看一下里面核心的代码:

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
	s := new(service)
	s.typ = reflect.TypeOf(rcvr)
	s.rcvr = reflect.ValueOf(rcvr)
	sname := reflect.Indirect(s.rcvr).Type().Name()
	...
	s.name = sname

	// Install the methods
	s.method = suitableMethods(s.typ, true)

	...
    
	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
		...   
	}
    ...
}


func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
	methods := make(map[string]*methodType)
	for m := 0; m < typ.NumMethod(); m++ {
		method := typ.Method(m)
		mtype := method.Type
		mname := method.Name
		
		argType := mtype.In(1)
		
        ...
	
		replyType := mtype.In(2)
		
        ...
        
		methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
	}
	return methods
}
复制代码

这段代码就是通过反射把结构体实现的方法的一些属性获取到,包括本身可执行的方法对象、名称、请求参数、返回参数。

最终存储到server的serviceMap中。 客户端调用服务器的方法的结构为 struct.method,这样只需要按 . 进行分割,拿到struct名称和method名称则可以通过再serviceMap获取到方法,执行获得结果。

注册完方法后,接下来就是监听客户端的请求了。

  1. Accept

先来看看 Accept 的代码:

func (server *Server) Accept(lis net.Listener) {
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Print("rpc.Serve: accept:", err.Error())
			return
		}
		go server.ServeConn(conn)
复制代码

通过 net 包中的监听tcp端口,然后起了一个协程,来看看这个协程里做了什么?

func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	buf := bufio.NewWriter(conn)
	srv := &gobServerCodec{
		rwc:    conn,
		dec:    gob.NewDecoder(conn),
		enc:    gob.NewEncoder(buf),
		encBuf: buf,
	}
	server.ServeCodec(srv)
}

func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	wg := new(sync.WaitGroup)
	for {
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		
       ...
       
		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
	}
  ...
}
复制代码

这段也好理解,ServeConn 将gob序列化的方法和conn保存到gobServerCodec结构体,然后调用了server.ServeCodec方法,这个方式做的事情就是将客户端传过来的包解析序列化解析,将请求参数,待返回的变量,以及是调服务器哪个方法,这些均在上面的 server.readRequest方法处理。

func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {

	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
	...
}

func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
	// Grab the request header.
	req = server.getRequest()
	...
	dot := strings.LastIndex(req.ServiceMethod, ".")
	...
	serviceName := req.ServiceMethod[:dot]
	methodName := req.ServiceMethod[dot+1:]

	// Look up the request.
	svci, ok := server.serviceMap.Load(serviceName)
	...
	svc = svci.(*service)
	mtype = svc.method[methodName]
	...
	}
	return
}
复制代码

核心的功能再 readRequestHeader 中,做的一件事就是将客户端传过来的 struct.method,按 . 进行分割,然后拿到serviceName和methodName,然后再去server.serviceMap中拿到具体的服务和方法执行对象。

拿到之后,会起一个协程,调service.call方法,这里面做的事情就是执行服务器服务的方法,拿到返回结果,再调用WriteReponse,将数据写回去。然后客户端的 input 方法循环获取结果。这样形成闭环。

下面看看service.call方法:

func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
	...
	function := mtype.method.Func
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	...
	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
	...
}
复制代码

实现的功能跟上面分析的一样,通过mtype拿到函数对象,然后调用反射的Call方法执行得到结果,最后调用server.sendResponse发送发回结果。

看到这里再回过来看上面画的Server代码流程图,就非常清晰了。

Go Rpc源码解读就到这里。

4. 总结

Go RPC源码目前官方已经没有维护,官方推荐使用grpc,下一篇计划分析grpc的源码。

下面总结一下优缺点:

  • 优点:
    • 代码精简,可扩展性高。
  • 缺点:
    • 同步调用时,通过chan阻塞异步的 Go 方法,并没有处理超时,这样如果超时将导致大量的协程无法释放。
    • 可能存在内存泄漏的情况,因为客户端的请求数据在Server结构体中,如果Server端不返回则不会清理其中的数据,客户端的Go函数退出并不会清理其中的内容,所以Server结构体会一直存储,从而内存泄漏。

目前开源的RPC框架已经不是像这种简单的网络调用了,还会包括很多服务治理的功能,比如服务注册与发现、限流熔断、监控等等。这个以后接触新的rpc再分享,最终达到可以自己完整写一个rpc框架的目的。

更多Rpc相关文章和讨论,请关注公众号:『 天澄技术杂谈 』

微服务系列(一):Go Rpc 源码解读

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Effective JavaScript

Effective JavaScript

David Herman / Addison-Wesley Professional / 2012-12-6 / USD 39.99

"It's uncommon to have a programming language wonk who can speak in such comfortable and friendly language as David does. His walk through the syntax and semantics of JavaScript is both charming and h......一起来看看 《Effective JavaScript》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

html转js在线工具
html转js在线工具

html转js在线工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具