NSQ--nsqd初始化启动
栏目: JavaScript · 发布时间: 7年前
内容简介:启动的main函数在nsq/apps/nsqd/nsqd.go里.初始化处理启动处理
启动的main函数在nsq/apps/nsqd/nsqd.go里.
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
使用了svc的库,程序安全退出、服务控制两个方面。
核心在于系统信号获取、Go Concurrency Patterns、以及基本的代码封装。
import "github.com/judwhite/go-svc/svc"
复制代码
初始化处理
func (p *program) Init(env svc.Environment) error 复制代码
启动处理
func (p *program) Start() error 复制代码
结束处理
func (p *program) Start() error复制代码
主要看下Start():
func (p *program) Start() error {
//调用nsqFlagset和Parse进行命令行参数集初始化
opts := nsqd.NewOptions()
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
rand.Seed(time.Now().UTC().UnixNano())
if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqd"))
os.Exit(0)
}
//判断config参数是否存在,若存在的话还需进行配置文件的读取
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
}
}
cfg.Validate()
//配置文件检查通过后,创建默认配置opts,并于命令行参数和配置文件进行合并。
options.Resolve(opts, flagSet, cfg)
//nsgd的代码在nsg/nsgd/nsgd.go的里面
//创建一个nsgd
nsqd := nsqd.New(opts)
err := nsqd.LoadMetadata()
if err != nil {
log.Fatalf("ERROR: %s", err.Error())
}
err = nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
p.nsqd = nsqd
return nil
}复制代码
LoadMetadata(),负责加载保存在文件中的Topic出来:
func (n *NSQD) LoadMetadata() error {
//使用atomic包中的方法来保证方法执行前和执行后isLoading值的改变
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)
//元数据以json格式保存在nsqd可执行文件目录下的nsqd.%d.dat中。其中%d为代表该程序的ID, //通过在启动时的命令行worker-id或者配置文件中的id指定。默认ID是通过对主机名散列后获得。 //因此保证了同一台机器每次启动的ID相同。
fn := newMetadataFile(n.getOpts())
data, err := readOrEmpty(fn)
if err != nil {
return err
}
if data == nil {
return nil // fresh start
}
var m meta
err = json.Unmarshal(data, &m)
if err != nil {
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
}
for _, t := range m.Topics {
//检查topic名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$)
//,若不合法则忽略
if !protocol.IsValidTopicName(t.Name) {
n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
continue
}
//使用GetTopic函数通过名字获得topic对象
topic := n.GetTopic(t.Name)
//判断当前topic对象是否处于暂停状态,是的话调用Pause函数暂停topic
if t.Paused {
topic.Pause()
}
//获取当前topic下所有的channel,并且遍历channel,执行的操作与topic基本一致
for _, c := range t.Channels {
//检查channel名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$)
//,若不合法则忽略
if !protocol.IsValidChannelName(c.Name) {
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
continue
}
//使用GetChannel函数通过名字获得channel对象
channel := topic.GetChannel(c.Name)
//判断当前channel对象是否处于暂停状态,是的话调用Pause函数暂停channel
if c.Paused {
channel.Pause()
}
}
topic.Start()
}
return nil
}复制代码
PersistMetadata()将当前的topic和channel信息写入nsqd.%d.dat文件中:
func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())
n.logf(LOG_INFO, "NSQ: persisting topic/channel metadata to %s", fileName)
js := make(map[string]interface{})
topics := []interface{}{}
//将topicMap转成map[string]interface{}的map,转成Json
for _, topic := range n.topicMap {
if topic.ephemeral {
continue
}
topicData := make(map[string]interface{})
topicData["name"] = topic.name
topicData["paused"] = topic.IsPaused()
channels := []interface{}{}
topic.Lock()
for _, channel := range topic.channelMap {
channel.Lock()
if channel.ephemeral {
channel.Unlock()
continue
}
channelData := make(map[string]interface{})
channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused()
channels = append(channels, channelData)
channel.Unlock()
}
topic.Unlock()
topicData["channels"] = channels
topics = append(topics, topicData)
}
js["version"] = version.Binary
js["topics"] = topics
data, err := json.Marshal(&js)
if err != nil {
return err
}
//写入文件时先创建扩展名为tmp的临时文件
tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
//将topic和channel列表json序列化后写回文件中
err = writeSyncFile(tmpFileName, data)
if err != nil {
return err
}
// 写入内容后并保存后再调用atomicRename函数将tmp文件重命名为nsqd.%d.dat。
err = os.Rename(tmpFileName, fileName)
if err != nil {
return err
}
// technically should fsync DataPath here
return nil
}复制代码
参考至https://www.cnblogs.com/zhangboyu/p/7457061.html
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Android 应用启动性能:延迟初始化
- Composer 的 Autoload 源码实现——启动与初始化
- SpringBoot 启动分析(三) — Environment 的初始化流程
- Spring MVC源码(一) ----- 启动过程与组件初始化
- 如何用Spring Boot解决项目启动时初始化资源?
- Activiti6.0教程(2) - 初始化表, 部署流程, 启动流程, 创建的表介绍
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Think Python
Allen B. Downey / O'Reilly Media / 2012-8-23 / GBP 29.99
Think Python is an introduction to Python programming for students with no programming experience. It starts with the most basic concepts of programming, and is carefully designed to define all terms ......一起来看看 《Think Python》 这本书的介绍吧!