内容简介:Nodata版本VERSION = "0.0.11"Nodata组件功能
Nodata版本
VERSION = "0.0.11"
Nodata组件功能
nodata用于检测监控数据的上报异常。nodata和实时报警judge模块协同工作,过程为: 配置了nodata的采集项超时未上报数据,nodata生成一条默认的模拟数据;用户配置相应的报警策略,收到mock数据就产生报警。采集项上报异常检测,作为judge模块的一个必要补充,能够使judge的实时报警功能更加可靠、完善。【官方描述】
Nodata组件逻辑图
系统流图
官方系统流图
模块结构
官方模块结构图
main入口分析
func main() { //命令参数解析 cfg := flag.String("c", "cfg.json", "configuration file") version := flag.Bool("v", false, "show version") versionGit := flag.Bool("vg", false, "show version") flag.Parse() //版本输出 if *version { fmt.Println(g.VERSION) os.Exit(0) } //gitcommit序列号输出 if *versionGit { fmt.Println(g.VERSION, g.COMMIT) os.Exit(0) } // 全局配置格式化 g.ParseConfig(*cfg) // 统计 g.StartProc() // 缓存Nodata配置 config.Start() // 【参考详细分析】 // 缓存Nodata配置主机的采集数据点 collector.Start() // 【参考详细分析】 // judge策略判断 judge.Start() // 【参考详细分析】 // http API服务 http.Start() // 【参考详细分析】 select {} }
config.Start() 从DB加载Nodata配置(dashboard配置nodata策略写入mysql)
# 加载nodata配置主函数 func Start() { if !g.Config().Config.Enabled { log.Println("config.Start warning, not enabled") return } service.InitDB() //初始化DB StartNdConfigCron() //加载nodata配置缓存至内存 log.Println("config.Start ok") } ## 初始化DB连接 func InitDB() { _, err := GetDbConn(dbBaseConnName) //"db.base"连接conn初始化并保存至内存Map if err != nil { log.Fatalln("config.InitDB error", err) return // never go here } log.Println("config.InitDB ok") } ### GetDbConn实现函数 ### makeDbConn函数实现 sql 客户端连接dbconn ### 内存map dbConnMap[connName]保存dbconn func GetDbConn(connName string) (c *sql.DB, e error) { dbLock.Lock() defer dbLock.Unlock() var err error var dbConn *sql.DB dbConn = dbConnMap[connName] if dbConn == nil { dbConn, err = makeDbConn() //创建sql客户端连接 if err != nil { closeDbConn(dbConn) return nil, err } dbConnMap[connName] = dbConn } err = dbConn.Ping() //dbconn检测,conn.Ping() if err != nil { closeDbConn(dbConn) //dbconn关闭,conn.Close() delete(dbConnMap, connName) return nil, err } return dbConn, err } func makeDbConn() (conn *sql.DB, err error) { conn, err = sql.Open("mysql", g.Config().Config.Dsn) if err != nil { return nil, err } conn.SetMaxIdleConns(int(g.Config().Config.MaxIdle)) err = conn.Ping() return conn, err } ## func StartNdConfigCron() { ndconfigCron.AddFuncCC(ndconfigCronSpec, func() { start := time.Now().Unix() cnt, _ := syncNdConfig() // end := time.Now().Unix() if g.Config().Debug { log.Printf("config cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start)) } // 统计 g.ConfigCronCnt.Incr() g.ConfigLastTs.SetCnt(end - start) g.ConfigLastCnt.SetCnt(int64(cnt)) }, 1) ndconfigCron.Start() // } #### 获取nodata配置、重新格式化及缓存配置全局公开Map(NdConfigMap) func syncNdConfig() (cnt int, errt error) { // 获取nodata配置函数调用 configs := service.GetMockCfgFromDB() // 重新格式化配置NodateConfig结构 nm := nmap.NewSafeMap() for _, ndc := range configs { endpoint := ndc.Endpoint metric := ndc.Metric tags := ndc.Tags if endpoint == "" { log.Printf("bad config: %+v\n", ndc) continue } pk := cutils.PK(endpoint, metric, tags) nm.Put(pk, ndc) } // 缓存map SetNdConfigMap(nm) return nm.Size(), nil //返回map长度 } ##### 底层获取nodata配置实现函数 func GetMockCfgFromDB() map[string]*cmodel.NodataConfig { ret := make(map[string]*cmodel.NodataConfig) dbConn, err := GetDbConn("nodata.mockcfg") //获取dbConn连接 if err != nil { log.Println("db.get_conn error, mockcfg", err) return ret } q := fmt.Sprintf("SELECT id,name,obj,obj_type,metric,tags,dstype,step,mock FROM mockcfg") rows, err := dbConn.Query(q) //执行mockcfg表查询语句 if err != nil { log.Println("db.query error, mockcfg", err) return ret } defer rows.Close() for rows.Next() { //迭代查询结果集 t := MockCfg{} tags := "" err := rows.Scan(&t.Id, &t.Name, &t.Obj, &t.ObjType, &t.Metric, &tags, &t.Type, &t.Step, &t.Mock) if err != nil { log.Println("db.scan error, mockcfg", err) continue } t.Tags = cutils.DictedTagstring(tags) //"tagskey=value"格式化map[Key]Value err = checkMockCfg(&t) //检测配置是否有效 if err != nil { log.Println("check mockcfg, error:", err) continue } endpoints := getEndpoint(t.ObjType, t.Obj) //获取endpoint列表(hosts slice),后面有objtype为"host/group/other"处理函数分析。 if len(endpoints) < 1 { continue } for _, ep := range endpoints { uuid := cutils.PK(ep, t.Metric, t.Tags) //UUID format 'endpoint/metric/k=v,k=v(tags)' ncfg := cmodel.NewNodataConfig(t.Id, t.Name, t.ObjType, ep, t.Metric, t.Tags, t.Type, t.Step, t.Mock) val, found := ret[uuid] if !found { //如果不存在则新建 ret[uuid] = ncfg continue } //如果存在,判断配置类型 if isSpuerNodataCfg(val, ncfg) { // val is spuer than ncfg, so drop ncfg log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, val.Name, ncfg.Name) } else { ret[uuid] = ncfg //如果原val配置类型为group,而新ncfg配置类型为host则覆盖原有配置 log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, ncfg.Name, val.Name) } } } return ret //返回map[UUID]*cmodel.NodataConfig } ###### 根据objType获取Hosts slice func getEndpoint(objType string, obj string) []string { switch objType { case "host": //类型Host与处理函数 return getEndpointFromHosts(obj) case "group": //类型group与处理函数 return getEndpointFromGroups(obj) case "other": //类型other与处理函数 return getEndpointFromOther(obj) default: return make([]string, 0) } } //类型Host与处理函数 func getEndpointFromHosts(hosts string) []string { ret := make([]string, 0) hlist := strings.Split(hosts, "\n") //分隔处理 for _, host := range hlist { nh := strings.TrimSpace(host) if nh != "" { ret = append(ret, nh) } } return ret } //类型group与处理函数 func getEndpointFromGroups(grps string) []string { grplist := strings.Split(grps, "\n") // get host map, avoid duplicating hosts := make(map[string]string) for _, grp := range grplist { ngrp := strings.TrimSpace(grp) if len(ngrp) < 1 { continue } hostmap := GetHostsFromGroup(grp) //根据Group名称获取主机MAP for hostname := range hostmap { if hostname != "" { hosts[hostname] = hostname } } } // get host slice ret := make([]string, 0) for key := range hosts { ret = append(ret, key) } return ret } //类型other与处理函数,同类型Host与处理函数 func getEndpointFromOther(other string) []string { return getEndpointFromHosts(other) }
collector.Start() 缓存Nodata配置主机的采集数据点
# 运行收集nodata数据主函数 func Start() { if !g.Config().Collector.Enabled { log.Println("collector.Start warning, not enabled") return } StartCollectorCron() //周期任务,收集nodata数据 log.Println("collector.Start ok") } ## 定时任务执行,收集函数调用与任务运行 func StartCollectorCron() { collectorCron.AddFuncCC("*/20 * * * * ?", func() { start := time.Now().Unix() cnt := collectDataOnce() //收集函数调用 end := time.Now().Unix() if g.Config().Debug { log.Printf("collect cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start)) } //统计 g.CollectorCronCnt.Incr() g.CollectorLastTs.SetCnt(end - start) g.CollectorLastCnt.SetCnt(int64(cnt)) g.CollectorCnt.IncrBy(int64(cnt)) }, 1) collectorCron.Start() } ### 收集功能实现函数 func collectDataOnce() int { keys := config.Keys() keysLen := len(keys) // 并发+同步控制 cfg := g.Config().Collector //collector全局配置 concurrent := int(cfg.Concurrent) //并发数 if concurrent < 1 || concurrent > 50 { concurrent = 10 } sema := tsema.NewSemaphore(concurrent) //创建并发同步 batch := int(cfg.Batch) //全局批量处理数 if batch < 100 || batch > 1000 { batch = 200 //batch不能太小, 否则channel将会很大 } //根据nodata配置数长度和批量处理数创建channel长度 batchCnt := (keysLen + batch - 1) / batch rch := make(chan int, batchCnt+1) i := 0 for i < keysLen { leftLen := keysLen - i fetchSize := batch // 每次处理batch个配置 if leftLen < fetchSize { fetchSize = leftLen } fetchKeys := keys[i : i+fetchSize] // 并发collect数据 sema.Acquire() //线程批量处理(fetchKeys, fetchSize) go func(keys []string, keySize int) { defer sema.Release() size, err := fetchItemsAndStore(keys, keySize)//批查获取函数调用 if err != nil { log.Printf("fetchItemAndStore fail, size:%v, error:%v", size, err) } if g.Config().Debug { log.Printf("fetchItemAndStore keys:%v, key_size:%v, ret_size:%v", keys, keySize, size) } rch <- size }(fetchKeys, fetchSize) i += fetchSize } collectCnt := 0 //计数 for i := 0; i < batchCnt; i++ { select { case cnt := <-rch: collectCnt += cnt } } return collectCnt } #### 获取数据实现函数 func fetchItemsAndStore(fetchKeys []string, fetchSize int) (size int, errt error) { if fetchSize < 1 { return } // form request args args := make([]*cmodel.GraphLastParam, 0) for _, key := range fetchKeys { ndcfg, found := config.GetNdConfig(key) //根据hostname返回nodata配置 if !found { continue } endpoint := ndcfg.Endpoint //endpoint主机对象 counter := cutils.Counter(ndcfg.Metric, ndcfg.Tags) // 格式metric/tags(k=v,k=v) arg := &cmodel.GraphLastParam{endpoint, counter} //请求参数 args = append(args, arg) } if len(args) < 1 { return } resp, err := queryLastPoints(args)//API调用查询endpoint最近一次采集数据。(POST请求API组件api调用,查看后面函数分析) if err != nil { return 0, err } // store items fts := time.Now().Unix() //存储Items时间float time,Judge逻辑用到 for _, glr := range resp { //log.Printf("collect:%v\n", glr) if glr == nil || glr.Value == nil { continue } AddItem(cutils.PK2(glr.Endpoint, glr.Counter), NewDataItem(glr.Value.Timestamp, float64(glr.Value.Value), "OK", fts)) //缓存收集到的监控数据(ItemMap)。Value.Timestamp数据项时间戳,Value.Value数据项值,"OK"数据项状态,fts数据项存储时间。 } return len(resp), nil } ##### config.GetNdConfig 根据hostname返回NodataConfig配置 func GetNdConfig(key string) (*cmodel.NodataConfig, bool) { rwlock.RLock() defer rwlock.RUnlock() val, found := NdConfigMap.Get(key)//map操作 if found && val != nil { return val.(*cmodel.NodataConfig), true } return &cmodel.NodataConfig{}, false } ##### API组件POST请求实现函数 func queryLastPoints(param []*cmodel.GraphLastParam) (resp []*cmodel.GraphLastResp, err error) { cfg := g.Config() uri := fmt.Sprintf("%s/api/v1/graph/lastpoint", cfg.PlusApi.Addr) //接口定义 var req *httplib.BeegoHttpRequest headers := map[string]string{"Content-type": "application/json"} req, err = requests.CurlPlus(uri, "POST", "nodata", cfg.PlusApi.Token, headers, map[string]string{}) //Curl请求头与方法 if err != nil { return } req.SetTimeout(time.Duration(cfg.PlusApi.ConnectTimeout)*time.Millisecond, time.Duration(cfg.PlusApi.RequestTimeout)*time.Millisecond) b, err := json.Marshal(param) if err != nil { return } req.Body(b) //请求体 err = req.ToJson(&resp) //请求执行与response json if err != nil { return } return resp, nil } ##### 缓存数据点 func AddItem(key string, val *DataItem) { listv, found := ItemMap.Get(key) if !found { ll := tlist.NewSafeListLimited(3) //每个采集指标,缓存最新的3个数据点 ll.PushFrontViolently(val) ItemMap.Put(key, ll) return } listv.(*tlist.SafeListLimited).PushFrontViolently(val) } func NewDataItem(ts int64, val float64, fstatus string, fts int64) *DataItem { return &DataItem{Ts: ts, Value: val, FStatus: fstatus, FTs: fts} } type GraphLastResp struct { Endpoint string `json:"endpoint"` Counter string `json:"counter"` Value *RRDData `json:"value"` }
judge.Start()
# Judge运行入口函数 func Start() { StartJudgeCron() //运行调用 log.Println("judge.Start ok") } # 定时任务Judge执行函数 func StartJudgeCron() { judgeCron.AddFuncCC(judgeCronSpec, func() { start := time.Now().Unix() judge() //执行judge函数调用 end := time.Now().Unix() if g.Config().Debug { log.Printf("judge cron, time %ds, start %s\n", end-start, ttime.FormatTs(start)) } // 统计 g.JudgeCronCnt.Incr() g.JudgeLastTs.SetCnt(end - start) // 触发Mock列表数据发送 sender.SendMockOnceAsync() }, 1) judgeCron.Start() //执行Cron } ## judge实现函数 func judge() { now := time.Now().Unix() keys := config.Keys() for _, key := range keys { ndcfg, found := config.GetNdConfig(key) //根据hostname返回nodata配置 if !found { //策略不存在,不处理 continue } step := ndcfg.Step mock := ndcfg.Mock item, found := collector.GetFirstItem(key) //根据hostname返回collector最近一次配置 if !found { //没有数据,未开始采集,不处理 continue } //3*step(or 180)超时时间 lastTs := now - getTimeout(step) if item.FStatus != "OK" || item.FTs < lastTs { //数据采集失败,不处理 continue } //采集到的数据为mock数据,则认为上报超时了 if fCompare(mock, item.Value) == 0 { //判断采集到的最后一次数据项值timestamp+step(上报周期) //如果小于当前时间则认为已经上报超时了,过了应该上报的周期 //如果大于当前时间则表示还在有效的上报周期内 if LastTs(key)+step <= now { TurnNodata(key, now) //设置nodata状态 genMock(genTs(now, step), key, ndcfg) //添加到Mock列表 } continue } //判断采集到的最后一次数据项值timestamp,如果小于 //3*step(or 180)超时时间则认为数据过期,则认为上报超时 if item.Ts < lastTs { if LastTs(key)+step <= now { TurnNodata(key, now) genMock(genTs(now, step), key, ndcfg) } continue } TurnOk(key, now) } } ### 返回最后一次采集数据的timestamp func LastTs(key string) int64 { statusLock.RLock() var ts int64 = 0 v, found := StatusMap.Get(key) if !found { statusLock.RUnlock() return ts } ts = v.(*NodataStatus).Ts statusLock.RUnlock() return ts } // NoData Data Item Struct type DataItem struct { Ts int64 //timestamp Value float64 FStatus string // OK|ERR FTs int64 //存储时间float } // Nodata Status Struct type NodataStatus struct { Key string Status string // OK|NODATA Cnt int Ts int64 } ### 设置为Nodata状态 func TurnNodata(key string, ts int64) { statusLock.Lock() v, found := StatusMap.Get(key) if !found { // create new status ns := NewNodataStatus(key, "NODATA", 1, ts) StatusMap.Put(key, ns) statusLock.Unlock() return } // update status ns := v.(*NodataStatus) ns.Status = "NODATA" ns.Cnt += 1 ns.Ts = ts statusLock.Unlock() return } ### 添加Item至Mock列表 func genMock(ts int64, key string, ndcfg *cmodel.NodataConfig) { sender.AddMock(key, ndcfg.Endpoint, ndcfg.Metric, cutils.SortedTags(ndcfg.Tags), ts, ndcfg.Type, ndcfg.Step, ndcfg.Mock) } func AddMock(key string, endpoint string, metric string, tags string, ts int64, dstype string, step int64, value interface{}) { item := &cmodel.JsonMetaData{metric, endpoint, ts, step, value, dstype, tags} MockMap.Put(key, item) //put into map }
sender.SendMockOnceAsync() 发送模拟数据
# 发送Mock数据入口函数 func SendMockOnceAsync() { go SendMockOnce() } ## 并发发送和统计 func SendMockOnce() int { if !sema.TryAcquire() { return -1 } defer sema.Release() // not enabled if !g.Config().Sender.Enabled { return 0 } start := time.Now().Unix() cnt, _ := sendMock() //调用发送功能模块 end := time.Now().Unix() if g.Config().Debug { log.Printf("sender cron, cnt %d, time %ds, start %s", cnt, end-start, ttime.FormatTs(start)) } // 统计 g.SenderCronCnt.Incr() g.SenderLastTs.SetCnt(end - start) g.SenderCnt.IncrBy(int64(cnt)) return cnt } ### 发送模拟数据 func sendMock() (cnt int, errt error) { cfg := g.Config().Sender //全局配置加载 batch := int(cfg.Batch) //批量值 connTimeout := cfg.ConnectTimeout //连接超时 requTimeout := cfg.RequestTimeout //API请求超时 // 发送至transfer组件 mocks := MockMap.Slice() //获取Mock列表 MockMap.Clear() //清空列表空间 mockSize := len(mocks) i := 0 for i < mockSize { leftLen := mockSize - i sendSize := batch if leftLen < sendSize { sendSize = leftLen } fetchMocks := mocks[i : i+sendSize] //取一批量 i += sendSize items := make([]*cmodel.JsonMetaData, 0) //整理为slice for _, val := range fetchMocks { if val == nil { continue } items = append(items, val.(*cmodel.JsonMetaData)) } cntonce, err := sendItemsToTransfer(items, len(items), "nodata.mock", time.Millisecond*time.Duration(connTimeout), time.Millisecond*time.Duration(requTimeout)) //API POT调用,发送Mock至Transfer if err == nil { if g.Config().Debug { log.Println("send items:", items) } cnt += cntonce } } return cnt, nil } // API接口调用发送Mock至Transfer func sendItemsToTransfer(items []*cmodel.JsonMetaData, size int, httpcliname string, connT, reqT time.Duration) (cnt int, errt error) { if size < 1 { return } cfg := g.Config() transUlr := fmt.Sprintf("http://%s/api/push", cfg.Sender.TransferAddr) //请求接口API hcli := thttpclient.GetHttpClient(httpcliname, connT, reqT) // 请求体 itemsBody, err := json.Marshal(items) if err != nil { log.Println(transUlr+", format body error,", err) errt = err return } // 构造与执行API req, err := http.NewRequest("POST", transUlr, bytes.NewBuffer(itemsBody)) req.Header.Set("Content-Type", "application/json; charset=UTF-8") //请求内容类型 req.Header.Set("Connection", "close") postResp, err := hcli.Do(req) //执行POST if err != nil { log.Println(transUlr+", post to dest error,", err) errt = err return } defer postResp.Body.Close() //响应状态200判断 if postResp.StatusCode/100 != 2 { log.Println(transUlr+", post to dest, bad response,", postResp.Body) errt = fmt.Errorf("request failed, %s", postResp.Body) return } return size, nil }
http.Start() http API服务
func Start() { go startHttpServer() } func configRoutes() { configCommonRoutes() //公共API路由,可参考Agent模块 configProcHttpRoutes() // configDebugHttpRoutes() //Debug API路由 } func startHttpServer() { if !g.Config().Http.Enabled { return } addr := g.Config().Http.Listen if addr == "" { return } configRoutes() //配置路由 s := &http.Server{ //httpServer实例 Addr: addr, MaxHeaderBytes: 1 << 30, } log.Println("http.startHttpServer ok, listening", addr) log.Fatalln(s.ListenAndServe()) //监听与服务 } ## Proc统计API模块 func configProcHttpRoutes() { // counters http.HandleFunc("/proc/counters", func(w http.ResponseWriter, r *http.Request) { }) http.HandleFunc("/statistics/all", func(w http.ResponseWriter, r *http.Request) { }) // judge.status, /proc/status/$endpoint/$metric/$tags-pairs http.HandleFunc("/proc/status/", func(w http.ResponseWriter, r *http.Request) { }) // collector.last.item, /proc/collect/$endpoint/$metric/$tags-pairs http.HandleFunc("/proc/collect/", func(w http.ResponseWriter, r *http.Request) { }) // config.mockcfg http.HandleFunc("/proc/config", func(w http.ResponseWriter, r *http.Request) { }) // config.mockcfg /proc/config/$endpoint/$metric/$tags-pairs http.HandleFunc("/proc/config/", func(w http.ResponseWriter, r *http.Request) { }) // config.hostgroup, /group/$grpname http.HandleFunc("/proc/group/", func(w http.ResponseWriter, r *http.Request) { urlParam := r.URL.Path[len("/proc/group/"):] RenderDataJson(w, service.GetHostsFromGroup(urlParam)) }) } ## Debug API func configDebugHttpRoutes() { http.HandleFunc("/debug/collector/collect", func(w http.ResponseWriter, r *http.Request) { }) http.HandleFunc("/debug/config/sync", func(w http.ResponseWriter, r *http.Request) { }) http.HandleFunc("/debug/sender/send", func(w http.ResponseWriter, r *http.Request) { }) }
思考与查证
- nodata config、collector 、judge各模块执行周期是多少?
- Judge主要判断Nodata主机的两点要素是?
扩展学习
- github.com/toolkits/cron 定时任务golang Cron库
- github.com/go-sql-driver/mysql Mysql连接客户端
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- React Hooks 源码解析(一):类组件、函数组件、纯组件
- 权限组件的源码分析
- 【React源码解读】- 组件的实现
- 深入剖析Vue源码 - 组件基础
- 深入剖析Vue源码 - 组件进阶
- 生命周期组件 Lifecycle 源码解析(一)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
SSA:用户搜索心理与行为分析
[美] 罗森菲尔德(Louis Rosenfeld) / 汤海、蔡复青 / 清华大学出版社 / 2014-4-1 / 59.00
何为站内搜索分析(SSA)?它如何帮助你挖掘用户搜索曰志,从中洞悉用户搜索心理和行为,从而有针对性地改善用户体验,提升网站价值?这些都可以从《SSA:用户搜索心理与行为分析》中找到答案。《SSA:用户搜索心理与行为分析》首先通过故事来说明SSA是如何使Vanguard集团起死回生的,简要介绍SSA并指导读者动手实践。其次,通过丰富的实例来介绍很多工具和方法,帮助读者着手分析用户查询数据,从中获得更......一起来看看 《SSA:用户搜索心理与行为分析》 这本书的介绍吧!