内容简介:最开始接触到Leaf,就是被它的网络消息功能吸引的。那么先看看这部分功能吧。从文档中得知:Leaf 可以单独使用 TCP 协议或 WebSocket 协议,也可以同时使用两者,换而言之,服务器可以同时接受 TCP 连接和 WebSocket 连接,对开发者而言消息来自 TCP 还是 WebSocket 是完全透明的。这个功能在源码中是如何实现的呢,看看network目录下tcp开头的,和ws开头的,有xx_conn,xx_msg,xx_server,正好各有3个文件。在conn.go里有个Conn inte
最开始接触到Leaf,就是被它的网络消息功能吸引的。那么先看看这部分功能吧。从文档中得知:
Leaf 可以单独使用 TCP 协议或 WebSocket 协议,也可以同时使用两者,换而言之,服务器可以同时接受 TCP 连接和 WebSocket 连接,对开发者而言消息来自 TCP 还是 WebSocket 是完全透明的。
一、network和gate
这个功能在源码中是如何实现的呢,看看network目录下tcp开头的,和ws开头的,有xx_conn,xx_msg,xx_server,正好各有3个文件。在conn.go里有个Conn interface,所以xx_conn肯定是实现这个接口的两个不同类型。按照这个思路,顺便看一下processor.go里的解析器接口,也是有json.go和protobuf.go两种实现。
type Conn interface { ReadMsg() ([]byte, error) WriteMsg(args ...[]byte) error LocalAddr() net.Addr RemoteAddr() net.Addr Close() Destroy() }
1.gate目录
然后xx_conn这两种连接方式,要对外透明,是封装在gate包下面,一起使用的。先看一下agent.go:
type Agent interface { WriteMsg(msg interface{}) LocalAddr() net.Addr RemoteAddr() net.Addr Close() Destroy() UserData() interface{} SetUserData(data interface{}) }
在gate.go里,会有一个agent 结构体来实现Agent接口。除了Agent接口中的方法,agent还实现了Run方法和OnClose方法。
type agent struct { conn network.Conn gate *Gate userData interface{} }
这个结构体又引入了一个Gate,这是啥?在gate.go里也能找到:
type Gate struct { MaxConnNum int PendingWriteNum int MaxMsgLen uint32 Processor network.Processor AgentChanRPC *chanrpc.Server // websocket WSAddr string HTTPTimeout time.Duration CertFile string KeyFile string // tcp TCPAddr string LenMsgLen int LittleEndian bool }
看起来有一些配置参数,还有一个数据解析器Processor,和AgentChanRPC *chanrpc.Server,看一下怎么用的吧。
Gate只有两个方法,OnDestroy目前是空的,还有一个是Run,不出意外的话,应该是解析那些配置参数,启动服务:
func (gate *Gate) Run(closeSig chan bool) { var wsServer *network.WSServer if gate.WSAddr != "" { wsServer = new(network.WSServer) wsServer.Addr = gate.WSAddr wsServer.MaxConnNum = gate.MaxConnNum wsServer.PendingWriteNum = gate.PendingWriteNum wsServer.MaxMsgLen = gate.MaxMsgLen wsServer.HTTPTimeout = gate.HTTPTimeout wsServer.CertFile = gate.CertFile wsServer.KeyFile = gate.KeyFile wsServer.NewAgent = func(conn *network.WSConn) network.Agent { a := &agent{conn: conn, gate: gate} if gate.AgentChanRPC != nil { gate.AgentChanRPC.Go("NewAgent", a) } return a } } var tcpServer *network.TCPServer if gate.TCPAddr != "" { tcpServer = new(network.TCPServer) tcpServer.Addr = gate.TCPAddr tcpServer.MaxConnNum = gate.MaxConnNum tcpServer.PendingWriteNum = gate.PendingWriteNum tcpServer.LenMsgLen = gate.LenMsgLen tcpServer.MaxMsgLen = gate.MaxMsgLen tcpServer.LittleEndian = gate.LittleEndian tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent { a := &agent{conn: conn, gate: gate} if gate.AgentChanRPC != nil { gate.AgentChanRPC.Go("NewAgent", a) } return a } } if wsServer != nil { wsServer.Start() } if tcpServer != nil { tcpServer.Start() } <-closeSig if wsServer != nil { wsServer.Close() } if tcpServer != nil { tcpServer.Close() } }
这里启动了两种不同类型的Server,closeSig那个暂时忽略不说。Server使用NewAgent回调,把gate传走了,呃,有点懵逼。还是回到官方例子中看看整个使用流程吧
二、 官方例子 LeafServer 中的Module
1.Module初始化
首先在main.go中
leaf.Run( game.Module, gate.Module, login.Module, )
这里gate.Module实际上是由gate包里的external.go暴露出来的(这也是leaf的使用习惯,所有module都这样暴露)。
//src/server/gate/external.go type Module struct { *gate.Gate } func (m *Module) OnInit() { m.Gate = &gate.Gate{ MaxConnNum: conf.Server.MaxConnNum, PendingWriteNum: conf.PendingWriteNum, MaxMsgLen: conf.MaxMsgLen, WSAddr: conf.Server.WSAddr, HTTPTimeout: conf.HTTPTimeout, CertFile: conf.Server.CertFile, KeyFile: conf.Server.KeyFile, TCPAddr: conf.Server.TCPAddr, LenMsgLen: conf.LenMsgLen, LittleEndian: conf.LittleEndian, Processor: msg.Processor, AgentChanRPC: game.ChanRPC, } }
匿名结构体Gate了,又额外实现一个OnInit方法,感觉像是有一个IModule这样的接口呢,找一找:
在源码的module.go中,确实找到了:
type Module interface { OnInit() OnDestroy() Run(closeSig chan bool) }
结合上面第一部分说的Gate实现了OnDestroy和Run方法,官方例子中的gate/external.go确是实现了Module接口。注意其OnInit中,除了一堆属性从conf配置中读取,还引入了msg.Processor,这明显是个网络消息解析器。然后game.ChanRPC,这看起来是转到game模块去了,所以在一开始main.go中的leaf.Run中,也是先传入的game.Module,然后才是gate.Module。
//leaf.go func Run(mods ...module.Module) { ... log.Release("Leaf %v starting up", version) // module for i := 0; i < len(mods); i++ { module.Register(mods[i]) } module.Init() ...
2.module是怎么运行起来的
再次回到源码module.go,节选一部分代码过来
type module struct { mi Module closeSig chan bool wg sync.WaitGroup } var mods []*module func Register(mi Module) { m := new(module) m.mi = mi m.closeSig = make(chan bool, 1) mods = append(mods, m) } func Init() { for i := 0; i < len(mods); i++ { mods[i].mi.OnInit() } for i := 0; i < len(mods); i++ { m := mods[i] m.wg.Add(1) go run(m) } } func run(m *module) { m.mi.Run(m.closeSig) m.wg.Done() }
看到这些,是不是想起来官方文档说的这段话:
Leaf 首先会在同一个 goroutine 中按模块注册顺序执行模块的 OnInit 方法,等到所有模块 OnInit 方法执行完成后则为每一个模块启动一个 goroutine 并执行模块的 Run 方法。最后,游戏服务器关闭时(Ctrl + C 关闭游戏服务器)将按模块注册相反顺序在同一个 goroutine 中执行模块的 OnDestroy 方法。
三、综述
1.流程
现在来理一理思路。从main.go里开始,leaf.Run注册并运行了game,gate,login三个module。重点关注gate这个module,这个module通过组合方式实现了Module接口,即自己项目里实现OnInit方法,通过匿名结构体gate.Gate在源码里实现OnDestroy和Run方法。其中,OnInit方法里把gate.Gate制造出来了,部分属性读取conf的配置,Processor指定成自己项目的消息解析,AgentChanRPC指定了自己项目里的game模块。
... Processor: msg.Processor, AgentChanRPC: game.ChanRPC, ...
然后按照流程继续走,gate模块的OnInit执行完,就要去执行Run了。这个方法在本文第一部分就看过了,当时卡在一个懵逼的地方:
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent { a := &agent{conn: conn, gate: gate} if gate.AgentChanRPC != nil { gate.AgentChanRPC.Go("NewAgent", a) } return a }
现在有点感觉了吧,也就是说tcpServer执行NewAgent时,看单词名字意思是一个新连接事件发生时,实际会转交给gate.AgentChanRPC去执行,也就是例子中的game.ChanRPC。转交方式是 .Go("NewAgent", a)
,就像抛出一个事件一样,有一个名称,有一个参数。可以去game模块的chanrpc.go验证一下
//game.internal.chanrpc.go func init() { skeleton.RegisterChanRPC("NewAgent", rpcNewAgent) skeleton.RegisterChanRPC("CloseAgent", rpcCloseAgent) } func rpcNewAgent(args []interface{}) { a := args[0].(gate.Agent) _ = a } func rpcCloseAgent(args []interface{}) { a := args[0].(gate.Agent) _ = a }
2.tcpServer什么时候执行NewAgent
在gate的Run方法中,提到了tcpServer会根据参数生成并运行
... if tcpServer != nil { tcpServer.Start() } ...
然后去tcp_server.go看一下
func (server *TCPServer) Start() { server.init() go server.run() }
init和run细节有点多,先忽略掉吧。我们是来找NewAgent的,终于在run中找到了:
... tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser) agent := server.NewAgent(tcpConn) go func() { agent.Run() // cleanup tcpConn.Close() server.mutexConns.Lock() delete(server.conns, conn) server.mutexConns.Unlock() agent.OnClose() server.wgConns.Done() }()
首先这段代码是在一个for循环中的,也就是收到tcp消息时,才会执行。具体基础知识参考 Golang socket websocket 。agent在拿到具体的tcpConn,会执行自己的Run方法,回到源码gate.go的agent结构体可以看到:
type agent struct { conn network.Conn gate *Gate userData interface{} } func (a *agent) Run() { for { data, err := a.conn.ReadMsg() if err != nil { log.Debug("read message: %v", err) break } if a.gate.Processor != nil { msg, err := a.gate.Processor.Unmarshal(data) ... }
开始使用相应的Processor去读取数据了。
本篇暂时先到这里,还有许多细节,留待后续系列慢慢深究。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- HTML5/CSS3响应式布局介绍以及设计流程概述
- 服务端指南 服务端概述 | 微服务架构概述
- influxdb 源码分析-概述
- zookeeper入门系列:概述
- 云原生架构概述
- 1.Spring 框架概述
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。