NSQ v0.1.5 源码分析

栏目: 编程工具 · 发布时间: 5年前

内容简介:NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。源码地址:https://github.com/nsqio/nsq对于一个大型的项目来讲,我个人的学习习惯于从最小版本开始学起。这是因为,在一个项目最初的时候,大体功能和架构都已经成形,最初的版本,一般来说,代码量都较少,功能集最小。学习曲线低,并且又最初版本,

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

源码地址:https://github.com/nsqio/nsq

对于一个大型的项目来讲,我个人的学习习惯于从最小版本开始学起。这是因为,在一个项目最初的时候,大体功能和架构都已经成形,最初的版本,一般来说,代码量都较少,功能集最小。学习曲线低,并且又最初版本,慢慢往高版本过渡,也能更了解项目进化的过程,也是一个学习的过程。

并且在实际使用过程中,大多数情况下,我们可能不需要那么多的功能集,并且需要根据实际情况做一些二次开发,此时的话,也许低版本的会更贴近实际使用场景和二次开发场景。

开源代码学习-nsq(v0.1.1版本)源码分析 中已经对v0.1.1版本进行了分析。其是一个单机版本,但发布订阅功能是已经实现。

本文对v0.1.5版本进行分析。此版本已经实现了分布式,分成了两个部分你nsqd:用于做订阅发布。nsqlookupd:用于做topic服务注册与发现。

这种模式就类似于微服务中的模式,增加了一个服务注册和发现模块。

每个单机nsqd用于存储topic,并提供订阅与发布功能。增加的nsqlookupd,则是用于多个单机进行注册服务,并对消费者提供查找对应topic所在的nsqd服务器。

这是一种非常简单的分布式架构。

那么看上传代码日志。分别为v0.1.1版本所在位置和v0.1.5所在位置。

NSQ v0.1.5 源码分析

那么看v0.1.5下代码目录:

NSQ v0.1.5 源码分析

代码量相对比v0.1.1版本多了好多文件。

分两个大模块开始分解:

nsqd

还是从main函数开始

github.com/nsqio/nsq/nsqd/main.go

NSQ v0.1.5 源码分析

104:初始化NSQD

105:真正入口nsqd.Main()

github.com/nsqio/nsq/nsqd/nsqd.go

NSQ v0.1.5 源码分析

初始化,并开启了一个协程idPump,这个协程用于全局生成guid的。

NSQ v0.1.5 源码分析

这个协程并非主要流程中的,就不做解释。这里对应0.1.1版本中的

uuidFactory

NSQ v0.1.5 源码分析

这里为真正的main入口。

46:相对比0.1.1版本多了一个 LookupRouter 协程处理。

53:tcpserver

60:httpserver

NSQ v0.1.5 源码分析

在nsqd中提供了一个接口, GetTopic 。少了 topicFactory

下面各个模块分析:

topic

github.com/nsqio/nsq/nsqd/topic.go

NSQ v0.1.5 源码分析

与0.1.1版本中,最大的差别就是多了 读写锁 。在0.1.1版本中,所有的操作都是在Router中,通过chan的方式,然后在单协程中处理所有请求来实现同步。

在0.1.5版本中,增加了读写锁,提供更大的并发操作。

NSQ v0.1.5 源码分析

初始化,然后开启了Router协程。

这里的操作没什么区别。

那么对于topic来说分为几个操作:

1、GetChannel

NSQ v0.1.5 源码分析

58-59:读写锁的操作

61:查找channel

63:未查找到的,生成新的channel

67:每个channel开启了一个MessagePump协程操作。这里用了sync.Once操作。

2、MessagePump 信息分发

NSQ v0.1.5 源码分析

89:从命名来看修改成了 memorMsgchan ,从此chan中获取msg

90:从backend缓冲区中获取msg

91:解码

100-108:通过加锁操作实现并发,遍历所有的channel,然后将msg分发给channel中。

