内容简介:Topic与Channel是NSQ中重要的两个概念。生产者将消息写到Topic中,一个Topic下可以有多个Channel,每个Channel都是Topic的完整副本。消费者从Channel处订阅消息,如果有多个消费者订阅同一个Channel,Channel中的消息将被传递到一个随机的消费者。
Topic与Channel
Topic与Channel是NSQ中重要的两个概念。
生产者将消息写到Topic中,一个Topic下可以有多个Channel,每个Channel都是Topic的完整副本。
消费者从Channel处订阅消息,如果有多个消费者订阅同一个Channel,Channel中的消息将被传递到一个随机的消费者。
代码只抽取部分关键内容进行解析,省略一些非核心代码。
Topic相关函数
type Topic struct { //一个Topic可以拥有多个Channel channelMap map[string]*Channel } // Topic constructor func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic { t := &Topic{ name: topicName, channelMap: make(map[string]*Channel), memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), } //开启一个goroutine负责监听写到该topic的Msg t.waitGroup.Wrap(t.messagePump) return t } // messagePump selects over the in-memory and backend queue and // writes messages to every channel for this topic func (t *Topic) messagePump() { var msg *Message var buf []byte var err error var chans []*Channel var memoryMsgChan chan *Message var backendChan chan []byte t.RLock() //取出Topic的所有Channel for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) > 0 && !t.IsPaused() { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } // main message loop for { select { //从memoryMsgChan读取一个Msg case msg = <-memoryMsgChan: case <-t.exitChan: goto exit } //对每个Channel写入Msg for i, channel := range chans { chanMsg := msg // copy the message because each channel // needs a unique instance but... // fastpath to avoid copy if its the first channel // (the topic already created the first copy) if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } if chanMsg.deferred != 0 { channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } //把Msg传入channel的PutMessage err := channel.PutMessage(chanMsg) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err) } } } exit: t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name) }
总结上面部分,nsq很好的利用golang的特性-goroutine之间通过chan来进行通信,如果要对一个Topic写入Msg,只要往memoryMsgChan写入Msg。
每个Topic在NewTopic()创建之初都会开启一个goroutine负责监听该Topic的memoryMsgChan,一旦有消息就有复制N份Msg写入下面的Channel。
这里可以看到,通过select的方式来读取消息,NSQ的消息是无序的。
消息的产生
消息是由client主动进行pub指令进行发布的,我们看到主要的IOLoop()函数里,Exec会包含所有指令对应的函数。pub是生产者产生消息的指令,sub是消费者订阅消息的指令。
func (p *protocolV2) IOLoop(conn net.Conn) error { // ... response, err = p.Exec(client, params) // ... } func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { // ... case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) // ... } func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { // ... topic := p.ctx.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) // ... } func (t *Topic) PutMessage(m *Message) error { // ... err := t.put(m) // ... return nil } func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err) return err } } return nil }
PUB函数中,client会将Msg放入对应Topic的memoryMsgChan中,如果被阻塞,将会写入Backend中,Backend是磁盘存储,然后就是上一节所说的每个Topic一直有一个goroutine将Msg复制分发到所有的Channel中,Channel中的Msg等待client消费者去获取。
消息的分发
消费者发送sub指令后,订阅指定topic下的指定channel,获得SubEventChan。每个client连接后,IOLoop还会开启一个messagePump去把channel中的Msg发送给client。
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { var channel *Channel for { topic := p.ctx.nsqd.GetTopic(topicName) // 获取指定Topic下的指定channel,如果没有则创建 channel = topic.GetChannel(channelName) channel.AddClient(client.ID, client) if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) { channel.RemoveClient(client.ID) time.Sleep(1 * time.Millisecond) continue } break } atomic.StoreInt32(&client.State, stateSubscribed) client.Channel = channel // 把这个channel传给消费者的SubEventChan,该值会在IOLoop的第二个goroutine的messagePump中读取 client.SubEventChan <- channel } func (p *protocolV2) IOLoop(conn net.Conn) error { clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx) p.ctx.nsqd.AddClient(client.ID, client) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize // goroutine local state derived from client attributes // and avoid a potential race with IDENTIFY (where a client // could have changed or disabled said attributes) // 相当于标识,下面会阻塞该channel来保证goroutine的初始化的完成 messagePumpStartedChan := make(chan bool) // 如果client订阅了topic,messagePump会将Msg最终分发到client go p.messagePump(client, messagePumpStartedChan) <-messagePumpStartedChan } func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { var err error var memoryMsgChan chan *Message var backendMsgChan chan []byte var subChannel *Channel subEventChan := client.SubEventChan identifyEventChan := client.IdentifyEventChan outputBufferTicker := time.NewTicker(client.OutputBufferTimeout) heartbeatTicker := time.NewTicker(client.HeartbeatInterval) heartbeatChan := heartbeatTicker.C msgTimeout := client.MsgTimeout // signal to the goroutine that started the messagePump // that we've started up close(startedChan) for { if subChannel == nil || !client.IsReadyForMessages() { // ... } else { // we're buffered (if there isn't any more data we should flush)... // select on the flusher ticker channel, too memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = outputBufferTicker.C } select { case <-client.ReadyStateChan: // sub订阅之后可以获取subChannel case subChannel = <-subEventChan: // 每个client只能订阅一个channel subEventChan = nil case <-heartbeatChan: err = p.Send(client, frameTypeResponse, heartbeatBytes) if err != nil { goto exit } case msg := <-memoryMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ // inflight 队列用来实现“至少投递一次消息” subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case <-client.ExitChan: goto exit } } }
整理整个流程,Client连接的是Channel,Topic在接收到消息后会分发到左右的Channel,如果多个Client连接同一个Channel,那么从实现上来看,每个消息在由Channel分发到Client的时候实现了负载均衡。每个消息在多个Client中,只会有一个接收到。这么回头看上一篇的消息传递图,就很明了了。
image.png
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 消费端如何保证消息队列MQ的有序消费
- RocketMQ & Kafka 消息消费与消息重试
- 日常 Bug 排查:消息不消费
- RabbitMQ拉模式批量消费消息
- 消息队列的消费语义和投递语义
- RocketMQ 主从如何同步消息消费进度?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。