NSQ--nsqd初始化启动
栏目: JavaScript · 发布时间: 7年前
内容简介:启动的main函数在nsq/apps/nsqd/nsqd.go里.初始化处理启动处理
启动的main函数在nsq/apps/nsqd/nsqd.go里.
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
使用了svc的库,程序安全退出、服务控制两个方面。
核心在于系统信号获取、Go Concurrency Patterns、以及基本的代码封装。
import "github.com/judwhite/go-svc/svc"
复制代码
初始化处理
func (p *program) Init(env svc.Environment) error 复制代码
启动处理
func (p *program) Start() error 复制代码
结束处理
func (p *program) Start() error复制代码
主要看下Start():
func (p *program) Start() error {
//调用nsqFlagset和Parse进行命令行参数集初始化
opts := nsqd.NewOptions()
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
rand.Seed(time.Now().UTC().UnixNano())
if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqd"))
os.Exit(0)
}
//判断config参数是否存在,若存在的话还需进行配置文件的读取
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
}
}
cfg.Validate()
//配置文件检查通过后,创建默认配置opts,并于命令行参数和配置文件进行合并。
options.Resolve(opts, flagSet, cfg)
//nsgd的代码在nsg/nsgd/nsgd.go的里面
//创建一个nsgd
nsqd := nsqd.New(opts)
err := nsqd.LoadMetadata()
if err != nil {
log.Fatalf("ERROR: %s", err.Error())
}
err = nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
p.nsqd = nsqd
return nil
}复制代码
LoadMetadata(),负责加载保存在文件中的Topic出来:
func (n *NSQD) LoadMetadata() error {
//使用atomic包中的方法来保证方法执行前和执行后isLoading值的改变
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)
//元数据以json格式保存在nsqd可执行文件目录下的nsqd.%d.dat中。其中%d为代表该程序的ID, //通过在启动时的命令行worker-id或者配置文件中的id指定。默认ID是通过对主机名散列后获得。 //因此保证了同一台机器每次启动的ID相同。
fn := newMetadataFile(n.getOpts())
data, err := readOrEmpty(fn)
if err != nil {
return err
}
if data == nil {
return nil // fresh start
}
var m meta
err = json.Unmarshal(data, &m)
if err != nil {
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
}
for _, t := range m.Topics {
//检查topic名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$)
//,若不合法则忽略
if !protocol.IsValidTopicName(t.Name) {
n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
continue
}
//使用GetTopic函数通过名字获得topic对象
topic := n.GetTopic(t.Name)
//判断当前topic对象是否处于暂停状态,是的话调用Pause函数暂停topic
if t.Paused {
topic.Pause()
}
//获取当前topic下所有的channel,并且遍历channel,执行的操作与topic基本一致
for _, c := range t.Channels {
//检查channel名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$)
//,若不合法则忽略
if !protocol.IsValidChannelName(c.Name) {
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
continue
}
//使用GetChannel函数通过名字获得channel对象
channel := topic.GetChannel(c.Name)
//判断当前channel对象是否处于暂停状态,是的话调用Pause函数暂停channel
if c.Paused {
channel.Pause()
}
}
topic.Start()
}
return nil
}复制代码
PersistMetadata()将当前的topic和channel信息写入nsqd.%d.dat文件中:
func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())
n.logf(LOG_INFO, "NSQ: persisting topic/channel metadata to %s", fileName)
js := make(map[string]interface{})
topics := []interface{}{}
//将topicMap转成map[string]interface{}的map,转成Json
for _, topic := range n.topicMap {
if topic.ephemeral {
continue
}
topicData := make(map[string]interface{})
topicData["name"] = topic.name
topicData["paused"] = topic.IsPaused()
channels := []interface{}{}
topic.Lock()
for _, channel := range topic.channelMap {
channel.Lock()
if channel.ephemeral {
channel.Unlock()
continue
}
channelData := make(map[string]interface{})
channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused()
channels = append(channels, channelData)
channel.Unlock()
}
topic.Unlock()
topicData["channels"] = channels
topics = append(topics, topicData)
}
js["version"] = version.Binary
js["topics"] = topics
data, err := json.Marshal(&js)
if err != nil {
return err
}
//写入文件时先创建扩展名为tmp的临时文件
tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
//将topic和channel列表json序列化后写回文件中
err = writeSyncFile(tmpFileName, data)
if err != nil {
return err
}
// 写入内容后并保存后再调用atomicRename函数将tmp文件重命名为nsqd.%d.dat。
err = os.Rename(tmpFileName, fileName)
if err != nil {
return err
}
// technically should fsync DataPath here
return nil
}复制代码
参考至https://www.cnblogs.com/zhangboyu/p/7457061.html
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Android 应用启动性能:延迟初始化
- Composer 的 Autoload 源码实现——启动与初始化
- SpringBoot 启动分析(三) — Environment 的初始化流程
- Spring MVC源码(一) ----- 启动过程与组件初始化
- 如何用Spring Boot解决项目启动时初始化资源?
- Activiti6.0教程(2) - 初始化表, 部署流程, 启动流程, 创建的表介绍
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
MySQL技术内幕
姜承尧 / 机械工业出版社 / 2013-5 / 79.00元
《MySQL技术内幕:InnoDB存储引擎(第2版)》由国内资深MySQL专家亲自执笔,国内外多位数据库专家联袂推荐。作为国内唯一一本关于InnoDB的专著,《MySQL技术内幕:InnoDB存储引擎(第2版)》的第1版广受好评,第2版不仅针对最新的MySQL 5.6对相关内容进行了全面的补充,还根据广大读者的反馈意见对第1版中存在的不足进行了完善,《MySQL技术内幕:InnoDB存储引擎(第2......一起来看看 《MySQL技术内幕》 这本书的介绍吧!