内容简介:nsq是一个实时分布式的消息队列平台。核心部分是一个叫nsqd的模块,它负责接收和转发消息。同时在go-nsq的包中,提供了consumer和producer的核心接口。在读nsq源码的时候,很好奇它的数据是怎么从producer给到了consumer的,于是从源码的层面梳理了一下代码的实现细节。这部分先记录一下producer和consumer的代码细节,方便后续再查看相关代码。后面准备把nsqd和nsqdlookup相关的东西记录一下,包括数据分发、数据缓存、服务发现等实现细节。go-nsq里的prod
nsq是一个实时分布式的消息队列平台。
核心部分是一个叫nsqd的模块,它负责接收和转发消息。同时在go-nsq的包中,提供了consumer和producer的核心接口。在读nsq源码的时候,很好奇它的数据是怎么从producer给到了consumer的,于是从源码的层面梳理了一下代码的实现细节。这部分先记录一下producer和consumer的代码细节,方便后续再查看相关代码。后面准备把nsqd和nsqdlookup相关的东西记录一下,包括数据分发、数据缓存、服务发现等实现细节。
go-nsq里的producer和consumer实现的功能就是一句话,提供消息接受和分发的接口。但是它内部的实现确很有意思。
nsq demo
学习源码还是要先从demo起步,首先装好nsq,可以git下来源码编译或者执行下载二进制文件。先写个producer.go,如下
package main import ( "github.com/nsqio/go-nsq" "log" ) func main() { cfg := nsq.NewConfig() r := []byte("hello consumer") addr := "127.0.0.1:4150" p, err := nsq.NewProducer(addr, cfg) if err != nil { log.Print(err) } err = p.Publish("serving123", r) if err != nil { log.Println(err) } } 复制代码
例子里,我创建了一个producer,然后向serving123的topic里发送消息。
下面是consumer的代码,consumer.go
import ( "fmt" "github.com/nsqio/go-nsq" "math/rand" "os" "time" ) type SimpleHandler struct { } func (sh *SimpleHandler) HandleMessage(m *nsq.Message) error { _, err := os.Stdout.Write(m.Body) if err != nil { fmt.Println(err) } return nil } func main() { pause := make(chan bool) caddr := "127.0.0.1:4150" cfg := nsq.NewConfig() channel := fmt.Sprintf("tail%06d#ephemeral", rand.Int()%999999) c, _ := nsq.NewConsumer("serving123", channel, cfg) c.AddHandler(&SimpleHandler{}) c.ConnectToNSQD(caddr) <-pause } 复制代码
consumer,我定义了一个SimpleHandler,它实现了HandleMessage的接口,功能就是向标准输出打印消息。 main函数里,我新建了一个想要消费serving123这个topic的consumer,然后连接上nsqd服务。
运行
直接执行nsqd启一个服务。先执行consumer,然后执行producer,可以看到consumer里打印出的消息。
producer源码解析
完成上面的demo之后,先来看一下producer是怎么玩的。
producer实例,会去调用 Publish
方法去发布消息,这个方法接收了topic和message body。
func (w *Producer) Publish(topic string, body []byte) error { return w.sendCommand(Publish(topic, body)) } 复制代码
内部的另外一个 Publish
方法执行结果作为参数传入 sendCommand
, Publish
方法,构造了一个 Command
的三元消息体
func Publish(topic string, body []byte) *Command { var params = [][]byte{[]byte(topic)} return &Command{[]byte("PUB"), params, body} } 复制代码
sendCommand
方法里,初始化一个名叫doneChan的 ProducerTransaction
的指针,然后,cmd消息三元体,和doneChan一起传入 sendCommandAsync
方法里。最后监听doneChan的管道输出,然后返回error。
func (w *Producer) sendCommand(cmd *Command) error { doneChan := make(chan *ProducerTransaction) err := w.sendCommandAsync(cmd, doneChan, nil) if err != nil { close(doneChan) return err } t := <-doneChan return t.Error } 复制代码
ProducerTransaction
结构体很有意思,它持有一个自己相同类型的指针。目的是将最终的内容自己保存起来。在最终clearup的时候,它会调用一个 finish
方法去触发上面的chan监听返回,从而最终返回退出 sendCommand
。
type ProducerTransaction struct { cmd *Command doneChan chan *ProducerTransaction Error error // the error (or nil) of the publish command Args []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync } 复制代码
这个finish方法会把 ProducerTransaction
它自己本身进行传递,也就是在 t.doneChan <- t
这里
func (t *ProducerTransaction) finish() { if t.doneChan != nil { t.doneChan <- t } } 复制代码
看一下 sendCommandAsync
函数。首先,它会去调用原子操作 atomic.AddInt32
去记录并发producer数量。上面说的doneChan指针,作为一个新的 ProducerTransaction
的参数传入,最终这个新的 ProducerTransaction
的数据传给了 w.transactionChan
监听的channel。
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction, args []interface{}) error { // keep track of how many outstanding producers we're dealing with // in order to later ensure that we clean them all up... atomic.AddInt32(&w.concurrentProducers, 1) defer atomic.AddInt32(&w.concurrentProducers, -1) if atomic.LoadInt32(&w.state) != StateConnected { err := w.connect()//建立和nsqd的连接 if err != nil { return err } } t := &ProducerTransaction{ cmd: cmd, doneChan: doneChan, Args: args, } select { case w.transactionChan <- t: case <-w.exitChan: return ErrStopped } return nil } 复制代码
建立和nsqd的连接,以及发送数据都写在w.connect()。了解了上面channel异步操作,看下这个connect函数
func (w *Producer) connect() error { w.guard.Lock() defer w.guard.Unlock() if atomic.LoadInt32(&w.stopFlag) == 1 { return ErrStopped } switch state := atomic.LoadInt32(&w.state); state { case StateInit: case StateConnected: return nil default: return ErrNotConnected } w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr) logger, logLvl := w.getLogger() w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w}) w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id)) _, err := w.conn.Connect() if err != nil { w.conn.Close() w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err) return err } atomic.StoreInt32(&w.state, StateConnected) w.closeChan = make(chan int) w.wg.Add(1) go w.router() return nil } 复制代码
函数开始部分很好理解,就是进行一些状态验证。 _, err := w.conn.Connect()
这里会实际去建立和nsqd的连接,会在里面跑一个readLoop和一个writeloop去进行读写的相关操作,东西比较多就不再赘述。这里关心的是这个 w.router
方法。
router方法里面开了一个for循环来监听producer的channel,包括transactionChan、responseChan、errorChan、closeChan、exitChan,如果是w.transactionChan,则把这个transaction塞进producer的transactions数组里, 然后向conn里去写消息即向nsqd发送数据 。如果是收到了返回信号或者是错误信号,则会弹出一个transaction。如果收到关闭或者是退出信号,则到exit里面,清理所有transaction,并退出。
func (w *Producer) router() { for { select { case t := <-w.transactionChan: w.transactions = append(w.transactions, t) err := w.conn.WriteCommand(t.cmd) if err != nil { w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err) w.close() } case data := <-w.responseChan: w.popTransaction(FrameTypeResponse, data) case data := <-w.errorChan: w.popTransaction(FrameTypeError, data) case <-w.closeChan: goto exit case <-w.exitChan: goto exit } } exit: w.transactionCleanup() w.wg.Done() w.log(LogLevelInfo, "exiting router") } 复制代码
popTransaction
方法,会把transactions第一个元素弹出,保存剩下的元素。然后回去调用这个弹出的 ProducerTransaction
的 finish()
方法,也就是上面说的finish()方法。上面的 sendCommand
方法会受到通知退出,这样就完成了消息的发布过程。
func (w *Producer) popTransaction(frameType int32, data []byte) { t := w.transactions[0] w.transactions = w.transactions[1:] if frameType == FrameTypeError { t.Error = ErrProtocol{string(data)} } t.finish() } 复制代码
总结
限于篇幅,consumer相关准备起另外一篇去写,consumer和producer在和nsqd通信的地方会复用一些代码。
关于nsq的producer,我们能学习到的是它channel的使用方法,以及进行数据传输的时候,是如何运载数据和如何通知和监听channel。核心的部分在于 ProducerTransaction
这个结构体,它负责了消息的运载。在上面提到的readLoop和writeloop,里面有许多代理的方法也值得关注和学习,具体内容可以自己看一眼。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【源码阅读】AndPermission源码阅读
- ReactNative源码解析-初识源码
- 【源码阅读】Gson源码阅读
- Spring源码系列:BeanDefinition源码解析
- istio 源码 – Citadel 源码分析 (原创)
- istio 源码 – pilot 源码分析(原创)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。