Nsq:如何从一条队列开始,设计出一个靠谱的消息中间件

栏目: 后端 · 发布时间: 7年前

内容简介:假设我们在淘宝下了一笔订单后,淘宝后台需要做这些事情:于是一个创建订单的函数,至少需要取调用三个其他服务的接口,就像这样: img写成伪码:

假设我们在淘宝下了一笔订单后,淘宝后台需要做这些事情:

  1. 消息通知系统:通知商家,你有一笔新的订单,请及时发货
  2. 推荐系统:更新用户画像,重新给用户推荐他可能感兴趣的商品
  3. 会员系统:更新用户的积分和等级信息

于是一个创建订单的函数,至少需要取调用三个其他服务的接口,就像这样: img

写成伪码:

createOrder(...) {
    doCreateOrder(...);
    
    // 调用其他服务接口
    sendMsg(...);
    updateUserInterestedGoods(...);
    updateMemberCreditInfo(...);
}

这样的做法,显然很挫,至少有两个问题:

  • 过度耦合:如果后面创建订单时,需要触发新的动作,那就得去改代码,在原有的创建订单函数末尾,追加一行代码
  • 缺少缓冲:如果创建订单时,会员系统恰好处于非常忙碌或者宕机的状态,那这是更新会员信息就会失败

因此,我们急需引入一个消息中间件,来实现解耦和缓冲的功能。

img

消息中间件的实现很多,比较常见的有kafka、rocketmq以及我们今天要讲的nsq。

相比于前面两个mq,nsq可以说是非常轻量级的,理解了它,也有助于学习kafka和rocketmq。

首先,让我们从消息队列最原始的形态开始。

Nsq 1.0 —— 我是一条队列

我们给订单系统和其他系统的中间,引入了一个消息中间件,或者说,引入了一条队列。

当订单系统创建完订单时,它只需要往队列里,塞入(push)一条topic为“order_created”的消息。

接着,我们的nsq1.0,会把这条消息,再推送给所有订阅了这个topic的消息的机器,告诉他们,“有新的订单,你们该干嘛干嘛”。

这样一个简单的队列,就解决了上面的两个问题:

  • 解耦:如果后面有新的动作,需要在创建订单后执行,那么只需要让新同学自己去订阅topic为“order_created”的消息即可
  • 缓冲:如果会员系统现在很忙,没空处理消息,那么只需跟nsq说,“我很忙,不要再发消息过来了”,那么nsq就不会给它推送消息,或者会员系统出了故障,消息虽然推送过去了,但是它给处理失败了,那么也只需给nsq回复一个“requeue”的命令,nsq就会把消息重新放入队列,进行重试。具体实现细节,后面再聊。

Nsq 2.0 —— channel

作为一个靠谱的中间件,你必须做到:高效、可靠、方便。

上面这个使用一条简单的队列来实现的消息中间件,肯定是不满足这三点的。

首先,假设我的会员系统,部署了三台实例,他们都订阅了topic为“order_created”的消息,那么一旦有订单创建,这三台实例就都会收到消息,并且去更新会员积分信息,而其实我只需要更新一次就ok了。

这就涉及到一个消费者组(Comsumer Group)的概念。消费者组是Kafka里提到的,在Nsq,对应的术语是channel。

会员系统的三个实例,你们收到消息时,要做的事情是一样的,并且只需要有有一个实例执行,那么你们就是一个消费者组里面的,要标识为同一个channel,比如说叫“update_memeber_credit”的channel,而短信系统和推荐系统,也要有自己的channel,用来和会员系统作区分,比如说叫“send_msg”和“update_user_interesting_goods”

当nsq收到消息时,会给每个channel复制一份消息,然后channel再给对应的消费者组,推送一条消息。消费者组里有多个实例,那么要推给谁呢?这就涉及到负载均衡,比如有一个消费者组里有ABC三个实例,这次推给了A,那么下次有可能是推送给B,再下次,也许就是C …

img

Nsq 3.0 —— nsqlookup

上面讲过,nsq收到生产者生产的消息后,需要将消息复制多份,然后推送给对应topic和channel的消费者。

那么,nsq怎么知道哪些消费者订阅了topic为“order_created”的消息呢?

总不能在配置文件里写死吧?ip为10.12.65.123的,端口8878,这个消费者的topic是xxx,channel是xxx,…

因此,我们需要一个类似于微服务里头的注册中心的模块,来实现服务发现的功能,这就是nsqlookup.

nsqlookup提供了类似于etcd、zookeeper一样的kv存储服务,里面记录了topic下面都有哪些nsq。

nsqlookup提供了一个 /lookup 接口,比如你想知道哪些nsq上面,有topic为test的消息,那么只需要调一下:

curl 'http://127.0.0.1:4161/lookup?topic=test'

nsqlookup就会给你返回对应topic的nsq列表:

{
  "channels": [
    "xxx"
  ],
  "producers": [
    {
      "remote_address": "127.0.0.1:52796",
      "hostname": "hongzeyangdeMacBook-Pro",
      "broadcast_address": "127.0.0.1",
      "tcp_port": 4150,
      "http_port": 4151,
      "version": "1.0.0-compat"
    }
  ]
}

接着消费者只需要遍历返回的json串里的producers列表,把broadcast_address和tcp_port或者http_port拼起来,就可以得到有对应topic消息的nsq列表。

然后消费者会和这些nsq,逐个建立连接,nsq收到对应topic的消息后,就会给和他们建立连接的消费者,推送消息。

这个过程,可以从nsq的消费者客户端实现的代码中,很清楚的看出来。

我这里用nsq的 Java 客户端实现 brainlag/JavaNSQClient 作为例子:

和拥有对应topic的nsq建立连接:

调用lookup接口,获取拥有对应topic的nsq列表。注意看代码,里面是遍历了nsqlookup的列表,然后把所有lookup的返回结构,进行合并。

com.github.brainlag.nsq.lookup.DefaultNSQLookup#lookup:

img

画红框的地方,正是之前讲的拼凑逻辑。

接着和旧的nsq列表比较,进行删除和新增,保证本地的nsq列表数据是最新的。

com.github.brainlag.nsq.NSQConsumer#connect:

img

当然,这个过程不会只在消费者启动时才执行,而是定期去执行,不断去获取最新的nsq列表。

img

Nsq 4.0 —— nsqd集群

作为一个靠谱的中间件,你必须支持集群部署,这样才能实现可靠、高效。

nsq的集群部署非常简单,官方推荐一个生产者对应的部署一个nsqd:

What is the recommended topology for nsqd?
We strongly recommend running an nsqd alongside any service(s) that produce messages.

img

这也能解释,为什么上面的 /lookup 接口,返回的属性是叫 producers ,而不是叫 nsqs ,因为nsq认为一个producer,就对应一个nsq。

当然这样的做法有不少坏处,如果生产者对应的nsq挂掉了,那它就生产不了消息了。而且每个生产者都要部署一个nsq,未免有些奢侈。

不过对于大多数业务来说,这样的nsq已经够用。如果你像有赞一样,拥有一群 Go 语言大神,那也不放对nsq做一下改造。

Get it all together

串起来

总结

参考


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Learning PHP 5

Learning PHP 5

David Sklar / O'Reilly / July, 2004 / $29.95

Learning PHP 5 is the ideal tutorial for graphic designers, bloggers, and other web crafters who want a thorough but non-intimidating way to understand the code that makes web sites dynamic. The book ......一起来看看 《Learning PHP 5》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

随机密码生成器
随机密码生成器

多种字符组合密码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器