3、PutMessage

NSQ v0.1.5 源码分析

这里的与0.1.1版本中一样,通过incomingMessageChan,发送给Router中处理

4、Router

NSQ v0.1.5 源码分析

由于一些操作通过提供加锁的函数,使得Router简洁了非常多。

Router中只剩下对信息发布的一个操作

119:获取incomingMessagechan信息

121:写入memoryMsgChan中

123:若memoryMsgChan阻塞,则写入backend中。

小结:

topic的代码逻辑更简洁了。msg写入,分发,也都很清晰了。通过加锁的操作来实现并发的操作。

channel

github.com/nsqio/nsq/nsqd/channel.go

NSQ v0.1.5 源码分析

channel也同样增加了读写锁,为的是将之前的单协程的Router操作,分解为了可以多并发操作。

NSQ v0.1.5 源码分析

在NewChannel中,初始化之后,开启了多个协程。

其中最重要的有router,messagePump两个协程。

重要的操作有:

1、AddClient

NSQ v0.1.5 源码分析

addclient,为外部消费者来进行消费channel。这个是一个很重要的操作

这里采用了加锁的操作,来实现并发的同步

2、PutMessage 从topic中分发msg到channel中

NSQ v0.1.5 源码分析

3、router 收到topic分发的msg后的操作

NSQ v0.1.5 源码分析

处理挺简单的,监听incomingMessageChan,将msg发送到memoryMsgChan中,若其阻塞则放到backend缓冲区中。

4、messagePump 将channel中的信息分发给client

NSQ v0.1.5 源码分析

244:从memoryMsgChan中读取msg

245:从backend缓冲区中读取msg

257: 这里是向client提供msg分发的地方,是通过clientMessageChan的方式

小结:

与topic代码一样,channel的代码逻辑也简洁了很多。函数功能也单一了。

tcpserver

github.com/nsqio/nsq/nsqd/tcp.go

NSQ v0.1.5 源码分析

入口为client.Handle

server_client

github.com/nsqio/nsq/nsq/server_client.go

NSQ v0.1.5 源码分析

server_client是一个基础的类

NSQ v0.1.5 源码分析

Handle中

83:读取了传入的protocols

90:调用了protocol的IOLoop

server_protocol_v2

github.com/nsqio/nsq/nsqd/server_protocol_v2.go

NSQ v0.1.5 源码分析

在init中进行protocol的初始化

NSQ v0.1.5 源码分析

IOLoop中真正的处理地方为

43:不断读取line

55: ProtocolExecute ,为真正处理入口

75:client的write,将Response返回

github.com/nsqio/nsq/nsq/protocol.go

NSQ v0.1.5 源码分析

这里的操作与0.1.1版本类似,通过反射查找到对应操作。

回到github.com/nsqio/nsq/nsqd/server_protocol_v2.go中

SUB订阅

NSQ v0.1.5 源码分析

关键操作:

198:获取topic

199:获取channel

200:addclient,将client加入到channel中

205:开启了PushMessages协程。这里的操作和0.1.1版本中不一样。

在0.1.1版本中是要调用get操作获取message,而在这个版本中是直接提供了pushMessage一个协程操作。

PushMessage

NSQ v0.1.5 源码分析

在channel中,就有说过提供的接口chan,clientMessageChan。每个client都会在PushMessage协程中,监听对应channel的clientMessageChan,来获取msg。

126:监听clientMessageChan

134:msg encode

144:调用client的write将msg发送出去。

小结:

tcp监听,生成client,再通过protocol与channel链接起来。

httpserver

github.com/nsqio/nsq/nsqd/http.go

NSQ v0.1.5 源码分析

相对比0.1.1中,增加了很多的handle

pingHandler

NSQ v0.1.5 源码分析

不解释

putHandler

NSQ v0.1.5 源码分析

这里就是发布msg的对外接口

72:获取topic

73:生成msg

74:调用topic的接口PutMessage

