当前位置: 首页 > news >正文

29.6 时序统计的结构体对象和metrics结果打点方法

本节重点介绍 :

  • 时序统计的结构体对象
  • 时序统计结构体的管理者
  • metrics结果打点方法

时序统计的结构体对象

  • 位置 counter\counter.go
//统计的实体
type PointCounter struct {sync.RWMutexCount           int64   // 日志条数计数Sum             float64 // 正则数字的sumMax             float64 // 正则数字的maxMin             float64 // 正则数字的minTs              int64   // 最近更新的时间戳LogFunc         string  // 计算方法MetricsName     string  //metrics名字 SortLabelString string  // 标签排序的结果LabelMap        map[string]string
}func NewPointCounter(metricsName, sortLabelString, logFunc string, labelMap map[string]string) *PointCounter {pc := &PointCounter{MetricsName:     metricsName,SortLabelString: sortLabelString,LabelMap:        labelMap,LogFunc:         logFunc,}return pc}

计算方法

func (pc *PointCounter) Update(value float64) {//logger.Infof("[start.Update][pc:%+v]", pc)pc.Lock()defer pc.Unlock()pc.Sum = pc.Sum + valueif math.IsNaN(pc.Max) || value > pc.Max {pc.Max = value}if math.IsNaN(pc.Min) || value < pc.Min {pc.Min = value}pc.Count += 1pc.Ts = time.Now().Unix()
}

时序统计结构体的管理者

type PointCounterManager struct {sync.RWMutexTagstringMap map[string]*PointCounterCounterQueue chan *consumer.AnalysPointMetricsMap map[string]*prometheus.GaugeVec
}

初始化方法

  • 传入metrics map 和分析结果的chan
