内容简介:在看Client里的属性,有同步调用chan,异步调用chan,还有一个异步记数器。然后就是一些方法:也就是Open方法传入的参数,决定了这两个通道的长度。
在 Golang 游戏leaf系列(三) NewAgent在chanrpc和skeleton中怎么通讯 (下文简称系列三)中,主要是分析了Server结构体,并没有涉及Client。本文将深入分析剩余部分。
//chanrpc.go
type RetInfo struct {
ret interface{}
err error
cb interface{}
}
type CallInfo struct {
f interface{}
args []interface{}
chanRet chan *RetInfo
cb interface{}
}
type Server struct {
functions map[interface{}]interface{}
ChanCall chan *CallInfo
}
type Client struct {
s *Server
chanSyncRet chan *RetInfo
ChanAsynRet chan *RetInfo
pendingAsynCall int
}
看Client里的属性,有同步调用chan,异步调用chan,还有一个异步记数器。然后就是一些方法:
func (s *Server) Open(l int) *Client {
c := NewClient(l)
c.Attach(s)
return c
}
func NewClient(l int) *Client {
c := new(Client)
c.chanSyncRet = make(chan *RetInfo, 1)
c.ChanAsynRet = make(chan *RetInfo, l)
return c
}
func (c *Client) Attach(s *Server) {
c.s = s
}
也就是Open方法传入的参数,决定了这两个通道的长度。
一、结合example_test.go来看一下使用方式
1.支持的参数类型
在源码中,有很多f0,f1,fn,相应的也有call0,call1,calln这些写法。实际上,针对的是不同参数类型:
//参数为[]interface,无返回值
func([]interface{})
//参数为[]interface,返回值为interface
func([]interface{}) interface{}
//参数为[]interface,返回值为[]interface
func([]interface{}) []interface{}
2.注册
s := chanrpc.NewServer(10)
var wg sync.WaitGroup
wg.Add(1)
// goroutine 1
go func() {
s.Register("f0", func(args []interface{}) {
})
s.Register("f1", func(args []interface{}) interface{} {
return 1
})
s.Register("fn", func(args []interface{}) []interface{} {
return []interface{}{1, 2, 3}
})
s.Register("add", func(args []interface{}) interface{} {
n1 := args[0].(int)
n2 := args[1].(int)
return n1 + n2
})
wg.Done()
for {
s.Exec(<-s.ChanCall)
}
}()
wg.Wait()
wg.Add(1)
“add”那个,也算是一个Call1类型的,然后 s.Exec(<-s.ChanCall)
就相当于在等着ChanCall来数据了。
3.call系列
// goroutine 2
go func() {
c := s.Open(10)
// sync
err := c.Call0("f0")
if err != nil {
fmt.Println(err)
}
r1, err := c.Call1("f1")
if err != nil {
fmt.Println(err)
} else {
fmt.Println(r1)
}
rn, err := c.CallN("fn")
if err != nil {
fmt.Println(err)
} else {
fmt.Println(rn[0], rn[1], rn[2])
}
ra, err := c.Call1("add", 1, 2)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ra)
}
// asyn
c.AsynCall("f0", func(err error) {
if err != nil {
fmt.Println(err)
}
})
c.AsynCall("f1", func(ret interface{}, err error) {
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ret)
}
})
c.AsynCall("fn", func(ret []interface{}, err error) {
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ret[0], ret[1], ret[2])
}
})
c.AsynCall("add", 1, 2, func(ret interface{}, err error) {
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ret)
}
})
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
// go
s.Go("f0")
wg.Done()
}()
wg.Wait()
二、call系列
func (c *Client) Call0(id interface{}, args ...interface{}) error {
f, err := c.f(id, 0)
if err != nil {
return err
}
err = c.call(&CallInfo{
f: f,
args: args,
chanRet: c.chanSyncRet,
}, true)
if err != nil {
return err
}
ri := <-c.chanSyncRet
return ri.err
}
c.f(id,0)这个在call系列里都有,主要工作就是确认通过Register注册的function,参数的数量和Call系列是否一致。
然后就是真的要去执行了,这时候对比发现,call系列的执行都是一样的:
err = c.call(&CallInfo{
f: f,
args: args,
chanRet: c.chanSyncRet,
}, true)
func (c *Client) call(ci *CallInfo, block bool) (err error) {
defer func() {
if r := recover(); r != nil {
err = r.(error)
}
}()
if block {
c.s.ChanCall <- ci
} else {
select {
case c.s.ChanCall <- ci:
default:
err = errors.New("chanrpc channel full")
}
}
return
}
看一下CallInfo
type CallInfo struct {
f interface{}
args []interface{}
chanRet chan *RetInfo
cb interface{}
}
发现call系列没有给cb,这说明它们是不需要回调的。然后block传的全是true,说明call系列会直接向ChanCall写入callinfo。
example_test.go里call的太多了,可以先尝试运行其中一个
ra, err := c.Call1("add", 1, 2)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ra)
}
输出的是3,上面一直没提返回值的事情,就Call1来看,ra返回的正是结果3。其实看源码,是通过 ri := <-c.chanSyncRet
拿到这个值的。也就是说, s.Exec(<-s.ChanCall)
在拿到callinfo后,会写到chanSyncRet里。
在exec里有这样的代码:
// execute
switch ci.f.(type) {
case func([]interface{}):
ci.f.(func([]interface{}))(ci.args)
return s.ret(ci, &RetInfo{})
case func([]interface{}) interface{}:
ret := ci.f.(func([]interface{}) interface{})(ci.args)
return s.ret(ci, &RetInfo{ret: ret})
case func([]interface{}) []interface{}:
ret := ci.f.(func([]interface{}) []interface{})(ci.args)
return s.ret(ci, &RetInfo{ret: ret})
}
ret方法是这样的:
func (s *Server) ret(ci *CallInfo, ri *RetInfo) (err error) {
if ci.chanRet == nil {
return
}
defer func() {
if r := recover(); r != nil {
err = r.(error)
}
}()
ri.cb = ci.cb
ci.chanRet <- ri
return
}
三、异步AsynCall
现在来看一下异步调用AsynCall
c.AsynCall("add", 1, 2, func(ret interface{}, err error) {
if err != nil {
fmt.Println(err)
} else {
fmt.Println(ret)
}
})
这里在参数的最末尾,传入一个回调function,然后流程和同步类似
func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n int) {
f, err := c.f(id, n)
if err != nil {
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}
err = c.call(&CallInfo{
f: f,
args: args,
chanRet: c.ChanAsynRet,
cb: cb,
}, false)
if err != nil {
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}
}
只不过black参数是false的。
if block {
c.s.ChanCall <- ci
} else {
select {
case c.s.ChanCall <- ci:
default:
err = errors.New("chanrpc channel full")
}
}
再来对比一下,如果s.ChanCall读取太慢,已经写不进去了,在同步模式下,这个callinfo会一直阻塞等在那里;而异步模式,会走到default分支,也就是报个错。然后异步模式会把结果都存到ChanAsynRet里
if err != nil {
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}
当然如果存不下,还在调用AsynCall,它会直接在AsynCall里执行回调
...
// too many calls
if c.pendingAsynCall >= cap(c.ChanAsynRet) {
execCb(&RetInfo{err: errors.New("too many calls"), cb: cb})
return
}
c.asynCall(id, args, cb, n)
c.pendingAsynCall++
...
func execCb(ri *RetInfo) {
defer func() {
if r := recover(); r != nil {
if conf.LenStackBuf > 0 {
buf := make([]byte, conf.LenStackBuf)
l := runtime.Stack(buf, false)
log.Error("%v: %s", r, buf[:l])
} else {
log.Error("%v", r)
}
}
}()
// execute
switch ri.cb.(type) {
case func(error):
ri.cb.(func(error))(ri.err)
case func(interface{}, error):
ri.cb.(func(interface{}, error))(ri.ret, ri.err)
case func([]interface{}, error):
ri.cb.(func([]interface{}, error))(assert(ri.ret), ri.err)
default:
panic("bug")
}
return
}
func (c *Client) Cb(ri *RetInfo) {
c.pendingAsynCall--
execCb(ri)
}
也就是说,异步模式会把结果存起来。什么时候执行Cb呢,别忘了skeleton.Run啊
// leaf\module\skeleton.go
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
case <-closeSig:
s.commandServer.Close()
s.server.Close()
for !s.g.Idle() || !s.client.Idle() {
s.g.Close()
s.client.Close()
}
return
case ri := <-s.client.ChanAsynRet:
s.client.Cb(ri)
// 等待来自通道的数据
case ci := <-s.server.ChanCall:
s.server.Exec(ci)
case ci := <-s.commandServer.ChanCall:
s.commandServer.Exec(ci)
case cb := <-s.g.ChanCb:
s.g.Cb(cb)
case t := <-s.dispatcher.ChanTimer:
t.Cb()
}
}
}
四、总结
ChanRPC 的调用方有 3 种调用模式:
- 同步模式,调用并等待 ChanRPC 返回
- 异步模式,调用并提供回调函数,回调函数会在 ChanRPC 返回后被调用
- Go 模式,调用并立即返回,忽略任何返回值和错误
看下来,还是 Go 模式最简单,没有回调,也没有ret返回信息。而同步模式和异步模式,暂时还没有看到使用示例。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 无限调用之链模式分析
- 直观讲解-RPC调用和HTTP调用的区别
- 调用链系列一:解读UAVStack中的调用链技术
- 调用链系列二:解读UAVStack中的调用链技术
- 调用链系列三:解读UAVStack中的调用链技术
- dubbo源码解析(二十七)远程调用——injvm本地调用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Cyberwar
Kathleen Hall Jamieson / Oxford University Press / 2018-10-3 / USD 16.96
The question of how Donald Trump won the 2016 election looms over his presidency. In particular, were the 78,000 voters who gave him an Electoral College victory affected by the Russian trolls and hac......一起来看看 《Cyberwar》 这本书的介绍吧!