内容简介:nsqd为nsq daemon的简写,是nsq组件最主要的服务。nsqd提供一个tcp服务、一个http服务以及一个可选的https服务,tcp服务于客户端(生产者或消费者),http则提供API(可用于创建、删除topic与channel,生产数据,清空数据等)。nsqd的启动入口为apps/nsqd/nsqd.go文件里的main函数。
Nsqd源码阅读
简介
nsqd为nsq daemon的简写,是nsq组件最主要的服务。
nsqd提供一个tcp服务、一个http服务以及一个可选的https服务,tcp服务于客户端(生产者或消费者),http则提供API(可用于创建、删除topic与channel,生产数据,清空数据等)。
初始化
nsqd的启动入口为apps/nsqd/nsqd.go文件里的main函数。
首先定义了一个program的结构体,用于对程序的控制。结构体内元素为指向NSQD的指针。
main函数里面定义了一个具体的prg,然后Run它。
Run函数负责启动prg并阻塞,直至接收到对应的信号(对于nsqd为SIGINT或者SIGTERM信号)。
type program struct { nsqd *nsqd.NSQD } func main() { //定义一个具体的prg,并启动它 prg := &program{} if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { log.Fatal(err) } }
Run函数会调用program的Start方法,并调用Main()来启动nsqd服务。
func (p *program) Start() error { opts := nsqd.NewOptions() ... //配置读取过程,会修改opt nsqd := nsqd.New(opts) ... //Metadata的处理,以后再说 nsqd.Main() //启动nsqd服务 p.nsqd = nsqd return nil }
启动
nsqd首先启动一个Tcp服务、一个Http服务以及一个可选的Https服务,然后调用queueScanLoop函数来处理in-flight与defered数据。
func (n *NSQD) Main() { var httpListener net.Listener var httpsListener net.Listener ctx := &context{n} //连续启动Tcp、Https、Http服务 tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress) ... if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig) ... } httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) ... n.waitGroup.Wrap(func() { n.queueScanLoop() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) } }
客户端连接
客户端连接nsqd的tcp server以后,nsqd会启动一个IOLoop,IOLoop里面首先启动messagePump,然后启动循环处理后续请求。
messagePump负责将Channel里面的消息取出来,并push给客户端。
func (p *protocolV2) IOLoop(conn net.Conn) error { var err error var line []byte var zeroTime time.Time clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx) // messagePump初始化会用到client的一些参数,这里的messagePumpStartedChan保证了初始化完成以后才会接收新的请求,避免了IDENTIFY请求对client的参数可能进行的修改。 messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) <-messagePumpStartedChan for { ... //读取下一次请求 line, err = client.Reader.ReadSlice('\n') ... params := bytes.Split(line, separatorBytes) p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) //处理请求 response, err = p.Exec(client, params) ... if response != nil { //发送响应 err = p.Send(client, frameTypeResponse, response) ... } } ... }
数据生产
topic创建
调用http的"/topic/create"接口、"/pub"接口,tcp的SUB/PUB请求等都会触发topic的创建。
创建topic位于NSQD的GetTopic方法。
首先使用读锁判断topic是否存在,如果存在则直接返回;如果不存在,则加写锁,然后调用NewTopic函数创建新的topic。
func (n *NSQD) GetTopic(topicName string) *Topic { //读锁判断topic是否存在,如果存在则直接返回 // most likely, we already have this topic, so try read lock first. n.RLock() t, ok := n.topicMap[topicName] n.RUnlock() if ok { return t } n.Lock() //获取写锁后再次判断topic是否存在,如果存在则直接返回 t, ok = n.topicMap[topicName] if ok { n.Unlock() return t } deleteCallback := func(t *Topic) { n.DeleteExistingTopic(t.name) } //创建topic t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t n.logf(LOG_INFO, "TOPIC(%s): created", t.name) // release our global nsqd lock, and switch to a more granular topic lock while we init our // channels from lookupd. This blocks concurrent PutMessages to this topic. t.Lock() n.Unlock() // 使用lookup的相关处理,如果不使用可以先忽略 ... t.Unlock() // 触发messagePump更新channel状态 select { case t.channelUpdateChan <- 1: case <-t.exitChan: } return t }
生产消息
调用http的"/pub"接口,或者tcp的PUB操作,都可以将消息发送给nsqd,nsqd首先将消息存入topic中作为过渡。
两种处理过程分别在(s httpServer)的doPUB方法与(p protocolV2)的PUB方法,二者殊途同归,都会调用(t *Topic)的PutMessage方法,将消息写入topic中。
func (t *Topic) PutMessage(m *Message) error { ... succ, err := t.put(m) ... } func (t *Topic) put(m *Message) (succ bool, err error) { if t.putMode == PUTMODE_NORMAL{ select { //将消息写入到Topic的memoryMsgChan case t.memoryMsgChan <- m: default: t.put2Disk(m) } }else{ ... } }
数据消费
channel创建
消费者的“订阅”(SUB)请求会触发channel的创建,在tcp服务的SUB处理里面。
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { ... topicName := string(params[1]) ... channelName := string(params[2]) ... // 防止该topic或者channel是正处于退出状态 var channel *Channel for { //获取或者创建topic topic := p.ctx.nsqd.GetTopic(topicName) //获取或者创建channel channel = topic.GetChannel(channelName) channel.AddClient(client.ID, client) if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) { channel.RemoveClient(client.ID) time.Sleep(1 * time.Millisecond) continue } break } atomic.StoreInt32(&client.State, stateSubscribed) client.Channel = channel // 将channel告知client client.SubEventChan <- channel return okBytes, nil }
topic的消息复制到channel
topic创建时调用的NewTopic函数会启动messagePump函数,负责更新channel,并将topic中的消息复制到所有channel。
// messagePump selects over the in-memory and backend queue and // writes messages to every channel for this topic func (t *Topic) messagePump() { //获取所有channel t.RLock() for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) > 0 { memoryMsgChan = t.memoryMsgChan limitedMsgChan = t.limitedMsgChan backendChan = t.backend.ReadChan() } for { select { //接收消息 case msg = <-memoryMsgChan: if msg == nil { continue } ... //更新channel case <-t.channelUpdateChan: chans = chans[:0] t.RLock() for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { limitedMsgChan = t.limitedMsgChan memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue ... case <-t.exitChan: goto exit } if msg == nil { continue } //将消息发送到所有channel for i, channel := range chans { ... chanMsg := msg //为每一个channel单独复制一份数据 if i > 0 { chanMsg = NewMessage(msg.ID,msg.MsgType, msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } ... //将消息存储到channel //此处的PutMessage与Topic的同名方法类似,也是将消息写到channel的memoryMsgChan err := channel.PutMessage(chanMsg) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err) } } } exit: t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name) } //PutMessage会调用到c.put(msg) func (c *Channel) put(m *Message) (succ bool, err error) { succ = true if c.putMode == PUTMODE_NORMAL { select { //将数据发送到Channel的memoryMsgChan case c.memoryMsgChan <- m: default: ... } }else{ ... } }
数据push到消费者
回忆下前面介绍的两点:一是客户端连接时,会启动messagePump负责将Channel里面的消息取出来,并push给客户端;二是channel创建时,会将创建的channel告知client。
messagePump获取创建的这个channel,并从channel的memoryMsgChan接收消息,然后push给消费者。
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { ... //channel创建成功后,通过SubEventChan告知client subEventChan := client.SubEventChan for { if subChannel == nil || !client.IsReadyForMessages() { ... } else if flushed { ... } else { // 获取channel的memoryMsgChan memoryMsgChan = subChannel.memoryMsgChan ... } select { ... case subChannel = <-subEventChan: p.ctx.nsqd.logf(LOG_INFO, "get subEventChan:%+v", subChannel) // you can't SUB anymore subEventChan = nil ... //从memoryMsgChan里接收消息,并push给客户端 case msg := <-memoryMsgChan: if msg == nil{ continue } if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ //将消息放入in-flight队列 subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() p.ctx.nsqd.logf(LOG_INFO, "get memory msg:%+v", msg) //将消息push给消费者 err = p.SendMessage(client, msg, &buf) if err != nil { goto exit } flushed = false ... case <-client.ExitChan: goto exit } } exit: ... }
in-flight数据与deferred数据处理
回一下前面讲到两点:一是queueScanLoop函数会处理in-flight数据与deferred数据;二是消息push给消费者之前会调用StartInFlightTimeout将该消息放入in-flight队列。
queueScanLoop管理一个queueScanWorker pool(默认大小为4),各个worker并发处理channel数据。
in-flight数据的存储与清理
in-flight数据存储时会记录下该消息的到期时间,以便到期后将该消息重新push给消费者。
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error { now := time.Now() msg.clientID = clientID msg.deliveryTS = now //存储到期时间 msg.pri = now.Add(timeout).UnixNano() err := c.pushInFlightMessage(msg) if err != nil { return err } c.addToInFlightPQ(msg) return nil }
如果消费者成功接收,则会回应一个"FIN",nsqd收到"FIN"则将该消息从in-flight队列中清除。
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) { ... id, err := getMessageID(params[1]) if err != nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error()) } //将该消息从in-flight队列中清除 err = client.Channel.FinishMessage(client.ID, *id) if err != nil { return nil, protocol.NewClientErr(err, "E_FIN_FAILED", fmt.Sprintf("FIN %s failed %s", *id, err.Error())) } client.FinishedMessage() return nil, nil }
defered数据的存储与清理
如果消费者收到消息以后,如果一时间自己处理不过来,可以通过"REQ"将该消息重新入队,并可以设定多长时间后重新消费,时间为0的话则立即消费,否则延迟消费。
延迟消费的处理方式与in-flight数据类似,也是先写入到一个队列,并设定到期时间,等待重新读取。
下面介绍这两部分数据时如何重新消费的,主要是queueScanLoop的处理逻辑。
worker的创建与销毁
worker的创建与销毁是在resizePool函数。
worker的完美个数为channel总数的四分之一,但是不能大于QueueScanWorkerPoolMax。
1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
所有的worker都会监听同一个workCh、closeCh,如果worker过多,则只需要向closeCh写入一个“通知”,收到这个“通知”的worker就会被销毁。
一次for循环只创建或销毁一个worker,直至worker数目达到idealPoolSize。
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) { idealPoolSize := int(float64(num) * 0.25) if idealPoolSize < 1 { idealPoolSize = 1 } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { break } else if idealPoolSize < n.poolSize { // contract closeCh <- 1 n.poolSize-- } else { // expand n.waitGroup.Wrap(func() { n.queueScanWorker(workCh, responseCh, closeCh) }) n.poolSize++ } } }
channel的选择
queueScanLoop的处理方法模仿了 Redis 的概率到期算法(probabilistic expiration algorithm):每过一个QueueScanInterval(默认100ms)间隔,进行一次概率选择,从所有的channel缓存中随机选择QueueScanSelectionCount(默认20)个channel,如果某个被选中channel的任何一个queue有事可做,则认为该channel为“脏”channel。如果被选中channel中“脏”channel的比例大于QueueScanDirtyPercent(默认25%),则不投入睡眠,直接进行下一次概率选择。
channel缓存每QueueScanRefreshInterval(默认5s)刷新一次。
queueScanLoop与worker的交互
queueScanLoop与worker之间通过workCh与responseCh来进行交互。
- workCh:queueScanLoop随机选择一定数目的channel后,通过workCh告诉worker。
- responseCh:worker处理完成后,通过responseCh反馈该channel是否为“脏”。
func (n *NSQD) queueScanLoop() { workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount) responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount) closeCh := make(chan int) workTicker := time.NewTicker(n.getOpts().QueueScanInterval) refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) //根据channel数目,创建worker channels := n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: //更新channel缓存,并据此创建或者销毁worker channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } //workTicker到期,且channels长度不为0时,会走到这里。 num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) } loop: //随机选择num个channel,并传入workCh for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] } //等待这num个channel的处理结果(是否为“脏”channel) numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } //如果“脏”channel达到一定比例,直接进行下次处理 if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } exit: n.logf(LOG_INFO, "QUEUESCAN: closing") close(closeCh) workTicker.Stop() refreshTicker.Stop() }
worker处理
worker从queueScanLoop接收需要处理的channel,处理该channel的in-flight数据与deferred数据。processInFlightQueue与processDeferredQueue函数都会调用c.put(msg),将数据发送到Channel的memoryMsgChan,进而重新被push到消费者。
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) { for { select { case c := <-workCh: now := time.Now().UnixNano() dirty := false //处理in-flight消息 if c.processInFlightQueue(now) { dirty = true } //处理defered消息 if c.processDeferredQueue(now) { dirty = true } responseCh <- dirty case <-closeCh: return } } }
func (c *Channel) processInFlightQueue(t int64) bool { c.exitMutex.RLock() defer c.exitMutex.RUnlock() if c.Exiting() { return false } dirty := false for { c.inFlightMutex.Lock() //获取超时的消息 msg, _ := c.inFlightPQ.PeekAndShift(t) c.inFlightMutex.Unlock() if msg == nil { goto exit } dirty = true //判断该消息是否属于这个client _, err := c.popInFlightMessage(msg.clientID, msg.ID) if err != nil { goto exit } atomic.AddUint64(&c.timeoutCount, 1) c.RLock() client, ok := c.clients[msg.clientID] c.RUnlock() if ok { client.TimedOutMessage() } //将消息重新写入channel c.put(msg) } exit: return dirty }
processDeferredQueue的处理与此类似。
以上所述就是小编给大家介绍的《【源码阅读】Nsqd》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【源码阅读】AndPermission源码阅读
- 【源码阅读】Gson源码阅读
- 如何阅读Java源码 ,阅读java的真实体会
- 我的源码阅读之路:redux源码剖析
- JDK源码阅读(六):HashMap源码分析
- 如何阅读源码?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。