mputHandler

NSQ v0.1.5 源码分析

另外提供了一个多msg的发布接口

不同之处在:

100-103:msg的分割,再将msg一个一个调用topic的PutMessage。

lookup

github.com/nsqio/nsq/nsqd/lookup.go

lookup模块是nsqd与nsqlookupd通信的模块

NSQ v0.1.5 源码分析

66-76:解析了所有lookup服务器,并防止lookupPeers中

NSQ v0.1.5 源码分析

在此协程中提供了以下操作:

86-91:15秒定时的操作,定对所有lookup服务器进行ping操作

92-98:进行channel的一个announce操作,通知lookup服务器新增channel

99-105:进行topic的一个announce操作,通知lookup服务器新增topic

106-119:进行所有的topic和channel的同步

总结:

nsqd中的模块越来越清晰简洁。

nsqlookupd

新增加的模块,在文章开头已经讲过其模块功能。

直接看源码

github.com/nsqio/nsq/nsqlookupd/nsqlookupd.go

NSQ v0.1.5 源码分析

main函数中,开启了两个模块

76:tcpserver

82:httpserver

tcpserver

github.com/nsqio/nsq/nsqlookupd/tcp.go

NSQ v0.1.5 源码分析

入口为Handle,这个已经成了标准操作了

github.com/nsqio/nsq/nsqlookupd/server_lookup_protocol_v1.go

NSQ v0.1.5 源码分析

协议注册

NSQ v0.1.5 源码分析

这个也是标准操作

入口是

32:初始化了一个 safe_map ,用于存储信息

47:protocolExecute

提供的操作有

ping

这里的ping还是很简单

announce

NSQ v0.1.5 源码分析

86:获取safe_map

88:将announce信息,存放到safe_map中

小结:

tcpserver提供给nsqd用于ping和announce操作

httpserver

github.com/nsqio/nsq/nsqlookupd/http.go

NSQ v0.1.5 源码分析

对消费者提供了两个handle

1、pingHandler

NSQ v0.1.5 源码分析

比较简单,不做解释

2、lookupHandler

NSQ v0.1.5 源码分析

提供给消费者,用于查找对应topic的信息,包括channel,所在的nsqd的ip和port

37:读取topicname

43:从safe map中查找到对应信息

50-54:信息封装

63:Response返回

总结:

nsqlookupd的功能定位简单,代码量也少,模块也清晰。

全文总结:

本文通过增加了一个nsqlookupd服务,来实现了一个分布式的消息中间件。

消费者,先从nsqlookupd,查找获取到对应topic所在的nsqd的服务ip和port。再与对应的nsqd服务进行发布与订阅操作。

nsqd则向nsqlookup进行topic与channel信息的announce操作。

此种分布式的操作,简单,直接。 后续版本,待有时间的时候,再做分析。

龚浩华

月牙寂道长

qq:29185807

2019年05月09日

如果你觉得本文对你有帮助,可以转到你的朋友圈,让更多人一起学习。

第一时间获取文章,可以关注本人公众号:月牙寂道长,也可以扫码关注

NSQ v0.1.5 源码分析


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

测试驱动的JavaScript开发

测试驱动的JavaScript开发

Christian Johansen / 赵勇、程德、凌杰、高博 / 机械工业出版社 / 2012-2-9 / 69.00元

本书是一本完整的、基于最佳实践的JavaScript敏捷测试指南,同时又有着测试驱动开发方法(TDD)所带来的质量保证。领先一步的JavaScript敏捷开发者Christian Johansen的讨论涵盖了将最先进的自动化测试用于JavaScript开发环境的方方面面,带领读者走查整个开发的生命周期,从项目启动到应用程序部署。本书的主要内容包括:掌握自动化测试和TDD;构建有效的自动化测试工作流......一起来看看 《测试驱动的JavaScript开发》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

html转js在线工具
html转js在线工具

html转js在线工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换