Topic and Channel
Topic and Channel are two of the most important concepts in NSQ.
Producers write messages to a Topic. A Topic can have multiple Channels, and each Channel receives a complete copy of the Topic's messages.
Consumers subscribe to a Channel. If multiple consumers subscribe to the same Channel, each message in that Channel is delivered to a random one of those consumers.
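To make these semantics concrete, here is a minimal sketch using the official go-nsq client library (github.com/nsqio/go-nsq). The nsqd address 127.0.0.1:4150 and the topic/channel names are assumptions for illustration: a producer publishes to a topic, a consumer subscribes to one channel of that topic, and a second consumer started on the same channel would split the messages with the first.

// Minimal go-nsq sketch; address, topic and channel names are illustrative.
package main

import (
    "log"

    "github.com/nsqio/go-nsq"
)

func main() {
    cfg := nsq.NewConfig()

    // Producer: writes messages to the topic "orders".
    producer, err := nsq.NewProducer("127.0.0.1:4150", cfg)
    if err != nil {
        log.Fatal(err)
    }
    if err := producer.Publish("orders", []byte("hello")); err != nil {
        log.Fatal(err)
    }

    // Consumer: subscribes to the channel "billing" under "orders".
    // Every channel gets its own full copy of the topic's messages;
    // multiple consumers on the same channel share (split) that copy.
    consumer, err := nsq.NewConsumer("orders", "billing", cfg)
    if err != nil {
        log.Fatal(err)
    }
    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        log.Printf("got message: %s", m.Body)
        return nil
    }))
    if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
        log.Fatal(err)
    }

    // Block until the consumer is stopped (e.g. by calling consumer.Stop()).
    <-consumer.StopChan
}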
The analysis below excerpts only the key parts of the nsqd source; some non-core code is omitted.
Topic-related functions
type Topic struct {
    // a Topic can have multiple Channels
    channelMap map[string]*Channel
}

// Topic constructor
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    t := &Topic{
        name:          topicName,
        channelMap:    make(map[string]*Channel),
        memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
    }
    // start a goroutine that listens for messages written to this topic
    t.waitGroup.Wrap(t.messagePump)
    return t
}

// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
    var msg *Message
    var buf []byte
    var err error
    var chans []*Channel
    var memoryMsgChan chan *Message
    var backendChan chan []byte

    t.RLock()
    // collect all of this Topic's Channels
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 && !t.IsPaused() {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

    // main message loop
    for {
        select {
        // read one message from memoryMsgChan
        case msg = <-memoryMsgChan:
        case <-t.exitChan:
            goto exit
        }

        // write the message to every Channel
        for i, channel := range chans {
            chanMsg := msg
            // copy the message because each channel
            // needs a unique instance but...
            // fastpath to avoid copy if its the first channel
            // (the topic already created the first copy)
            if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            if chanMsg.deferred != 0 {
                channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
                continue
            }
            // hand the message to the channel's PutMessage
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }

exit:
    t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
To summarize the section above: NSQ makes good use of one of Go's core features, goroutines communicating over channels. To write a message to a Topic, all that is needed is to send the message into its memoryMsgChan.
When a Topic is created in NewTopic(), it immediately starts a goroutine that listens on the Topic's memoryMsgChan; whenever a message arrives, the goroutine copies it N times and writes one copy to each of the Topic's Channels.
Note that because messages are read via select, NSQ does not guarantee message ordering.
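In other words, the pump is a fan-out over Go channels. The sketch below is not nsqd code, just a simplified illustration of the idea under that assumption: one goroutine drains an in-memory channel and hands a copy of each message to every subscriber queue.

// Not nsqd code: a simplified fan-out sketch of the messagePump idea.
package main

import "fmt"

type message struct {
    id   int
    body string
}

func messagePump(memoryMsgChan <-chan *message, chans []chan *message, exitChan <-chan struct{}) {
    for {
        select {
        case msg := <-memoryMsgChan:
            // write a copy of the message to every channel
            for _, c := range chans {
                cp := *msg // each channel needs its own instance
                c <- &cp
            }
        case <-exitChan:
            return
        }
    }
}

func main() {
    memoryMsgChan := make(chan *message, 8)
    chans := []chan *message{make(chan *message, 8), make(chan *message, 8)}
    exitChan := make(chan struct{})

    go messagePump(memoryMsgChan, chans, exitChan)

    memoryMsgChan <- &message{id: 1, body: "hello"}
    fmt.Println((<-chans[0]).body, (<-chans[1]).body) // hello hello
    close(exitChan)
}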
Message production
Messages are published by the client via the PUB command. In the main IOLoop() function, Exec dispatches every protocol command to its handler: PUB is the producer command that publishes a message, and SUB is the consumer command that subscribes to messages.
func (p *protocolV2) IOLoop(conn net.Conn) error {
    // ...
    response, err = p.Exec(client, params)
    // ...
}

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    // ...
    case bytes.Equal(params[0], []byte("PUB")):
        return p.PUB(client, params)
    case bytes.Equal(params[0], []byte("SUB")):
        return p.SUB(client, params)
    // ...
}

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    // ...
    topic := p.ctx.nsqd.GetTopic(topicName)
    msg := NewMessage(topic.GenerateID(), messageBody)
    err = topic.PutMessage(msg)
    // ...
}

func (t *Topic) PutMessage(m *Message) error {
    // ...
    err := t.put(m)
    // ...
    return nil
}

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}
In PUB, the client's message is put into the corresponding Topic's memoryMsgChan. If memoryMsgChan is full (the send would block), the message is instead written to the backend, which is disk-backed storage. From there, as described in the previous section, the goroutine that each Topic runs copies and distributes the message to all of the Topic's Channels, where it waits to be fetched by consumer clients.
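The fallback in Topic.put is a plain non-blocking send with a default branch. The sketch below is not nsqd code; the diskQueue type here is a hypothetical stand-in for the disk-backed backend, and it only illustrates the "memory first, spill to backend when full" pattern.

// Not nsqd code: non-blocking send with a disk-fallback default branch.
package main

import "fmt"

type diskQueue struct{ items [][]byte }

func (d *diskQueue) Put(b []byte) error {
    d.items = append(d.items, b)
    return nil
}

func put(memoryMsgChan chan []byte, backend *diskQueue, msg []byte) error {
    select {
    case memoryMsgChan <- msg: // fast path: the in-memory queue has room
        return nil
    default: // queue full: spill to disk-backed storage
        return backend.Put(msg)
    }
}

func main() {
    mem := make(chan []byte, 1)
    backend := &diskQueue{}

    put(mem, backend, []byte("m1")) // goes to memory
    put(mem, backend, []byte("m2")) // memory full, goes to the backend

    fmt.Println(len(mem), len(backend.items)) // 1 1
}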
Message dispatch
After a consumer sends the SUB command, it is subscribed to the specified channel under the specified topic, and the subscribed Channel is handed over through the client's SubEventChan. For every client connection, IOLoop also starts a messagePump goroutine that pushes messages from that Channel to the client.
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    var channel *Channel
    for {
        topic := p.ctx.nsqd.GetTopic(topicName)
        // get the named channel under this topic, creating it if it does not exist
        channel = topic.GetChannel(channelName)
        channel.AddClient(client.ID, client)

        if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {
            channel.RemoveClient(client.ID)
            time.Sleep(1 * time.Millisecond)
            continue
        }
        break
    }
    atomic.StoreInt32(&client.State, stateSubscribed)
    client.Channel = channel
    // hand the channel to the consumer's SubEventChan; it is read in the
    // messagePump goroutine that IOLoop starts for this client
    client.SubEventChan <- channel
}

func (p *protocolV2) IOLoop(conn net.Conn) error {
    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)
    p.ctx.nsqd.AddClient(client.ID, client)

    // synchronize the startup of messagePump in order
    // to guarantee that it gets a chance to initialize
    // goroutine local state derived from client attributes
    // and avoid a potential race with IDENTIFY (where a client
    // could have changed or disabled said attributes)
    // IOLoop blocks on this channel below until messagePump has finished initializing
    messagePumpStartedChan := make(chan bool)
    // if the client subscribes to a topic, messagePump ultimately delivers messages to it
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan
}

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    var err error
    var memoryMsgChan chan *Message
    var backendMsgChan chan []byte
    var subChannel *Channel

    subEventChan := client.SubEventChan
    identifyEventChan := client.IdentifyEventChan
    outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
    heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
    heartbeatChan := heartbeatTicker.C
    msgTimeout := client.MsgTimeout

    // signal to the goroutine that started the messagePump
    // that we've started up
    close(startedChan)

    for {
        if subChannel == nil || !client.IsReadyForMessages() {
            // ...
        } else {
            // we're buffered (if there isn't any more data we should flush)...
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-client.ReadyStateChan:
        // after SUB, the subscribed channel arrives here
        case subChannel = <-subEventChan:
            // each client can subscribe to only one channel
            subEventChan = nil
        case <-heartbeatChan:
            err = p.Send(client, frameTypeResponse, heartbeatBytes)
            if err != nil {
                goto exit
            }
        case msg := <-memoryMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++
            // the in-flight queue implements at-least-once delivery
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }
}
Putting the whole flow together: a client connects to a Channel, and when a Topic receives a message it distributes that message to all of its Channels. If multiple clients connect to the same Channel, the implementation naturally load-balances: as the Channel dispatches messages to its clients, each message is received by exactly one of them. With that in mind, the message-flow diagram from the previous article becomes much clearer.
[Figure: NSQ topic/channel message-flow diagram from the previous article]
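This load balancing falls out of Go's channel semantics: when several goroutines receive from the same channel, each value is delivered to exactly one of them. The sketch below is not nsqd code, just a small demonstration of that behavior.

// Not nsqd code: several goroutines receiving from the same Go channel;
// each value is delivered to exactly one of them, mirroring how a Channel's
// clients compete for messages in NSQ.
package main

import (
    "fmt"
    "sync"
)

func main() {
    msgs := make(chan int)
    var wg sync.WaitGroup

    for w := 1; w <= 3; w++ {
        wg.Add(1)
        go func(worker int) {
            defer wg.Done()
            for m := range msgs {
                fmt.Printf("worker %d got message %d\n", worker, m)
            }
        }(w)
    }

    for i := 1; i <= 6; i++ {
        msgs <- i // each message goes to exactly one worker
    }
    close(msgs)
    wg.Wait()
}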