内容简介:总结:aggregator聚合器就是从falcon_portal.cluster表中取出用户在页面上配置的表达式,然后解析后,通过api拿到对应机器组的所有机器,通过api查询graph数据算出一个值重新打回transfer作为一个新的点。下面我们来看下代码:2.看下go cron.UpdateItems()
总结:aggregator聚合器就是从falcon_portal.cluster表中取出用户在页面上配置的表达式,然后解析后,通过api拿到对应机器组的所有机器,通过api查询graph数据算出一个值重新打回transfer作为一个新的点。
- 定时从db中拿出所有的聚合器配置放到一个map中
- 第一次启动时遍历聚合器map生成workers map 这两个map的key都是id+updatetime
- 同时下一次拿出db生成map 对workers这个map进行增量更新 和删除操作删除是通过 worker.Quit chan通信的
- workers这个map 通过 ticker跑cron 运行WorkerRun这个方法
- WorkerRun这个方法解析分子分母的配置
- 调用api 根据grp_id拿出所有机器列表
- 调用graph的last接口拿出所有endpoint的counter 的值然后进行计算
- 计算后重新打回 一个线程安全的双向链表队列
- 另外一个goroutine异步pop队列中的值发生给 transfer的http接口(不是给agent用的rpc接口)
- 机器量很多时获取机器列表和查询最新的值都是瓶颈
- 我在想如果直接在transfer中直接做数据的聚合速度上不存在瓶颈
下面我们来看下代码:
- main.go中核心的两个地方
//查询db 调api算值 push 到push的队列中 go cron.UpdateItems() //从push队列push到transfer sender.StartSender()
2.看下go cron.UpdateItems()
func updateItems() { //从db中查询出结果 items, err := db.ReadClusterMonitorItems() if err != nil { return } //对比key(id+uptime),将已经变更的项删除 deleteNoUseWorker(items) //启动新的worker createWorkerIfNeed(items) } //看下这个读db的func func ReadClusterMonitorItems() (M map[string]*g.Cluster, err error){ ...... /*看到这个funcreturn的是个map key是 每个聚合项的id和他更新时间的字符串 value 就是Cluster结构体指针 type Cluster struct { Id int64 GroupId int64 Numerator string Denominator string Endpoint string Metric string Tags string DsType string Step int LastUpdate time.Time } */ M[fmt.Sprintf("%d%v", c.Id, c.LastUpdate)] = &c return M, err }
3.看下 deleteNoUseWorker 和createWorkerIfNeed 这两个func都是围绕 Worker这个struct的进行增删
func deleteNoUseWorker(m map[string]*g.Cluster) { del := []string{} for key, worker := range Workers { //遍历已经创建的work,如果key在新的map中没有了说明这条记录在db中被更改或删除了 //所以删掉它 给Workers这个map缩容 if _, ok := m[key]; !ok { //将worker 中的Quit chan关闭 会调用ticker.stop 真正关闭 worker.Drop() del = append(del, key) } } for _, key := range del { delete(Workers, key) } } func createWorkerIfNeed(m map[string]*g.Cluster) { for key, item := range m { if _, ok := Workers[key]; !ok { //如果配置中step小于0 丢弃这条 if item.Step <= 0 { log.Println("[W] invalid cluster(step <= 0):", item) continue } //初始化worker worker := NewWorker(item) Workers[key] = worker worker.Start() } } }
4. 看下Worker这个结构体包含三个域
- ticker作为一个计时器实现类似cron的功能每隔一段时间执行一次Start 中的func
- ClusterItem作为每个聚合器的配置
- Quit是一个chan用来外部关闭 key在新的map中没有了说明这条记录在db中被更改或删除了
type Worker struct { Ticker *time.Ticker ClusterItem *g.Cluster Quit chan struct{} } func NewWorker(ci *g.Cluster) Worker { w := Worker{} w.Ticker = time.NewTicker(time.Duration(ci.Step) * time.Second) w.Quit = make(chan struct{}) w.ClusterItem = ci return w } func (this Worker) Start() { go func() { for { select { case <-this.Ticker.C: WorkerRun(this.ClusterItem) case <-this.Quit: if g.Config().Debug { log.Println("[I] drop worker", this.ClusterItem) } this.Ticker.Stop() return } } }() } func (this Worker) Drop() { close(this.Quit) } var Workers = make(map[string]Worker)
到这里我们已经看明白聚合器的流程了:
- 定时从db中拿出所有的聚合器配置放到一个map中
- 第一次启动时遍历聚合器map生成workers map 这两个map的key都是id+updatetime
- 同时下一次拿出db生成map 对workers这个map进行增量更新 和删除操作删除是通过 worker.Quit chan通信的
- workers这个map 通过 ticker跑cron 运行WorkerRun这个方法
5.下面看下最重要的方法 WorkerRun
func WorkerRun(item *g.Cluster) { debug := g.Config().Debug /* Numerator代表分子 例如 $(cpu.user)+$(cpu.system) 代表求cpu.user和cpu.system的和 Denominator代表分母 例如 $# 代表所有机器 */ //cleanParam去除\r等字符 numeratorStr := cleanParam(item.Numerator) denominatorStr := cleanParam(item.Denominator) //判断分子分母是否合法 if !expressionValid(numeratorStr) || !expressionValid(denominatorStr) { log.Println("[W] invalid numerator or denominator", item) return } //判断分子分母是否需要计算 needComputeNumerator := needCompute(numeratorStr) needComputeDenominator := needCompute(denominatorStr) //如果分子分母都不需要计算就不需要用到聚合器了 if !needComputeNumerator && !needComputeDenominator { log.Println("[W] no need compute", item) return } //比如分子是这样的: "($(cpu.busy)+$(cpu.idle)-$(cpu.nice))>80" //那么parse的返回值为 [cpu.busy cpu.idle cpu.nice] [+ -] >80 numeratorOperands, numeratorOperators, numeratorComputeMode := parse(numeratorStr, needComputeNumerator) denominatorOperands, denominatorOperators, denominatorComputeMode := parse(denominatorStr, needComputeDenominator) if !operatorsValid(numeratorOperators) || !operatorsValid(denominatorOperators) { log.Println("[W] operators invalid", item) return } /*add retry for gethostname bygid 这里源码是动过sdk根据group_id查找组里面机器列表 这里我进行了两点优化: 1.sdk调用时没有加重试,http失败导致这次没有get到机器所以这个点就不算了导致断点 2.原来的接口在机器量超过1k时就效率就会很慢 2w+机器需要8s,看了代码是用orm进行了多次查询而且附带了很多别的信息 这里我只需要group_id对应endpoint_list所以我写了一个新的接口用一条raw_sql进行查询 测试2w+的机器0.2s就能返回 */ retry_limit :=3 r_s :=0 var hostnames []string for r_s <retry_limit{ hostnames_tmp, err_tmp := sdk.HostnamesByID(item.GroupId) if err_tmp != nil { log.Println("[E] get hostlist err",err_tmp) r_s+=1 time.Sleep(time.Second) }else{ hostnames = hostnames_tmp break } } //没有机器当然不用算了 if len(hostnames)==0{ log.Println("[E] get 0 record hostname item:",item) return } now := time.Now().Unix() /*这里是调用graph/lastpoint这个api 查询最近一个点的数据 1.机器是上面查到的主机列表 2.counter这里做了合并 把所有要查的metirc都放在一个请求里面查询了 3.查询的时候在api那边做了for循环 逐个item查询 估计这里也会拖慢速度 4.查完之后计算下值推到发送队列 */ valueMap, err := queryCounterLast(numeratorOperands, denominatorOperands, hostnames, now-int64(item.Step*2), now) if err != nil { log.Println("[E] get queryCounterLast", err, item) return } .......... sender.Push(item.Endpoint, item.Metric, item.Tags, numerator/denominator, item.DsType, int64(item.Step)) }
6.最后看下发送的代码
- MetaDataQueue是个线程安全的双向链表
- 上面说的WorkerRun方法中会将转化好的监控项数据PushFront入链表
- startSender这个goroutine 每200毫秒会将队列中的数据取出发送到transfer的http接口
func Push(endpoint, metric, tags string, val interface{}, counterType string, step_and_ts ...int64) { md := MakeMetaData(endpoint, metric, tags, val, counterType, step_and_ts...) MetaDataQueue.PushFront(md) } const LIMIT = 200 var MetaDataQueue = NewSafeLinkedList() var PostPushUrl string var Debug bool func StartSender() { go startSender() } func startSender() { for { L := MetaDataQueue.PopBack(LIMIT) if len(L) == 0 { time.Sleep(time.Millisecond * 200) continue } err := PostPush(L) if err != nil { log.Println("[E] push to transfer fail", err) } } }
欢迎关注我们的微信公众号,每天学习 Go 知识
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 爱分析《数据智能行业报告》发布,解析集奥聚合缘何在政务场景快速落地
- 监控聚合器系列之: open-falcon新聚合器polymetric
- elasticsearch学习笔记(七)——快速入门案例实战之电商网站商品管理:嵌套聚合,下钻分析,聚合分析
- mongodb高级聚合查询
- MongoDB聚合(aggregate)
- mongodb 聚合管道
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
React Native:用JavaScript开发移动应用
【美】Truong Hoang Dung(张皇容) / 奇舞团 / 电子工业出版社 / 2015-9 / 65.00
React Native是当前移动端开发中的优秀解决方案。《React Native:用JavaScript开发移动应用》围绕着如何将一个完整App提交到App Store,讲解了使用React Native开发iOS应用所涉及的方方面面。首先介绍了Flexbox布局,教大家从零开始搭建一个初始应用,以此阐明React Native的基础运行机理;然后介绍了Flux的设计思想,怎么理解和使用Pro......一起来看看 《React Native:用JavaScript开发移动应用》 这本书的介绍吧!