内容简介:Nodata 版本(VERSION = "0.0.11");Nodata 组件功能
Nodata版本
VERSION = "0.0.11"
Nodata组件功能
nodata用于检测监控数据的上报异常。nodata和实时报警judge模块协同工作,过程为: 配置了nodata的采集项超时未上报数据,nodata生成一条默认的模拟数据;用户配置相应的报警策略,收到mock数据就产生报警。采集项上报异常检测,作为judge模块的一个必要补充,能够使judge的实时报警功能更加可靠、完善。【官方描述】
Nodata组件逻辑图
系统流图
官方系统流图
模块结构
官方模块结构图
main入口分析
func main() { //命令参数解析 cfg := flag.String("c", "cfg.json", "configuration file") version := flag.Bool("v", false, "show version") versionGit := flag.Bool("vg", false, "show version") flag.Parse() //版本输出 if *version { fmt.Println(g.VERSION) os.Exit(0) } //gitcommit序列号输出 if *versionGit { fmt.Println(g.VERSION, g.COMMIT) os.Exit(0) } // 全局配置格式化 g.ParseConfig(*cfg) // 统计 g.StartProc() // 缓存Nodata配置 config.Start() // 【参考详细分析】 // 缓存Nodata配置主机的采集数据点 collector.Start() // 【参考详细分析】 // judge策略判断 judge.Start() // 【参考详细分析】 // http API服务 http.Start() // 【参考详细分析】 select {} }
config.Start() 从DB加载Nodata配置(dashboard配置nodata策略写入mysql)
# 加载nodata配置主函数 func Start() { if !g.Config().Config.Enabled { log.Println("config.Start warning, not enabled") return } service.InitDB() //初始化DB StartNdConfigCron() //加载nodata配置缓存至内存 log.Println("config.Start ok") } ## 初始化DB连接 func InitDB() { _, err := GetDbConn(dbBaseConnName) //"db.base"连接conn初始化并保存至内存Map if err != nil { log.Fatalln("config.InitDB error", err) return // never go here } log.Println("config.InitDB ok") } ### GetDbConn实现函数 ### makeDbConn函数实现 sql 客户端连接dbconn ### 内存map dbConnMap[connName]保存dbconn func GetDbConn(connName string) (c *sql.DB, e error) { dbLock.Lock() defer dbLock.Unlock() var err error var dbConn *sql.DB dbConn = dbConnMap[connName] if dbConn == nil { dbConn, err = makeDbConn() //创建sql客户端连接 if err != nil { closeDbConn(dbConn) return nil, err } dbConnMap[connName] = dbConn } err = dbConn.Ping() //dbconn检测,conn.Ping() if err != nil { closeDbConn(dbConn) //dbconn关闭,conn.Close() delete(dbConnMap, connName) return nil, err } return dbConn, err } func makeDbConn() (conn *sql.DB, err error) { conn, err = sql.Open("mysql", g.Config().Config.Dsn) if err != nil { return nil, err } conn.SetMaxIdleConns(int(g.Config().Config.MaxIdle)) err = conn.Ping() return conn, err } ## func StartNdConfigCron() { ndconfigCron.AddFuncCC(ndconfigCronSpec, func() { start := time.Now().Unix() cnt, _ := syncNdConfig() // end := time.Now().Unix() if g.Config().Debug { log.Printf("config cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start)) } // 统计 g.ConfigCronCnt.Incr() g.ConfigLastTs.SetCnt(end - start) g.ConfigLastCnt.SetCnt(int64(cnt)) }, 1) ndconfigCron.Start() // } #### 获取nodata配置、重新格式化及缓存配置全局公开Map(NdConfigMap) func syncNdConfig() (cnt int, errt error) { // 获取nodata配置函数调用 configs := service.GetMockCfgFromDB() // 重新格式化配置NodateConfig结构 nm := nmap.NewSafeMap() for _, ndc := range configs { endpoint := ndc.Endpoint metric := ndc.Metric tags := ndc.Tags if endpoint == "" { log.Printf("bad config: %+v\n", ndc) continue } pk := cutils.PK(endpoint, 
metric, tags) nm.Put(pk, ndc) } // 缓存map SetNdConfigMap(nm) return nm.Size(), nil //返回map长度 } ##### 底层获取nodata配置实现函数 func GetMockCfgFromDB() map[string]*cmodel.NodataConfig { ret := make(map[string]*cmodel.NodataConfig) dbConn, err := GetDbConn("nodata.mockcfg") //获取dbConn连接 if err != nil { log.Println("db.get_conn error, mockcfg", err) return ret } q := fmt.Sprintf("SELECT id,name,obj,obj_type,metric,tags,dstype,step,mock FROM mockcfg") rows, err := dbConn.Query(q) //执行mockcfg表查询语句 if err != nil { log.Println("db.query error, mockcfg", err) return ret } defer rows.Close() for rows.Next() { //迭代查询结果集 t := MockCfg{} tags := "" err := rows.Scan(&t.Id, &t.Name, &t.Obj, &t.ObjType, &t.Metric, &tags, &t.Type, &t.Step, &t.Mock) if err != nil { log.Println("db.scan error, mockcfg", err) continue } t.Tags = cutils.DictedTagstring(tags) //"tagskey=value"格式化map[Key]Value err = checkMockCfg(&t) //检测配置是否有效 if err != nil { log.Println("check mockcfg, error:", err) continue } endpoints := getEndpoint(t.ObjType, t.Obj) //获取endpoint列表(hosts slice),后面有objtype为"host/group/other"处理函数分析。 if len(endpoints) < 1 { continue } for _, ep := range endpoints { uuid := cutils.PK(ep, t.Metric, t.Tags) //UUID format 'endpoint/metric/k=v,k=v(tags)' ncfg := cmodel.NewNodataConfig(t.Id, t.Name, t.ObjType, ep, t.Metric, t.Tags, t.Type, t.Step, t.Mock) val, found := ret[uuid] if !found { //如果不存在则新建 ret[uuid] = ncfg continue } //如果存在,判断配置类型 if isSpuerNodataCfg(val, ncfg) { // val is spuer than ncfg, so drop ncfg log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, val.Name, ncfg.Name) } else { ret[uuid] = ncfg //如果原val配置类型为group,而新ncfg配置类型为host则覆盖原有配置 log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, ncfg.Name, val.Name) } } } return ret //返回map[UUID]*cmodel.NodataConfig } ###### 根据objType获取Hosts slice func getEndpoint(objType string, obj string) []string { switch objType { case "host": //类型Host与处理函数 return getEndpointFromHosts(obj) case "group": //类型group与处理函数 return 
getEndpointFromGroups(obj) case "other": //类型other与处理函数 return getEndpointFromOther(obj) default: return make([]string, 0) } } //类型Host与处理函数 func getEndpointFromHosts(hosts string) []string { ret := make([]string, 0) hlist := strings.Split(hosts, "\n") //分隔处理 for _, host := range hlist { nh := strings.TrimSpace(host) if nh != "" { ret = append(ret, nh) } } return ret } //类型group与处理函数 func getEndpointFromGroups(grps string) []string { grplist := strings.Split(grps, "\n") // get host map, avoid duplicating hosts := make(map[string]string) for _, grp := range grplist { ngrp := strings.TrimSpace(grp) if len(ngrp) < 1 { continue } hostmap := GetHostsFromGroup(grp) //根据Group名称获取主机MAP for hostname := range hostmap { if hostname != "" { hosts[hostname] = hostname } } } // get host slice ret := make([]string, 0) for key := range hosts { ret = append(ret, key) } return ret } //类型other与处理函数,同类型Host与处理函数 func getEndpointFromOther(other string) []string { return getEndpointFromHosts(other) }
collector.Start() 缓存Nodata配置主机的采集数据点
# 运行收集nodata数据主函数 func Start() { if !g.Config().Collector.Enabled { log.Println("collector.Start warning, not enabled") return } StartCollectorCron() //周期任务,收集nodata数据 log.Println("collector.Start ok") } ## 定时任务执行,收集函数调用与任务运行 func StartCollectorCron() { collectorCron.AddFuncCC("*/20 * * * * ?", func() { start := time.Now().Unix() cnt := collectDataOnce() //收集函数调用 end := time.Now().Unix() if g.Config().Debug { log.Printf("collect cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start)) } //统计 g.CollectorCronCnt.Incr() g.CollectorLastTs.SetCnt(end - start) g.CollectorLastCnt.SetCnt(int64(cnt)) g.CollectorCnt.IncrBy(int64(cnt)) }, 1) collectorCron.Start() } ### 收集功能实现函数 func collectDataOnce() int { keys := config.Keys() keysLen := len(keys) // 并发+同步控制 cfg := g.Config().Collector //collector全局配置 concurrent := int(cfg.Concurrent) //并发数 if concurrent < 1 || concurrent > 50 { concurrent = 10 } sema := tsema.NewSemaphore(concurrent) //创建并发同步 batch := int(cfg.Batch) //全局批量处理数 if batch < 100 || batch > 1000 { batch = 200 //batch不能太小, 否则channel将会很大 } //根据nodata配置数长度和批量处理数创建channel长度 batchCnt := (keysLen + batch - 1) / batch rch := make(chan int, batchCnt+1) i := 0 for i < keysLen { leftLen := keysLen - i fetchSize := batch // 每次处理batch个配置 if leftLen < fetchSize { fetchSize = leftLen } fetchKeys := keys[i : i+fetchSize] // 并发collect数据 sema.Acquire() //线程批量处理(fetchKeys, fetchSize) go func(keys []string, keySize int) { defer sema.Release() size, err := fetchItemsAndStore(keys, keySize)//批查获取函数调用 if err != nil { log.Printf("fetchItemAndStore fail, size:%v, error:%v", size, err) } if g.Config().Debug { log.Printf("fetchItemAndStore keys:%v, key_size:%v, ret_size:%v", keys, keySize, size) } rch <- size }(fetchKeys, fetchSize) i += fetchSize } collectCnt := 0 //计数 for i := 0; i < batchCnt; i++ { select { case cnt := <-rch: collectCnt += cnt } } return collectCnt } #### 获取数据实现函数 func fetchItemsAndStore(fetchKeys []string, fetchSize int) (size int, errt error) { if 
fetchSize < 1 { return } // form request args args := make([]*cmodel.GraphLastParam, 0) for _, key := range fetchKeys { ndcfg, found := config.GetNdConfig(key) //根据hostname返回nodata配置 if !found { continue } endpoint := ndcfg.Endpoint //endpoint主机对象 counter := cutils.Counter(ndcfg.Metric, ndcfg.Tags) // 格式metric/tags(k=v,k=v) arg := &cmodel.GraphLastParam{endpoint, counter} //请求参数 args = append(args, arg) } if len(args) < 1 { return } resp, err := queryLastPoints(args)//API调用查询endpoint最近一次采集数据。(POST请求API组件api调用,查看后面函数分析) if err != nil { return 0, err } // store items fts := time.Now().Unix() //存储Items时间float time,Judge逻辑用到 for _, glr := range resp { //log.Printf("collect:%v\n", glr) if glr == nil || glr.Value == nil { continue } AddItem(cutils.PK2(glr.Endpoint, glr.Counter), NewDataItem(glr.Value.Timestamp, float64(glr.Value.Value), "OK", fts)) //缓存收集到的监控数据(ItemMap)。Value.Timestamp数据项时间戳,Value.Value数据项值,"OK"数据项状态,fts数据项存储时间。 } return len(resp), nil } ##### config.GetNdConfig 根据hostname返回NodataConfig配置 func GetNdConfig(key string) (*cmodel.NodataConfig, bool) { rwlock.RLock() defer rwlock.RUnlock() val, found := NdConfigMap.Get(key)//map操作 if found && val != nil { return val.(*cmodel.NodataConfig), true } return &cmodel.NodataConfig{}, false } ##### API组件POST请求实现函数 func queryLastPoints(param []*cmodel.GraphLastParam) (resp []*cmodel.GraphLastResp, err error) { cfg := g.Config() uri := fmt.Sprintf("%s/api/v1/graph/lastpoint", cfg.PlusApi.Addr) //接口定义 var req *httplib.BeegoHttpRequest headers := map[string]string{"Content-type": "application/json"} req, err = requests.CurlPlus(uri, "POST", "nodata", cfg.PlusApi.Token, headers, map[string]string{}) //Curl请求头与方法 if err != nil { return } req.SetTimeout(time.Duration(cfg.PlusApi.ConnectTimeout)*time.Millisecond, time.Duration(cfg.PlusApi.RequestTimeout)*time.Millisecond) b, err := json.Marshal(param) if err != nil { return } req.Body(b) //请求体 err = req.ToJson(&resp) //请求执行与response json if err != nil { return } return resp, 
nil } ##### 缓存数据点 func AddItem(key string, val *DataItem) { listv, found := ItemMap.Get(key) if !found { ll := tlist.NewSafeListLimited(3) //每个采集指标,缓存最新的3个数据点 ll.PushFrontViolently(val) ItemMap.Put(key, ll) return } listv.(*tlist.SafeListLimited).PushFrontViolently(val) } func NewDataItem(ts int64, val float64, fstatus string, fts int64) *DataItem { return &DataItem{Ts: ts, Value: val, FStatus: fstatus, FTs: fts} } type GraphLastResp struct { Endpoint string `json:"endpoint"` Counter string `json:"counter"` Value *RRDData `json:"value"` }
judge.Start()
# Judge运行入口函数 func Start() { StartJudgeCron() //运行调用 log.Println("judge.Start ok") } # 定时任务Judge执行函数 func StartJudgeCron() { judgeCron.AddFuncCC(judgeCronSpec, func() { start := time.Now().Unix() judge() //执行judge函数调用 end := time.Now().Unix() if g.Config().Debug { log.Printf("judge cron, time %ds, start %s\n", end-start, ttime.FormatTs(start)) } // 统计 g.JudgeCronCnt.Incr() g.JudgeLastTs.SetCnt(end - start) // 触发Mock列表数据发送 sender.SendMockOnceAsync() }, 1) judgeCron.Start() //执行Cron } ## judge实现函数 func judge() { now := time.Now().Unix() keys := config.Keys() for _, key := range keys { ndcfg, found := config.GetNdConfig(key) //根据hostname返回nodata配置 if !found { //策略不存在,不处理 continue } step := ndcfg.Step mock := ndcfg.Mock item, found := collector.GetFirstItem(key) //根据hostname返回collector最近一次配置 if !found { //没有数据,未开始采集,不处理 continue } //3*step(or 180)超时时间 lastTs := now - getTimeout(step) if item.FStatus != "OK" || item.FTs < lastTs { //数据采集失败,不处理 continue } //采集到的数据为mock数据,则认为上报超时了 if fCompare(mock, item.Value) == 0 { //判断采集到的最后一次数据项值timestamp+step(上报周期) //如果小于当前时间则认为已经上报超时了,过了应该上报的周期 //如果大于当前时间则表示还在有效的上报周期内 if LastTs(key)+step <= now { TurnNodata(key, now) //设置nodata状态 genMock(genTs(now, step), key, ndcfg) //添加到Mock列表 } continue } //判断采集到的最后一次数据项值timestamp,如果小于 //3*step(or 180)超时时间则认为数据过期,则认为上报超时 if item.Ts < lastTs { if LastTs(key)+step <= now { TurnNodata(key, now) genMock(genTs(now, step), key, ndcfg) } continue } TurnOk(key, now) } } ### 返回最后一次采集数据的timestamp func LastTs(key string) int64 { statusLock.RLock() var ts int64 = 0 v, found := StatusMap.Get(key) if !found { statusLock.RUnlock() return ts } ts = v.(*NodataStatus).Ts statusLock.RUnlock() return ts } // NoData Data Item Struct type DataItem struct { Ts int64 //timestamp Value float64 FStatus string // OK|ERR FTs int64 //存储时间float } // Nodata Status Struct type NodataStatus struct { Key string Status string // OK|NODATA Cnt int Ts int64 } ### 设置为Nodata状态 func TurnNodata(key string, ts int64) { statusLock.Lock() 
v, found := StatusMap.Get(key) if !found { // create new status ns := NewNodataStatus(key, "NODATA", 1, ts) StatusMap.Put(key, ns) statusLock.Unlock() return } // update status ns := v.(*NodataStatus) ns.Status = "NODATA" ns.Cnt += 1 ns.Ts = ts statusLock.Unlock() return } ### 添加Item至Mock列表 func genMock(ts int64, key string, ndcfg *cmodel.NodataConfig) { sender.AddMock(key, ndcfg.Endpoint, ndcfg.Metric, cutils.SortedTags(ndcfg.Tags), ts, ndcfg.Type, ndcfg.Step, ndcfg.Mock) } func AddMock(key string, endpoint string, metric string, tags string, ts int64, dstype string, step int64, value interface{}) { item := &cmodel.JsonMetaData{metric, endpoint, ts, step, value, dstype, tags} MockMap.Put(key, item) //put into map }
sender.SendMockOnceAsync() 发送模拟数据
# 发送Mock数据入口函数 func SendMockOnceAsync() { go SendMockOnce() } ## 并发发送和统计 func SendMockOnce() int { if !sema.TryAcquire() { return -1 } defer sema.Release() // not enabled if !g.Config().Sender.Enabled { return 0 } start := time.Now().Unix() cnt, _ := sendMock() //调用发送功能模块 end := time.Now().Unix() if g.Config().Debug { log.Printf("sender cron, cnt %d, time %ds, start %s", cnt, end-start, ttime.FormatTs(start)) } // 统计 g.SenderCronCnt.Incr() g.SenderLastTs.SetCnt(end - start) g.SenderCnt.IncrBy(int64(cnt)) return cnt } ### 发送模拟数据 func sendMock() (cnt int, errt error) { cfg := g.Config().Sender //全局配置加载 batch := int(cfg.Batch) //批量值 connTimeout := cfg.ConnectTimeout //连接超时 requTimeout := cfg.RequestTimeout //API请求超时 // 发送至transfer组件 mocks := MockMap.Slice() //获取Mock列表 MockMap.Clear() //清空列表空间 mockSize := len(mocks) i := 0 for i < mockSize { leftLen := mockSize - i sendSize := batch if leftLen < sendSize { sendSize = leftLen } fetchMocks := mocks[i : i+sendSize] //取一批量 i += sendSize items := make([]*cmodel.JsonMetaData, 0) //整理为slice for _, val := range fetchMocks { if val == nil { continue } items = append(items, val.(*cmodel.JsonMetaData)) } cntonce, err := sendItemsToTransfer(items, len(items), "nodata.mock", time.Millisecond*time.Duration(connTimeout), time.Millisecond*time.Duration(requTimeout)) //API POT调用,发送Mock至Transfer if err == nil { if g.Config().Debug { log.Println("send items:", items) } cnt += cntonce } } return cnt, nil } // API接口调用发送Mock至Transfer func sendItemsToTransfer(items []*cmodel.JsonMetaData, size int, httpcliname string, connT, reqT time.Duration) (cnt int, errt error) { if size < 1 { return } cfg := g.Config() transUlr := fmt.Sprintf("http://%s/api/push", cfg.Sender.TransferAddr) //请求接口API hcli := thttpclient.GetHttpClient(httpcliname, connT, reqT) // 请求体 itemsBody, err := json.Marshal(items) if err != nil { log.Println(transUlr+", format body error,", err) errt = err return } // 构造与执行API req, err := http.NewRequest("POST", transUlr, 
bytes.NewBuffer(itemsBody)) req.Header.Set("Content-Type", "application/json; charset=UTF-8") //请求内容类型 req.Header.Set("Connection", "close") postResp, err := hcli.Do(req) //执行POST if err != nil { log.Println(transUlr+", post to dest error,", err) errt = err return } defer postResp.Body.Close() //响应状态200判断 if postResp.StatusCode/100 != 2 { log.Println(transUlr+", post to dest, bad response,", postResp.Body) errt = fmt.Errorf("request failed, %s", postResp.Body) return } return size, nil }
http.Start() http API服务
func Start() { go startHttpServer() } func configRoutes() { configCommonRoutes() //公共API路由,可参考Agent模块 configProcHttpRoutes() // configDebugHttpRoutes() //Debug API路由 } func startHttpServer() { if !g.Config().Http.Enabled { return } addr := g.Config().Http.Listen if addr == "" { return } configRoutes() //配置路由 s := &http.Server{ //httpServer实例 Addr: addr, MaxHeaderBytes: 1 << 30, } log.Println("http.startHttpServer ok, listening", addr) log.Fatalln(s.ListenAndServe()) //监听与服务 } ## Proc统计API模块 func configProcHttpRoutes() { // counters http.HandleFunc("/proc/counters", func(w http.ResponseWriter, r *http.Request) { }) http.HandleFunc("/statistics/all", func(w http.ResponseWriter, r *http.Request) { }) // judge.status, /proc/status/$endpoint/$metric/$tags-pairs http.HandleFunc("/proc/status/", func(w http.ResponseWriter, r *http.Request) { }) // collector.last.item, /proc/collect/$endpoint/$metric/$tags-pairs http.HandleFunc("/proc/collect/", func(w http.ResponseWriter, r *http.Request) { }) // config.mockcfg http.HandleFunc("/proc/config", func(w http.ResponseWriter, r *http.Request) { }) // config.mockcfg /proc/config/$endpoint/$metric/$tags-pairs http.HandleFunc("/proc/config/", func(w http.ResponseWriter, r *http.Request) { }) // config.hostgroup, /group/$grpname http.HandleFunc("/proc/group/", func(w http.ResponseWriter, r *http.Request) { urlParam := r.URL.Path[len("/proc/group/"):] RenderDataJson(w, service.GetHostsFromGroup(urlParam)) }) } ## Debug API func configDebugHttpRoutes() { http.HandleFunc("/debug/collector/collect", func(w http.ResponseWriter, r *http.Request) { }) http.HandleFunc("/debug/config/sync", func(w http.ResponseWriter, r *http.Request) { }) http.HandleFunc("/debug/sender/send", func(w http.ResponseWriter, r *http.Request) { }) }
思考与查证
- nodata 的 config、collector、judge 各模块的执行周期分别是多少?
- Judge主要判断Nodata主机的两点要素是?
扩展学习
- github.com/toolkits/cron 定时任务golang Cron库
- github.com/go-sql-driver/mysql Mysql连接客户端
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:
- React Hooks 源码解析(一):类组件、函数组件、纯组件
- 权限组件的源码分析
- 【React源码解读】- 组件的实现
- 深入剖析Vue源码 - 组件基础
- 深入剖析Vue源码 - 组件进阶
- 生命周期组件 Lifecycle 源码解析(一)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Black Box Society
Frank Pasquale / Harvard University Press / 2015-1-5 / USD 35.00
Every day, corporations are connecting the dots about our personal behavior—silently scrutinizing clues left behind by our work habits and Internet use. The data compiled and portraits created are inc......一起来看看 《The Black Box Society》 这本书的介绍吧!