NSQ--nsqd初始化启动

栏目: JavaScript · 发布时间: 7年前

内容简介:启动的main函数在nsq/apps/nsqd/nsqd.go里.初始化处理启动处理

启动的main函数在nsq/apps/nsqd/nsqd.go里.

func main() {   
    prg := &program{}   
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {      
        log.Fatal(err)   
    }
}


使用了svc的库,程序安全退出、服务控制两个方面。
核心在于系统信号获取、Go Concurrency Patterns、以及基本的代码封装。
import "github.com/judwhite/go-svc/svc"
复制代码

初始化处理

func (p *program) Init(env svc.Environment) error 复制代码

启动处理

func (p *program) Start() error 复制代码

结束处理

func (p *program) Start() error复制代码

主要看下Start():

func (p *program) Start() error { 
    //调用nsqFlagset和Parse进行命令行参数集初始化  
    opts := nsqd.NewOptions()   
    flagSet := nsqdFlagSet(opts)   
    flagSet.Parse(os.Args[1:])   
    rand.Seed(time.Now().UTC().UnixNano())   
    if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {      
        fmt.Println(version.String("nsqd"))      
        os.Exit(0)   
    }   

    //判断config参数是否存在,若存在的话还需进行配置文件的读取
    var cfg config   
    configFile := flagSet.Lookup("config").Value.String()   
    if configFile != "" {      
        _, err := toml.DecodeFile(configFile, &cfg)      
        if err != nil {         
        log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())      
        }   
    }  
    cfg.Validate()  
    //配置文件检查通过后,创建默认配置opts,并于命令行参数和配置文件进行合并。
    options.Resolve(opts, flagSet, cfg) 
    //nsgd的代码在nsg/nsgd/nsgd.go的里面
    //创建一个nsgd  
    nsqd := nsqd.New(opts)   
    err := nsqd.LoadMetadata()   
    if err != nil {      
        log.Fatalf("ERROR: %s", err.Error())   
    }   
    err = nsqd.PersistMetadata()   
    if err != nil {      
        log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())   
    }   
    nsqd.Main()   
    p.nsqd = nsqd   
    return nil
}复制代码

LoadMetadata(),负责加载保存在文件中的Topic出来:

func (n *NSQD) LoadMetadata() error {   
    //使用atomic包中的方法来保证方法执行前和执行后isLoading值的改变
    atomic.StoreInt32(&n.isLoading, 1)   
    defer atomic.StoreInt32(&n.isLoading, 0)
    //元数据以json格式保存在nsqd可执行文件目录下的nsqd.%d.dat中。其中%d为代表该程序的ID,     //通过在启动时的命令行worker-id或者配置文件中的id指定。默认ID是通过对主机名散列后获得。     //因此保证了同一台机器每次启动的ID相同。   
    fn := newMetadataFile(n.getOpts())   
    data, err := readOrEmpty(fn)   
    if err != nil {      
        return err   
    }   
    if data == nil {      
        return nil // fresh start   
    }   
    var m meta   
    err = json.Unmarshal(data, &m)   
    if err != nil {      
        return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)   
    }   
    for _, t := range m.Topics {      
        //检查topic名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$)      
        //,若不合法则忽略      
        if !protocol.IsValidTopicName(t.Name) {         
            n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)         
            continue      
        }      
        //使用GetTopic函数通过名字获得topic对象      
        topic := n.GetTopic(t.Name)      
        //判断当前topic对象是否处于暂停状态,是的话调用Pause函数暂停topic      
        if t.Paused {         
            topic.Pause()      
        }     
         //获取当前topic下所有的channel,并且遍历channel,执行的操作与topic基本一致      
        for _, c := range t.Channels {         
            //检查channel名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$)         
            //,若不合法则忽略         
            if !protocol.IsValidChannelName(c.Name) {            
                n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)            
                continue         
            }         
            //使用GetChannel函数通过名字获得channel对象         
            channel := topic.GetChannel(c.Name)         
            //判断当前channel对象是否处于暂停状态,是的话调用Pause函数暂停channel         
            if c.Paused {            
                channel.Pause()         
            }      
        }      
        topic.Start()   
    }   
    return nil
}复制代码

PersistMetadata()将当前的topic和channel信息写入nsqd.%d.dat文件中:

func (n *NSQD) PersistMetadata() error {   
    // persist metadata about what topics/channels we have, across restarts   
    fileName := newMetadataFile(n.getOpts())   
    n.logf(LOG_INFO, "NSQ: persisting topic/channel metadata to %s", fileName)   
    js := make(map[string]interface{})   
    topics := []interface{}{}   
    //将topicMap转成map[string]interface{}的map,转成Json   
    for _, topic := range n.topicMap {      
        if topic.ephemeral {         
        continue      
        }      
        topicData := make(map[string]interface{})      
        topicData["name"] = topic.name      
        topicData["paused"] = topic.IsPaused()      
        channels := []interface{}{}      
        topic.Lock()      
        for _, channel := range topic.channelMap {         
           channel.Lock()         
            if channel.ephemeral {            
                channel.Unlock()            
                continue         
            }         
            channelData := make(map[string]interface{})         
            channelData["name"] = channel.name         
            channelData["paused"] = channel.IsPaused()         
            channels = append(channels, channelData)         
            channel.Unlock()      
        }      
        topic.Unlock()      
        topicData["channels"] = channels      
        topics = append(topics, topicData)   
    }   
    js["version"] = version.Binary   
    js["topics"] = topics   
    data, err := json.Marshal(&js)   
    if err != nil {      
        return err   
    }   
    //写入文件时先创建扩展名为tmp的临时文件   
    tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())   
    //将topic和channel列表json序列化后写回文件中   
    err = writeSyncFile(tmpFileName, data)   
    if err != nil {      
        return err   
    }   
    // 写入内容后并保存后再调用atomicRename函数将tmp文件重命名为nsqd.%d.dat。   
    err = os.Rename(tmpFileName, fileName)   
    if err != nil {      
        return err   
    }   
    // technically should fsync DataPath here   
    return nil
}复制代码

参考至https://www.cnblogs.com/zhangboyu/p/7457061.html


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Think Python

Think Python

Allen B. Downey / O'Reilly Media / 2012-8-23 / GBP 29.99

Think Python is an introduction to Python programming for students with no programming experience. It starts with the most basic concepts of programming, and is carefully designed to define all terms ......一起来看看 《Think Python》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具