内容简介:之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点rabbitmq的代码,以及后来的emqtt都看过一点, 所以对消息队列这块是情有独钟。转到go后也在关注消息队列这块,nsq是一个golng的消息系统, 而且架构也非常的简单。所以想通过源码的学习来掌握一些语言技巧。nsq的的话主要有三个模块构成, 这里直接复制官方的介绍
为什么选择nsq
之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点rabbitmq的代码,以及后来的emqtt都看过一点, 所以对消息队列这块是情有独钟。转到 go 后也在关注消息队列这块,nsq是一个golng的消息系统, 而且架构也非常的简单。所以想通过源码的学习来掌握一些语言技巧。
nsq的架构与代码结构
nsq的的话主要有三个模块构成, 这里直接复制官方的介绍:
nsqd: is the daemon that receives, queues, and delivers messages to clients.
nsqlookupd: is the daemon that manages topology information and provides an eventually consistent discovery service.
nsqadmin: is a web UI to introspect the cluster in realtime (and perform various administrative tasks).
这里是一个消息投递的过程, 显示了消息怎么从nsqd到达consumer, 缺少了producer和nsqlookupd. nsqlookupd主要提供了两个功能:
- 向nsqd提供一个topic和channel的注册信息
- 对consumser提供了toic和channel的查询功能然后
consumer查询到nsqd之后就是上面看到的动态图了, consumer直接和nsqd通信, 下面是一个更全面一点的时序图
整个项目的代码结构也是围绕上面的三个模块构建:
- internal(公共部分的实现)
- nsqadmin(对nsqadmin的时间)
- nsqd(对nsqd的实现)
- nsqlookupd(对nsqlookupd的实现)
总共也就这四个package,是不是有很想看下去的冲动(smile).
lookupd的启动流程
经过上面的介绍,我们对lookupd有里简单的认识.首先他是一个独立的进程, 为topic和channel的发现服务. 但不参与时间的消息投递. 对lookup的实现是在nsq/apps/nsqlookupd/nsqlookupd.go和nsq/nsqlookupd/中. lookupd的启动是使用了一个叫 go-srv 的windows wrapper.通过在nsq/apps/nsqlookupd/nsqlookupd.go中实现:
type Service interface { // Init is called before the program/service is started and after it's // determined if the program is running as a Windows Service. Init(Environment) error // Start is called after Init. This method must be non-blocking. Start() error // Stop is called in response to os.Interrupt, os.Kill, or when a // Windows Service is stopped. Stop() error }
来完成整个进程的管理,go-srv帮助我们做了系统信号的管理, 下面来看下lookupd的启动流程,
实例化一个NSQLookupd对象
// apps/nsqlookupd/nsqlookupd.go daemon := nsqlookupd.New(opts) // 实例化一个NSQLookupd的对象 err := daemon.Main() // 开始启动NSQLookupd // nsq/nsqlookupd/nsqlookupd.go func New(opts *Options) *NSQLookupd { .... n := &NSQLookupd{ opts: opts, // 启动参数 DB: NewRegistrationDB(), // 内从里面的一个数据库,主要用来存储tpoic/channel以及nsqd的消息 } ... return n }
开始启动
// Main starts an instance of nsqlookupd and returns an // error if there was a problem starting up. func (l *NSQLookupd) Main() error { ctx := &Context{l} // 启动两场go routine来处理tcp/http的请求 tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) if err != nil { return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err) } httpListener, err := net.Listen("tcp", l.opts.HTTPAddress) if err != nil { return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err) } l.tcpListener = tcpListener l.httpListener = httpListener tcpServer := &tcpServer{ctx: ctx} l.waitGroup.Wrap(func() { protocol.TCPServer(tcpListener, tcpServer, l.logf) }) httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { http_api.Serve(httpListener, httpServer, "HTTP", l.logf) }) return nil }
下面是一个lookupd里面的进程模型
lookupd里的主要数据结构
在上面创建一个instance的时候我们看到创建一个NewRegistrationDB()的函数, 这里就是存储lookupd所有数据结构的地方.
每个topic/channe/clientl就是一个Registration的key, 然后value对应的就是该topic/channel对应的nsqd信息.所有的接口都是在操作上面的那个数据结构.
lookupd和其他模块的交互
在进程模型中我们看到一个tcp server和一个http seerver, 和其他模块之间的交互都是在里面完成的.看下tcp server的处理
有新的tcp连接进来,创建一个新的go routine去服务该请求
// /nsq/internal/tcp_server.go func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { for { ... go handler.Handle(clientConn) }
实例化一个protocol对象
// /nsq/nsqlookupd/tcp.go func (p *tcpServer) Handle(clientConn net.Conn) { ... prot.IOLoop(clientConn) ... }
对请求的具体处理
// /nsq/nsqlookupd/lookup_protocol_v1.go func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { ... p.Exec(client, reader, params) ... } // /nsq/nsqlookupd/lookup_protocol_v1.go func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { switch params[0] { case "PING": // NSQD的心跳包 return p.PING(client, params) case "IDENTIFY": // NQSD启动时候的indentify就是我们上面看到的peerInfo return p.IDENTIFY(client, reader, params[1:]) case "REGISTER": // 注册topic/channel信息到lookupd return p.REGISTER(client, reader, params[1:]) case "UNREGISTER": // unregister topic/lookup 信息 return p.UNREGISTER(client, reader, params[1:]) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }
上面就是整个tcp server的流程, 每个连接都是一个go routine. 相对tcp server来说的话http server就简单很多, 如果你对httprouter熟悉的话就更简单了就是对RegistrationDB的增删查改. http测的api的话可以参考:
官方的文档总结
lookupd是其中比较简单的模块,通过源码的学习我们可以更好的掌握go的一些技巧,也鼓励大家通过一一些开源的代码来掌握语言的一些技巧。其实通过lookupd我们可以抽象一套自己的HTTP/TCP服务端架构来。
以上所述就是小编给大家介绍的《NSQ源码-nsqlookupd》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【源码阅读】AndPermission源码阅读
- ReactNative源码解析-初识源码
- 【源码阅读】Gson源码阅读
- Spring源码系列:BeanDefinition源码解析
- istio 源码 – Citadel 源码分析 (原创)
- istio 源码 – pilot 源码分析(原创)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。