内容简介:假设我们在淘宝下了一笔订单后,淘宝后台需要做这些事情:于是一个创建订单的函数,至少需要取调用三个其他服务的接口,就像这样: img写成伪码:
假设我们在淘宝下了一笔订单后,淘宝后台需要做这些事情:
- 消息通知系统:通知商家,你有一笔新的订单,请及时发货
- 推荐系统:更新用户画像,重新给用户推荐他可能感兴趣的商品
- 会员系统:更新用户的积分和等级信息
于是一个创建订单的函数,至少需要取调用三个其他服务的接口,就像这样: img
写成伪码:
createOrder(...) { doCreateOrder(...); // 调用其他服务接口 sendMsg(...); updateUserInterestedGoods(...); updateMemberCreditInfo(...); }
这样的做法,显然很挫,至少有两个问题:
- 过度耦合:如果后面创建订单时,需要触发新的动作,那就得去改代码,在原有的创建订单函数末尾,追加一行代码
- 缺少缓冲:如果创建订单时,会员系统恰好处于非常忙碌或者宕机的状态,那这是更新会员信息就会失败
因此,我们急需引入一个消息中间件,来实现解耦和缓冲的功能。
img
消息中间件的实现很多,比较常见的有kafka、rocketmq以及我们今天要讲的nsq。
相比于前面两个mq,nsq可以说是非常轻量级的,理解了它,也有助于学习kafka和rocketmq。
首先,让我们从消息队列最原始的形态开始。
Nsq 1.0 —— 我是一条队列
我们给订单系统和其他系统的中间,引入了一个消息中间件,或者说,引入了一条队列。
当订单系统创建完订单时,它只需要往队列里,塞入(push)一条topic为“order_created”的消息。
接着,我们的nsq1.0,会把这条消息,再推送给所有订阅了这个topic的消息的机器,告诉他们,“有新的订单,你们该干嘛干嘛”。
这样一个简单的队列,就解决了上面的两个问题:
- 解耦:如果后面有新的动作,需要在创建订单后执行,那么只需要让新同学自己去订阅topic为“order_created”的消息即可
- 缓冲:如果会员系统现在很忙,没空处理消息,那么只需跟nsq说,“我很忙,不要再发消息过来了”,那么nsq就不会给它推送消息,或者会员系统出了故障,消息虽然推送过去了,但是它给处理失败了,那么也只需给nsq回复一个“requeue”的命令,nsq就会把消息重新放入队列,进行重试。具体实现细节,后面再聊。
Nsq 2.0 —— channel
作为一个靠谱的中间件,你必须做到:高效、可靠、方便。
上面这个使用一条简单的队列来实现的消息中间件,肯定是不满足这三点的。
首先,假设我的会员系统,部署了三台实例,他们都订阅了topic为“order_created”的消息,那么一旦有订单创建,这三台实例就都会收到消息,并且去更新会员积分信息,而其实我只需要更新一次就ok了。
这就涉及到一个消费者组(Comsumer Group)的概念。消费者组是Kafka里提到的,在Nsq,对应的术语是channel。
会员系统的三个实例,你们收到消息时,要做的事情是一样的,并且只需要有有一个实例执行,那么你们就是一个消费者组里面的,要标识为同一个channel,比如说叫“update_memeber_credit”的channel,而短信系统和推荐系统,也要有自己的channel,用来和会员系统作区分,比如说叫“send_msg”和“update_user_interesting_goods”
当nsq收到消息时,会给每个channel复制一份消息,然后channel再给对应的消费者组,推送一条消息。消费者组里有多个实例,那么要推给谁呢?这就涉及到负载均衡,比如有一个消费者组里有ABC三个实例,这次推给了A,那么下次有可能是推送给B,再下次,也许就是C …
img
Nsq 3.0 —— nsqlookup
上面讲过,nsq收到生产者生产的消息后,需要将消息复制多份,然后推送给对应topic和channel的消费者。
那么,nsq怎么知道哪些消费者订阅了topic为“order_created”的消息呢?
总不能在配置文件里写死吧?ip为10.12.65.123的,端口8878,这个消费者的topic是xxx,channel是xxx,…
因此,我们需要一个类似于微服务里头的注册中心的模块,来实现服务发现的功能,这就是nsqlookup.
nsqlookup提供了类似于etcd、zookeeper一样的kv存储服务,里面记录了topic下面都有哪些nsq。
nsqlookup提供了一个 /lookup
接口,比如你想知道哪些nsq上面,有topic为test的消息,那么只需要调一下:
curl 'http://127.0.0.1:4161/lookup?topic=test'
nsqlookup就会给你返回对应topic的nsq列表:
{ "channels": [ "xxx" ], "producers": [ { "remote_address": "127.0.0.1:52796", "hostname": "hongzeyangdeMacBook-Pro", "broadcast_address": "127.0.0.1", "tcp_port": 4150, "http_port": 4151, "version": "1.0.0-compat" } ] }
接着消费者只需要遍历返回的json串里的producers列表,把broadcast_address和tcp_port或者http_port拼起来,就可以得到有对应topic消息的nsq列表。
然后消费者会和这些nsq,逐个建立连接,nsq收到对应topic的消息后,就会给和他们建立连接的消费者,推送消息。
这个过程,可以从nsq的消费者客户端实现的代码中,很清楚的看出来。
我这里用nsq的 Java 客户端实现 brainlag/JavaNSQClient 作为例子:
和拥有对应topic的nsq建立连接:
调用lookup接口,获取拥有对应topic的nsq列表。注意看代码,里面是遍历了nsqlookup的列表,然后把所有lookup的返回结构,进行合并。
com.github.brainlag.nsq.lookup.DefaultNSQLookup#lookup:
img
画红框的地方,正是之前讲的拼凑逻辑。
接着和旧的nsq列表比较,进行删除和新增,保证本地的nsq列表数据是最新的。
com.github.brainlag.nsq.NSQConsumer#connect:
img
当然,这个过程不会只在消费者启动时才执行,而是定期去执行,不断去获取最新的nsq列表。
img
Nsq 4.0 —— nsqd集群
作为一个靠谱的中间件,你必须支持集群部署,这样才能实现可靠、高效。
nsq的集群部署非常简单,官方推荐一个生产者对应的部署一个nsqd:
What is the recommended topology for nsqd? We strongly recommend running an nsqd alongside any service(s) that produce messages.
img
这也能解释,为什么上面的 /lookup
接口,返回的属性是叫 producers
,而不是叫 nsqs
,因为nsq认为一个producer,就对应一个nsq。
当然这样的做法有不少坏处,如果生产者对应的nsq挂掉了,那它就生产不了消息了。而且每个生产者都要部署一个nsq,未免有些奢侈。
不过对于大多数业务来说,这样的nsq已经够用。如果你像有赞一样,拥有一群 Go 语言大神,那也不放对nsq做一下改造。
Get it all together
串起来
总结
参考
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 消息队列中间件(一)介绍
- 消息队列中间件的选型
- 消息队列中间件(二)使用 ActiveMQ
- MQ(1) —— 从队列到消息中间件
- 消息队列中间件(三)Kafka 入门指南
- 中间件面试题:消息队列的优缺点,区别
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Language Implementation Patterns
Terence Parr / Pragmatic Bookshelf / 2010-1-10 / USD 34.95
Knowing how to create domain-specific languages (DSLs) can give you a huge productivity boost. Instead of writing code in a general-purpose programming language, you can first build a custom language ......一起来看看 《Language Implementation Patterns》 这本书的介绍吧!