内容简介:最开始接触到Leaf,就是被它的网络消息功能吸引的。那么先看看这部分功能吧。从文档中得知:Leaf 可以单独使用 TCP 协议或 WebSocket 协议,也可以同时使用两者,换而言之,服务器可以同时接受 TCP 连接和 WebSocket 连接,对开发者而言消息来自 TCP 还是 WebSocket 是完全透明的。这个功能在源码中是如何实现的呢,看看network目录下tcp开头的,和ws开头的,有xx_conn,xx_msg,xx_server,正好各有3个文件。在conn.go里有个Conn inte
最开始接触到Leaf,就是被它的网络消息功能吸引的。那么先看看这部分功能吧。从文档中得知:
Leaf 可以单独使用 TCP 协议或 WebSocket 协议,也可以同时使用两者,换而言之,服务器可以同时接受 TCP 连接和 WebSocket 连接,对开发者而言消息来自 TCP 还是 WebSocket 是完全透明的。
一、network和gate
这个功能在源码中是如何实现的呢,看看network目录下tcp开头的,和ws开头的,有xx_conn,xx_msg,xx_server,正好各有3个文件。在conn.go里有个Conn interface,所以xx_conn肯定是实现这个接口的两个不同类型。按照这个思路,顺便看一下processor.go里的解析器接口,也是有json.go和protobuf.go两种实现。
type Conn interface {
ReadMsg() ([]byte, error)
WriteMsg(args ...[]byte) error
LocalAddr() net.Addr
RemoteAddr() net.Addr
Close()
Destroy()
}
1.gate目录
然后xx_conn这两种连接方式,要对外透明,是封装在gate包下面,一起使用的。先看一下agent.go:
type Agent interface {
WriteMsg(msg interface{})
LocalAddr() net.Addr
RemoteAddr() net.Addr
Close()
Destroy()
UserData() interface{}
SetUserData(data interface{})
}
在gate.go里,会有一个agent 结构体来实现Agent接口。除了Agent接口中的方法,agent还实现了Run方法和OnClose方法。
type agent struct {
conn network.Conn
gate *Gate
userData interface{}
}
这个结构体又引入了一个Gate,这是啥?在gate.go里也能找到:
type Gate struct {
MaxConnNum int
PendingWriteNum int
MaxMsgLen uint32
Processor network.Processor
AgentChanRPC *chanrpc.Server
// websocket
WSAddr string
HTTPTimeout time.Duration
CertFile string
KeyFile string
// tcp
TCPAddr string
LenMsgLen int
LittleEndian bool
}
看起来有一些配置参数,还有一个数据解析器Processor,和AgentChanRPC *chanrpc.Server,看一下怎么用的吧。
Gate只有两个方法,OnDestroy目前是空的,还有一个是Run,不出意外的话,应该是解析那些配置参数,启动服务:
func (gate *Gate) Run(closeSig chan bool) {
var wsServer *network.WSServer
if gate.WSAddr != "" {
wsServer = new(network.WSServer)
wsServer.Addr = gate.WSAddr
wsServer.MaxConnNum = gate.MaxConnNum
wsServer.PendingWriteNum = gate.PendingWriteNum
wsServer.MaxMsgLen = gate.MaxMsgLen
wsServer.HTTPTimeout = gate.HTTPTimeout
wsServer.CertFile = gate.CertFile
wsServer.KeyFile = gate.KeyFile
wsServer.NewAgent = func(conn *network.WSConn) network.Agent {
a := &agent{conn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
}
var tcpServer *network.TCPServer
if gate.TCPAddr != "" {
tcpServer = new(network.TCPServer)
tcpServer.Addr = gate.TCPAddr
tcpServer.MaxConnNum = gate.MaxConnNum
tcpServer.PendingWriteNum = gate.PendingWriteNum
tcpServer.LenMsgLen = gate.LenMsgLen
tcpServer.MaxMsgLen = gate.MaxMsgLen
tcpServer.LittleEndian = gate.LittleEndian
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
a := &agent{conn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
}
if wsServer != nil {
wsServer.Start()
}
if tcpServer != nil {
tcpServer.Start()
}
<-closeSig
if wsServer != nil {
wsServer.Close()
}
if tcpServer != nil {
tcpServer.Close()
}
}
这里启动了两种不同类型的Server,closeSig那个暂时忽略不说。Server使用NewAgent回调,把gate传走了,呃,有点懵逼。还是回到官方例子中看看整个使用流程吧
二、 官方例子 LeafServer 中的Module
1.Module初始化
首先在main.go中
leaf.Run(
game.Module,
gate.Module,
login.Module,
)
这里gate.Module实际上是由gate包里的external.go暴露出来的(这也是leaf的使用习惯,所有module都这样暴露)。
//src/server/gate/external.go
type Module struct {
*gate.Gate
}
func (m *Module) OnInit() {
m.Gate = &gate.Gate{
MaxConnNum: conf.Server.MaxConnNum,
PendingWriteNum: conf.PendingWriteNum,
MaxMsgLen: conf.MaxMsgLen,
WSAddr: conf.Server.WSAddr,
HTTPTimeout: conf.HTTPTimeout,
CertFile: conf.Server.CertFile,
KeyFile: conf.Server.KeyFile,
TCPAddr: conf.Server.TCPAddr,
LenMsgLen: conf.LenMsgLen,
LittleEndian: conf.LittleEndian,
Processor: msg.Processor,
AgentChanRPC: game.ChanRPC,
}
}
匿名结构体Gate了,又额外实现一个OnInit方法,感觉像是有一个IModule这样的接口呢,找一找:
在源码的module.go中,确实找到了:
type Module interface {
OnInit()
OnDestroy()
Run(closeSig chan bool)
}
结合上面第一部分说的Gate实现了OnDestroy和Run方法,官方例子中的gate/external.go确是实现了Module接口。注意其OnInit中,除了一堆属性从conf配置中读取,还引入了msg.Processor,这明显是个网络消息解析器。然后game.ChanRPC,这看起来是转到game模块去了,所以在一开始main.go中的leaf.Run中,也是先传入的game.Module,然后才是gate.Module。
//leaf.go
func Run(mods ...module.Module) {
...
log.Release("Leaf %v starting up", version)
// module
for i := 0; i < len(mods); i++ {
module.Register(mods[i])
}
module.Init()
...
2.module是怎么运行起来的
再次回到源码module.go,节选一部分代码过来
type module struct {
mi Module
closeSig chan bool
wg sync.WaitGroup
}
var mods []*module
func Register(mi Module) {
m := new(module)
m.mi = mi
m.closeSig = make(chan bool, 1)
mods = append(mods, m)
}
func Init() {
for i := 0; i < len(mods); i++ {
mods[i].mi.OnInit()
}
for i := 0; i < len(mods); i++ {
m := mods[i]
m.wg.Add(1)
go run(m)
}
}
func run(m *module) {
m.mi.Run(m.closeSig)
m.wg.Done()
}
看到这些,是不是想起来官方文档说的这段话:
Leaf 首先会在同一个 goroutine 中按模块注册顺序执行模块的 OnInit 方法,等到所有模块 OnInit 方法执行完成后则为每一个模块启动一个 goroutine 并执行模块的 Run 方法。最后,游戏服务器关闭时(Ctrl + C 关闭游戏服务器)将按模块注册相反顺序在同一个 goroutine 中执行模块的 OnDestroy 方法。
三、综述
1.流程
现在来理一理思路。从main.go里开始,leaf.Run注册并运行了game,gate,login三个module。重点关注gate这个module,这个module通过组合方式实现了Module接口,即自己项目里实现OnInit方法,通过匿名结构体gate.Gate在源码里实现OnDestroy和Run方法。其中,OnInit方法里把gate.Gate制造出来了,部分属性读取conf的配置,Processor指定成自己项目的消息解析,AgentChanRPC指定了自己项目里的game模块。
... Processor: msg.Processor, AgentChanRPC: game.ChanRPC, ...
然后按照流程继续走,gate模块的OnInit执行完,就要去执行Run了。这个方法在本文第一部分就看过了,当时卡在一个懵逼的地方:
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
a := &agent{conn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
现在有点感觉了吧,也就是说tcpServer执行NewAgent时,看单词名字意思是一个新连接事件发生时,实际会转交给gate.AgentChanRPC去执行,也就是例子中的game.ChanRPC。转交方式是 .Go("NewAgent", a)
,就像抛出一个事件一样,有一个名称,有一个参数。可以去game模块的chanrpc.go验证一下
//game.internal.chanrpc.go
func init() {
skeleton.RegisterChanRPC("NewAgent", rpcNewAgent)
skeleton.RegisterChanRPC("CloseAgent", rpcCloseAgent)
}
func rpcNewAgent(args []interface{}) {
a := args[0].(gate.Agent)
_ = a
}
func rpcCloseAgent(args []interface{}) {
a := args[0].(gate.Agent)
_ = a
}
2.tcpServer什么时候执行NewAgent
在gate的Run方法中,提到了tcpServer会根据参数生成并运行
...
if tcpServer != nil {
tcpServer.Start()
}
...
然后去tcp_server.go看一下
func (server *TCPServer) Start() {
server.init()
go server.run()
}
init和run细节有点多,先忽略掉吧。我们是来找NewAgent的,终于在run中找到了:
...
tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser)
agent := server.NewAgent(tcpConn)
go func() {
agent.Run()
// cleanup
tcpConn.Close()
server.mutexConns.Lock()
delete(server.conns, conn)
server.mutexConns.Unlock()
agent.OnClose()
server.wgConns.Done()
}()
首先这段代码是在一个for循环中的,也就是收到tcp消息时,才会执行。具体基础知识参考 Golang socket websocket 。agent在拿到具体的tcpConn,会执行自己的Run方法,回到源码gate.go的agent结构体可以看到:
type agent struct {
conn network.Conn
gate *Gate
userData interface{}
}
func (a *agent) Run() {
for {
data, err := a.conn.ReadMsg()
if err != nil {
log.Debug("read message: %v", err)
break
}
if a.gate.Processor != nil {
msg, err := a.gate.Processor.Unmarshal(data)
...
}
开始使用相应的Processor去读取数据了。
本篇暂时先到这里,还有许多细节,留待后续系列慢慢深究。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- HTML5/CSS3响应式布局介绍以及设计流程概述
- 服务端指南 服务端概述 | 微服务架构概述
- influxdb 源码分析-概述
- zookeeper入门系列:概述
- 云原生架构概述
- 1.Spring 框架概述
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Design for Hackers
David Kadavy / Wiley / 2011-10-18 / USD 39.99
Discover the techniques behind beautiful design?by deconstructing designs to understand them The term ?hacker? has been redefined to consist of anyone who has an insatiable curiosity as to how thin......一起来看看 《Design for Hackers》 这本书的介绍吧!
JS 压缩/解压工具
在线压缩/解压 JS 代码
图片转BASE64编码
在线图片转Base64编码工具