func NewPointCounterManager(cq chan *consumer.AnalysPoint, m map[string]*prometheus.GaugeVec) *PointCounterManager {pm := &PointCounterManager{TagstringMap: make(map[string]*PointCounter),CounterQueue: cq,//QuitC:        make(chan struct{}, 1),MetricsMap: m,}return pm
}

更新和获取统计实体的方法

func (pm *PointCounterManager) GetPcByUniqueName(seriesId string) *PointCounter {pm.RLock()defer pm.RUnlock()return pm.TagstringMap[seriesId]}func (pm *PointCounterManager) SetPc(seriesId string, pc *PointCounter) {pm.Lock()defer pm.Unlock()pm.TagstringMap[seriesId] = pc}

更新的manager方法

  • 通过分析chan接收 分析的结果
  • 根据metric名字+有序标签字符串作为key 获取统计的实体对象
  • 如果没有就新建一个
  • 然后调用update进行计算
func (pm *PointCounterManager) UpdateManager(ctx context.Context) error {for {select {case <-ctx.Done():logger.Infof("PointCounterManager.UpdateManager.receive_quit_signal_and_quit")return nilcase ap := <-pm.CounterQueue://logger.Infof("[receive_ap_from_pm.CounterQueue][ap:%+v]", ap)pc := pm.GetPcByUniqueName(ap.MetricsName + ap.SortLabelString)if pc == nil {pc = NewPointCounter(ap.MetricsName, ap.SortLabelString, ap.LogFunc, ap.LabelMap)pm.SetPc(ap.MetricsName+ap.SortLabelString, pc)}pc.Update(ap.Value)//case <-pm.QuitC://	return nil}}}

metrics结果打点方法

  • 遍历metrics map,获取metrics对象和它对应的统计实体
  • 根据统计的方法,调用统计实体的字段进行打点
func (pm *PointCounterManager) SetMetrics() {pm.RLock()defer pm.RUnlock()for _, pc := range pm.TagstringMap {metric, loaded := pm.MetricsMap[pc.MetricsName]if !loaded {logger.Errorf("metrics not found in map metric_name:%v", pc.MetricsName)continue}logger.Debugf("[metrics_set][pc:%+v]", pc)var value float64switch pc.LogFunc {case common.LogFuncCnt:value = float64(pc.Count)case common.LogFuncSum:value = float64(pc.Sum)case common.LogFuncMax:value = float64(pc.Max)case common.LogFuncMin:value = float64(pc.Min)case common.LogFuncAvg:value = float64(pc.Sum) / float64(pc.Count)}metric.With(prometheus.Labels(pc.LabelMap)).Set(value)}}

打点的manager

func (pm *PointCounterManager) SetMetricsManager(ctx context.Context) error {ticker := time.NewTicker(10 * time.Second)defer ticker.Stop()for {select {case <-ctx.Done():logger.Infof("SetMetricsManager.receive_quit_signal_and_quit")//close(pm.QuitC)return nilcase <-ticker.C:logger.Debug("SetMetricsManager.SetMetrics.run")pm.SetMetrics()}}
}

main.go中启动这些manager

先初始化对应的对象

// 统计指标的同步queuecq := make(chan *consumer.AnalysPoint, common.CounterQueueSize)// 统计指标的管理器pm := counter.NewPointCounterManager(cq, metricsMap)// 日志job管理器lm := logjob.NewLogJobManager(cq)ctxAll, cancelAll := context.WithCancel(context.Background())

oklog.run启动任务

var g run.Group{// Termination handler.term := make(chan os.Signal, 1)signal.Notify(term, os.Interrupt, syscall.SIGTERM)cancelC := make(chan struct{})g.Add(func() error {select {case <-term:/**/logger.Infof("Received SIGTERM, exiting gracefully...")cancelAll()return nilcase <-cancelC:/*1. 如果cancelC读到了数据,说明其他的goroutine出现了错误,通知接收signal的本goroutine退出*/logger.Infof("other go error server finally exit...")return nil}},func(err error) {close(cancelC)},)}{// metrics web handler.g.Add(func() error {logger.Infof("start web service Listening on address :%v", sConfig.HttpAddr)errchan := make(chan error)go func() {errchan <- metrics.StartMetricWeb(sConfig.HttpAddr)}()select {case err := <-errchan:logger.Errorf("msg", "Error starting HTTP server.error:%v ", err)return errcase <-ctxAll.Done():logger.Infof("Web service Exit..")return nil}}, func(err error) {cancelAll()})}{// 统计metrics的模块g.Add(func() error {err := pm.UpdateManager(ctxAll)if err != nil {logger.Errorf("PointCounterManager.SetMetricsManager.error:%v", err)}return err}, func(err error) {cancelAll()})}{// 统计metrics的模块g.Add(func() error {err := pm.SetMetricsManager(ctxAll)if err != nil {logger.Errorf("PointCounterManager.SetMetricsManager.error:%v", err)}return err}, func(err error) {cancelAll()})}{// LogJobManager 同步的模块g.Add(func() error {err := lm.SyncManager(ctxAll, logjobSyncChan)if err != nil {logger.Errorf("PointCounterManager.SetMetricsManager.error:%v", err)}return err}, func(err error) {cancelAll()})}g.Run()

启动metrics的http

  • 因为srv.ListenAndServe方法不便于使用ctx控制,所以通过一个errChan接收它的错误
	{// metrics web handler.g.Add(func() error {logger.Infof("start web service Listening on address :%v", sConfig.HttpAddr)errchan := make(chan error)go func() {errchan <- metrics.StartMetricWeb(sConfig.HttpAddr)}()select {case err := <-errchan:logger.Errorf("msg", "Error starting HTTP server.error:%v ", err)return errcase <-ctxAll.Done():logger.Infof("Web service Exit..")return nil}}, func(err error) {cancelAll()})}

本节重点总结 :

  • 时序统计的结构体对象
  • 时序统计结构体的管理者
  • metrics结果打点方法

http://www.mrgr.cn/news/67738.html

相关文章:

  • 城镇住房保障:SpringBoot系统优化技巧
  • 数据结构-IndexTree结构解析(一)
  • Tomcat(1) 什么是Tomcat?
  • Python并发编程——multiprocessing
  • 前端跨域问题全解:JSONP、CORS 与代理服务器
  • 看看你的电脑可以跑 AI 模型吗?
  • 禅道与Jira与Ones对比:哪个更适合你的项目管理需求?
  • 在数据库设计中,如何避免全表扫描?
  • 项目:使用LNMP搭建私有云存储
  • C语言复习第7章 自定义类型(结构体+位段+枚举+联合体)
  • 还有人不会设置微信自动回复?
  • YOLOv8相较于YOLOv5有哪些改进?
  • ubuntu【桌面】 配置NAT模式固定IP
  • 【NLP自然语言处理】深入解析Encoder与Decoder模块:结构、作用与深度学习应用
  • faiss里面SQ量化4bit是啥意思?具体举例并解释
  • 符号回归概念
  • 黑马redis原理篇 数据结构 set
  • [蓝桥杯算法从小白到大牛]动态规划第二讲:三步问题
  • 如何使用 C# 编写一个修改文件时间属性的小工具?
  • Java 实训 十四天 IO流
  • 对称二叉树(力扣101)
  • 国标GB28181
  • 一台电脑如何同时多开多 IP 浏览器多登账号?
  • git中的gitignore文件
  • 大模型-微调与对齐-人类对齐背景与标准
  • 【Linux】冯诺依曼体系、再谈操作系统