内容简介:NSQ 是一款基于Go语言的分布式消息队列,这种消息中间件,已经有很多了,比如RabbitMQ,比如阿里开发的RocketMQ,比如Kafka,NSQ一款比较清爽的消息中间件,尽管功能上不如Kafka这么大而全,但是轻量,简单,入手简单,而且大部分情况下,无论是性能还是功能基本够用。至于消息中间件的作用,无非是解耦,缓冲之类的,工作中遇到类似困境的自然懂,遇不到类似困境的,多说也无益。最新稳定版本的NSQ可以从如下地方下载:
前言
NSQ 是一款基于 Go 语言的分布式消息队列,这种消息中间件,已经有很多了,比如RabbitMQ,比如阿里开发的RocketMQ,比如Kafka,NSQ一款比较清爽的消息中间件,尽管功能上不如Kafka这么大而全,但是轻量,简单,入手简单,而且大部分情况下,无论是性能还是功能基本够用。
至于消息中间件的作用,无非是解耦,缓冲之类的,工作中遇到类似困境的自然懂,遇不到类似困境的,多说也无益。
设计原理
最新稳定版本的NSQ可以从如下地方下载:
https://nsq.io/deployment/installing.html
对于我们 Linux 来讲,下载如下版本并解压:
nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
压缩包的内容如下:
nsq-1.1.0.linux-amd64.go1.10.3/ nsq-1.1.0.linux-amd64.go1.10.3/bin/ nsq-1.1.0.linux-amd64.go1.10.3/bin/nsq_to_file nsq-1.1.0.linux-amd64.go1.10.3/bin/nsqlookupd nsq-1.1.0.linux-amd64.go1.10.3/bin/nsq_tail nsq-1.1.0.linux-amd64.go1.10.3/bin/nsqadmin nsq-1.1.0.linux-amd64.go1.10.3/bin/nsq_to_http nsq-1.1.0.linux-amd64.go1.10.3/bin/nsq_stat nsq-1.1.0.linux-amd64.go1.10.3/bin/nsqd nsq-1.1.0.linux-amd64.go1.10.3/bin/to_nsq nsq-1.1.0.linux-amd64.go1.10.3/bin/nsq_to_nsq
为了更好的理解NSQ的设计原理,甚至是消息中间件的原理,我们递进的分析 作为一个消息中间件需要做哪些事情,已经NSQ是如何做的。下面部分深度的参考了知乎大神柳树的文章 MQ(1)—— 从队列到消息中间件 ,我无意抄袭,只是大神讲解的确实漂亮,强烈建议对消息中间件感兴趣的筒子,阅读大神的系列文章,总是光荣属于前辈,我只是一个小学生。
NSQ 1.0,我需要一个消息队列
生产者和消费者模型,不是一个新的东西,生产者负责源源不断地生产任务,而消费者不需要管消息的来源,只需要源源不断地处理任务。既然是生产者只负责把产生任务,而消费者负责处理任务,问题就来了,任务存放到那里。这时候消息队列就横空出世了。
当有任务需要处理的时候,生产者进程就负责往消息队列里面Push一条消息,而消费者能够及时地知道消息队列里面有消息,从而取出任务来处理。 上面这段话有两个问题:
-
任务有很多种类型,并不是所有的生产者会生产出所有类型的任务,同样,也不是所有的消费者都关心所有类型的消息,因此引入了topic的概念。
-
消费者如何知道消息队列中存在消息。这里面就有了消息队列中Push和Pull的区别。不同的消息中间件,采用的不同的方法,Kafka采用Pull,而我们介绍的NSQ采用的是Push。
有了这个队列,解决了两个问题:解耦和缓冲。如果消费者处理失败,可以给消息队列回复 requeue, NSQ会将消息重新放入队列,进行重试。
NSQ 2.0 Channel
从生产者的角度来说,放入不同topic的队列,但是从消费者的角度来说,不同的消费者,可能要关注不同的topic。 更详细点说,
- 对于一个集群来讲,收到消息A,到底由哪个消费者负责处理该消息?
- 一条消息,能不能同时发给多个消费者,多个消费者都来处理该消息
这就是消费组(Consumer Group)的概念。Kafka里面,这个概念叫消费组,而NSQ里面叫Channel。
我们还是考虑集群,对于一个集群来讲,有些任务是集群层面的任务,即,不需要每个节点都处理该任务,只需要集群中选出一个代表,负责把该任务处理即可。这事一种很common的场景。
我们用官方给的gif来解释这种场景:
对于topic clicks, 三个消费者同时关注了该topic,同时都属于metrics channel,或者说metrics 消费组,这样,NSQ收到消息后,会给每一个channel复制一份消息,对于metrics这个消费组,有三个Consumer 实例,那么该消息会发给which Consumer?还是说同时发给三个consumer,答案是对于metrics这个channel,只会发给一个consumer,至于要发给谁,那就是负载均衡逻辑。即这次发给消费者A,下一次发给另一消费者。
对于一个集群来讲,一个消息,每个消费者(不同主机上),都要执行该怎么办?
:warning: NSQ采用的是Push的策略,即,nsqd都会负责push消息到所有关注了该topic的channel。
NSQ 3.0 nsqlookup
上面讲到,nsq收到生产者生产的消息之后,会将消息复制多份,推送给关注该topic的所有channel。 问题是,nsq怎么知道哪些消费者订阅了对应topic的的消息呢?
最简单的方法是写死,有个配置文件,ip是 xx.xx.xx.xx,端口为yyyy,消费者关注topic-xx,channel是zzz,这样最大的问题是不灵活。 我们需要的是一个叫做服务发现的功能。这个功能就是nsqlookup.
nsqlookup提供了一个类似etcd的kv存储,里面记录了topic下面都有哪些nsq。 nsqlookup 提供了一个 /lookup API,可以实时查询哪些 nsq下面有某个topic的消息。
curl "http://127.0.0.1:4161/lookup?topic=x_topic"
输出如下:
{ "producers" : [ { "version" : "1.1.0", "tcp_port" : 4150, "broadcast_address" : "manu-Inspiron-5748", "hostname" : "manu-Inspiron-5748", "http_port" : 4151, "remote_address" : "127.0.0.1:50662" } ], "channels" : [] }
如果我启用了消费者,关注x_topic,并且channel是 work_group_a,那么输出如下:
manu-Inspiron-5748 ~ » curl "http://127.0.0.1:4161/lookup?topic=x_topic" 2>/dev/null |json_pp { "producers" : [ { "tcp_port" : 4150, "version" : "1.1.0", "remote_address" : "127.0.0.1:50662", "hostname" : "manu-Inspiron-5748", "broadcast_address" : "manu-Inspiron-5748", "http_port" : 4151 } ], "channels" : [ "work_group_a" ] }
消费者就可以通过nsqlookup,获取到producer的列表,根据列表中的broadcast_address和tcp_port ,就可以拿到url 地址。 消费者就会和这些nsq逐个建立连接。当有消息到来的时候,nsq就会给和他建立联系的消费者Push 消息。
小节
我们可以总结下,NSQ的主要组件有三个:
- nsqd : 一个负责接收、排队、转发消息到客户端的守护进程
- nsqlookupd: 管理拓扑信息并提供最终一致性的服务发现 daemon
- nsqadmin: 这是一个WEB用户界面,可选。事实上也可以不启动。实时查看集群的统计数据,并执行管理任务。
开始操练
我们可以简单地使用下NSQ。
只需要将上一节中的可执行文件放入/usr/bin/目录下。
1 启动nsqlookupd
nsqlookupd
可以看到
manu-Inspiron-5748 Python/nsq » nsqlookupd 130 ↵ [nsqlookupd] 2019/03/10 22:05:55.189652 INFO: nsqlookupd v1.1.0 (built w/go1.10.3) [nsqlookupd] 2019/03/10 22:05:55.190135 INFO: HTTP: listening on [::]:4161 [nsqlookupd] 2019/03/10 22:05:55.190155 INFO: TCP: listening on [::]:4160
我们可以看到版本信息是1.1.0,listen了4161和6160两个端口。
2 运行nsqd实例:
nsqd --lookupd-tcp-address=127.0.0.1:4160
3 启用 nsqadmin
nsqadmin --lookupd-http-address=127.0.0.1:4161
启用nsqadmin之后,我们就可以查看WEB 界面了:
接下来我们通过程序来创建topic,书写生产者和消费者的程序:
import nsq import tornado.ioloop import time import random import json def pub_message(): message = {} message['number'] = random.randint(1,1000) writer.pub('x_topic', json.dumps(message), finish_pub) def finish_pub(conn, data): print data writer = nsq.Writer(['127.0.0.1:4150']) tornado.ioloop.PeriodicCallback(pub_message, 1000).start() nsq.run()
我们创建一个x_topic的topic,并且生产者往里面每秒钟扔一个消息,消息的内容为
{'number': 409}
其中数字部分随机产生,而消费者,负责接受消息,判断数字是否是素数:
import nsq from tornado import gen from functools import partial import ujson as json def is_prime(n): n = int(n) if n < 2: return False; if n % 2 == 0: return n == 2 # return False k = 3 while k*k <= n: if n % k == 0: return False k += 2 return True @gen.coroutine def write_message(topic, data, writer): response = yield gen.Task(writer.pub, topic, data) if isinstance(response, nsq.Error): print "Error with Message: {}:{}".format(data, response) else: print "Published Message: ", data def calculate_prime(message, writer): message.enable_async() data = json.loads(message.body) prime = is_prime(data["number"]) data["prime"] = prime if prime: topic = "primes" else: topic = "non_primes" output_message = json.dumps(data) write_message(topic, output_message,writer) message.finish() if __name__ == "__main__": writer = nsq.Writer(['127.0.0.1:4150',]) handler = partial(calculate_prime, writer=writer) reader = nsq.Reader( message_handler = handler, nsqd_tcp_addresses = ['127.0.0.1:4150'], topic = 'x_topic', channel = 'work_group_a') nsq.run()
我们将两个程序跑起来:
manu-Inspiron-5748 Python/nsq » python nsq_producer.py & ; python nsq_consumer.py [1] 14833 OK Published Message: {"prime":false,"number":669} OK Published Message: {"prime":false,"number":275} OK Published Message: {"prime":false,"number":214} OK Published Message: {"prime":false,"number":518} OK Published Message: {"prime":true,"number":739} OK Published Message: {"prime":false,"number":184} OK Published Message: {"prime":true,"number":521} OK
以上所述就是小编给大家介绍的《NSQ 简介》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
C++ Primer Plus
Stephen Prata / Addison Wesley / 2011-10-18 / GBP 39.99
C++ Primer Plus, Sixth Edition New C++11 Coverage C++ Primer Plus is a carefully crafted, complete tutorial on one of the most significant and widely used programming languages today. An accessible an......一起来看看 《C++ Primer Plus》 这本书的介绍吧!