内容简介:在看Client里的属性,有同步调用chan,异步调用chan,还有一个异步记数器。然后就是一些方法:也就是Open方法传入的参数,决定了这两个通道的长度。
在 Golang 游戏leaf系列(三) NewAgent在chanrpc和skeleton中怎么通讯 (下文简称系列三)中,主要是分析了Server结构体,并没有涉及Client。本文将深入分析剩余部分。
//chanrpc.go type RetInfo struct { ret interface{} err error cb interface{} } type CallInfo struct { f interface{} args []interface{} chanRet chan *RetInfo cb interface{} } type Server struct { functions map[interface{}]interface{} ChanCall chan *CallInfo } type Client struct { s *Server chanSyncRet chan *RetInfo ChanAsynRet chan *RetInfo pendingAsynCall int }
看Client里的属性,有同步调用chan,异步调用chan,还有一个异步记数器。然后就是一些方法:
func (s *Server) Open(l int) *Client { c := NewClient(l) c.Attach(s) return c } func NewClient(l int) *Client { c := new(Client) c.chanSyncRet = make(chan *RetInfo, 1) c.ChanAsynRet = make(chan *RetInfo, l) return c } func (c *Client) Attach(s *Server) { c.s = s }
也就是Open方法传入的参数,决定了这两个通道的长度。
一、结合example_test.go来看一下使用方式
1.支持的参数类型
在源码中,有很多f0,f1,fn,相应的也有call0,call1,calln这些写法。实际上,针对的是不同参数类型:
//参数为[]interface,无返回值 func([]interface{}) //参数为[]interface,返回值为interface func([]interface{}) interface{} //参数为[]interface,返回值为[]interface func([]interface{}) []interface{}
2.注册
s := chanrpc.NewServer(10) var wg sync.WaitGroup wg.Add(1) // goroutine 1 go func() { s.Register("f0", func(args []interface{}) { }) s.Register("f1", func(args []interface{}) interface{} { return 1 }) s.Register("fn", func(args []interface{}) []interface{} { return []interface{}{1, 2, 3} }) s.Register("add", func(args []interface{}) interface{} { n1 := args[0].(int) n2 := args[1].(int) return n1 + n2 }) wg.Done() for { s.Exec(<-s.ChanCall) } }() wg.Wait() wg.Add(1)
“add”那个,也算是一个Call1类型的,然后 s.Exec(<-s.ChanCall)
就相当于在等着ChanCall来数据了。
3.call系列
// goroutine 2 go func() { c := s.Open(10) // sync err := c.Call0("f0") if err != nil { fmt.Println(err) } r1, err := c.Call1("f1") if err != nil { fmt.Println(err) } else { fmt.Println(r1) } rn, err := c.CallN("fn") if err != nil { fmt.Println(err) } else { fmt.Println(rn[0], rn[1], rn[2]) } ra, err := c.Call1("add", 1, 2) if err != nil { fmt.Println(err) } else { fmt.Println(ra) } // asyn c.AsynCall("f0", func(err error) { if err != nil { fmt.Println(err) } }) c.AsynCall("f1", func(ret interface{}, err error) { if err != nil { fmt.Println(err) } else { fmt.Println(ret) } }) c.AsynCall("fn", func(ret []interface{}, err error) { if err != nil { fmt.Println(err) } else { fmt.Println(ret[0], ret[1], ret[2]) } }) c.AsynCall("add", 1, 2, func(ret interface{}, err error) { if err != nil { fmt.Println(err) } else { fmt.Println(ret) } }) c.Cb(<-c.ChanAsynRet) c.Cb(<-c.ChanAsynRet) c.Cb(<-c.ChanAsynRet) c.Cb(<-c.ChanAsynRet) // go s.Go("f0") wg.Done() }() wg.Wait()
二、call系列
func (c *Client) Call0(id interface{}, args ...interface{}) error { f, err := c.f(id, 0) if err != nil { return err } err = c.call(&CallInfo{ f: f, args: args, chanRet: c.chanSyncRet, }, true) if err != nil { return err } ri := <-c.chanSyncRet return ri.err }
c.f(id,0)这个在call系列里都有,主要工作就是确认通过Register注册的function,参数的数量和Call系列是否一致。
然后就是真的要去执行了,这时候对比发现,call系列的执行都是一样的:
err = c.call(&CallInfo{ f: f, args: args, chanRet: c.chanSyncRet, }, true) func (c *Client) call(ci *CallInfo, block bool) (err error) { defer func() { if r := recover(); r != nil { err = r.(error) } }() if block { c.s.ChanCall <- ci } else { select { case c.s.ChanCall <- ci: default: err = errors.New("chanrpc channel full") } } return }
看一下CallInfo
type CallInfo struct { f interface{} args []interface{} chanRet chan *RetInfo cb interface{} }
发现call系列没有给cb,这说明它们是不需要回调的。然后block传的全是true,说明call系列会直接向ChanCall写入callinfo。
example_test.go里call的太多了,可以先尝试运行其中一个
ra, err := c.Call1("add", 1, 2) if err != nil { fmt.Println(err) } else { fmt.Println(ra) }
输出的是3,上面一直没提返回值的事情,就Call1来看,ra返回的正是结果3。其实看源码,是通过 ri := <-c.chanSyncRet
拿到这个值的。也就是说, s.Exec(<-s.ChanCall)
在拿到callinfo后,会写到chanSyncRet里。
在exec里有这样的代码:
// execute switch ci.f.(type) { case func([]interface{}): ci.f.(func([]interface{}))(ci.args) return s.ret(ci, &RetInfo{}) case func([]interface{}) interface{}: ret := ci.f.(func([]interface{}) interface{})(ci.args) return s.ret(ci, &RetInfo{ret: ret}) case func([]interface{}) []interface{}: ret := ci.f.(func([]interface{}) []interface{})(ci.args) return s.ret(ci, &RetInfo{ret: ret}) }
ret方法是这样的:
func (s *Server) ret(ci *CallInfo, ri *RetInfo) (err error) { if ci.chanRet == nil { return } defer func() { if r := recover(); r != nil { err = r.(error) } }() ri.cb = ci.cb ci.chanRet <- ri return }
三、异步AsynCall
现在来看一下异步调用AsynCall
c.AsynCall("add", 1, 2, func(ret interface{}, err error) { if err != nil { fmt.Println(err) } else { fmt.Println(ret) } })
这里在参数的最末尾,传入一个回调function,然后流程和同步类似
func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n int) { f, err := c.f(id, n) if err != nil { c.ChanAsynRet <- &RetInfo{err: err, cb: cb} return } err = c.call(&CallInfo{ f: f, args: args, chanRet: c.ChanAsynRet, cb: cb, }, false) if err != nil { c.ChanAsynRet <- &RetInfo{err: err, cb: cb} return } }
只不过black参数是false的。
if block { c.s.ChanCall <- ci } else { select { case c.s.ChanCall <- ci: default: err = errors.New("chanrpc channel full") } }
再来对比一下,如果s.ChanCall读取太慢,已经写不进去了,在同步模式下,这个callinfo会一直阻塞等在那里;而异步模式,会走到default分支,也就是报个错。然后异步模式会把结果都存到ChanAsynRet里
if err != nil { c.ChanAsynRet <- &RetInfo{err: err, cb: cb} return }
当然如果存不下,还在调用AsynCall,它会直接在AsynCall里执行回调
... // too many calls if c.pendingAsynCall >= cap(c.ChanAsynRet) { execCb(&RetInfo{err: errors.New("too many calls"), cb: cb}) return } c.asynCall(id, args, cb, n) c.pendingAsynCall++ ... func execCb(ri *RetInfo) { defer func() { if r := recover(); r != nil { if conf.LenStackBuf > 0 { buf := make([]byte, conf.LenStackBuf) l := runtime.Stack(buf, false) log.Error("%v: %s", r, buf[:l]) } else { log.Error("%v", r) } } }() // execute switch ri.cb.(type) { case func(error): ri.cb.(func(error))(ri.err) case func(interface{}, error): ri.cb.(func(interface{}, error))(ri.ret, ri.err) case func([]interface{}, error): ri.cb.(func([]interface{}, error))(assert(ri.ret), ri.err) default: panic("bug") } return } func (c *Client) Cb(ri *RetInfo) { c.pendingAsynCall-- execCb(ri) }
也就是说,异步模式会把结果存起来。什么时候执行Cb呢,别忘了skeleton.Run啊
// leaf\module\skeleton.go func (s *Skeleton) Run(closeSig chan bool) { for { select { case <-closeSig: s.commandServer.Close() s.server.Close() for !s.g.Idle() || !s.client.Idle() { s.g.Close() s.client.Close() } return case ri := <-s.client.ChanAsynRet: s.client.Cb(ri) // 等待来自通道的数据 case ci := <-s.server.ChanCall: s.server.Exec(ci) case ci := <-s.commandServer.ChanCall: s.commandServer.Exec(ci) case cb := <-s.g.ChanCb: s.g.Cb(cb) case t := <-s.dispatcher.ChanTimer: t.Cb() } } }
四、总结
ChanRPC 的调用方有 3 种调用模式:
- 同步模式,调用并等待 ChanRPC 返回
- 异步模式,调用并提供回调函数,回调函数会在 ChanRPC 返回后被调用
- Go 模式,调用并立即返回,忽略任何返回值和错误
看下来,还是 Go 模式最简单,没有回调,也没有ret返回信息。而同步模式和异步模式,暂时还没有看到使用示例。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 无限调用之链模式分析
- 直观讲解-RPC调用和HTTP调用的区别
- 调用链系列一:解读UAVStack中的调用链技术
- 调用链系列二:解读UAVStack中的调用链技术
- 调用链系列三:解读UAVStack中的调用链技术
- dubbo源码解析(二十七)远程调用——injvm本地调用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。