本节重点介绍:
- 时序统计的结构体对象
- 时序统计结构体的管理者
- metrics结果打点方法
时序统计的结构体对象
// PointCounter accumulates running statistics for one time series.
//
// The embedded RWMutex is the lock Update takes while mutating the numeric
// fields; readers of those fields should hold at least the read lock.
type PointCounter struct {
	sync.RWMutex

	// Count is the number of values folded in by Update.
	Count int64
	// Sum is the running total of all values passed to Update.
	Sum float64
	// Max and Min are the running extremes maintained by Update.
	Max float64
	Min float64
	// Ts is the Unix time (seconds) of the most recent Update.
	Ts int64

	// LogFunc names the aggregation that SetMetrics exports for this series.
	LogFunc string
	// MetricsName is the key used to look the gauge up in the metrics map.
	MetricsName string
	// SortLabelString is the ordered label string; together with MetricsName
	// it forms the unique key of the series.
	SortLabelString string
	// LabelMap holds the label values applied to the gauge when exporting.
	LabelMap map[string]string
}

// NewPointCounter returns a counter for the given series with all numeric
// fields at their zero value. labelMap is stored as-is (not copied).
func NewPointCounter(metricsName, sortLabelString, logFunc string, labelMap map[string]string) *PointCounter {
	return &PointCounter{
		MetricsName:     metricsName,
		SortLabelString: sortLabelString,
		LabelMap:        labelMap,
		LogFunc:         logFunc,
	}
}
计算方法
func ( pc * PointCounter) Update ( value float64 ) { pc. Lock ( ) defer pc. Unlock ( ) pc. Sum = pc. Sum + valueif math. IsNaN ( pc. Max) || value > pc. Max { pc. Max = value} if math. IsNaN ( pc. Min) || value < pc. Min { pc. Min = value} pc. Count += 1 pc. Ts = time. Now ( ) . Unix ( )
}
时序统计结构体的管理者
type PointCounterManager struct { sync. RWMutexTagstringMap map [ string ] * PointCounterCounterQueue chan * consumer. AnalysPointMetricsMap map [ string ] * prometheus. GaugeVec
}
初始化方法
func NewPointCounterManager ( cq chan * consumer. AnalysPoint, m map [ string ] * prometheus. GaugeVec) * PointCounterManager { pm := & PointCounterManager{ TagstringMap: make ( map [ string ] * PointCounter) , CounterQueue: cq, MetricsMap: m, } return pm
}
更新和获取统计实体的方法
func ( pm * PointCounterManager) GetPcByUniqueName ( seriesId string ) * PointCounter { pm. RLock ( ) defer pm. RUnlock ( ) return pm. TagstringMap[ seriesId] } func ( pm * PointCounterManager) SetPc ( seriesId string , pc * PointCounter) { pm. Lock ( ) defer pm. Unlock ( ) pm. TagstringMap[ seriesId] = pc}
更新的manager方法
- 通过分析chan接收分析的结果
- 根据metric名字+有序标签字符串作为key,获取统计的实体对象
- 如果没有就新建一个
- 然后调用Update进行计算
func ( pm * PointCounterManager) UpdateManager ( ctx context. Context) error { for { select { case <- ctx. Done ( ) : logger. Infof ( "PointCounterManager.UpdateManager.receive_quit_signal_and_quit" ) return nil case ap := <- pm. CounterQueue: pc := pm. GetPcByUniqueName ( ap. MetricsName + ap. SortLabelString) if pc == nil { pc = NewPointCounter ( ap. MetricsName, ap. SortLabelString, ap. LogFunc, ap. LabelMap) pm. SetPc ( ap. MetricsName+ ap. SortLabelString, pc) } pc. Update ( ap. Value) } } }
metrics结果打点方法
- 遍历统计实体的map,根据每个统计实体的metric名字在metrics map中找到对应的metrics对象
- 根据统计的方法,调用统计实体的字段进行打点
func ( pm * PointCounterManager) SetMetrics ( ) { pm. RLock ( ) defer pm. RUnlock ( ) for _ , pc := range pm. TagstringMap { metric, loaded := pm. MetricsMap[ pc. MetricsName] if ! loaded { logger. Errorf ( "metrics not found in map metric_name:%v" , pc. MetricsName) continue } logger. Debugf ( "[metrics_set][pc:%+v]" , pc) var value float64 switch pc. LogFunc { case common. LogFuncCnt: value = float64 ( pc. Count) case common. LogFuncSum: value = float64 ( pc. Sum) case common. LogFuncMax: value = float64 ( pc. Max) case common. LogFuncMin: value = float64 ( pc. Min) case common. LogFuncAvg: value = float64 ( pc. Sum) / float64 ( pc. Count) } metric. With ( prometheus. Labels ( pc. LabelMap) ) . Set ( value) } }
打点的manager
func ( pm * PointCounterManager) SetMetricsManager ( ctx context. Context) error { ticker := time. NewTicker ( 10 * time. Second) defer ticker. Stop ( ) for { select { case <- ctx. Done ( ) : logger. Infof ( "SetMetricsManager.receive_quit_signal_and_quit" ) return nil case <- ticker. C: logger. Debug ( "SetMetricsManager.SetMetrics.run" ) pm. SetMetrics ( ) } }
}
main.go中启动这些manager
先初始化对应的对象
cq := make ( chan * consumer. AnalysPoint, common. CounterQueueSize) pm := counter. NewPointCounterManager ( cq, metricsMap) lm := logjob. NewLogJobManager ( cq) ctxAll, cancelAll := context. WithCancel ( context. Background ( ) )
oklog.run启动任务
var g run. Group{ term := make ( chan os. Signal, 1 ) signal. Notify ( term, os. Interrupt, syscall. SIGTERM) cancelC := make ( chan struct { } ) g. Add ( func ( ) error { select { case <- term: logger. Infof ( "Received SIGTERM, exiting gracefully..." ) cancelAll ( ) return nil case <- cancelC: logger. Infof ( "other go error server finally exit..." ) return nil } } , func ( err error ) { close ( cancelC) } , ) } { g. Add ( func ( ) error { logger. Infof ( "start web service Listening on address :%v" , sConfig. HttpAddr) errchan := make ( chan error ) go func ( ) { errchan <- metrics. StartMetricWeb ( sConfig. HttpAddr) } ( ) select { case err := <- errchan: logger. Errorf ( "msg" , "Error starting HTTP server.error:%v " , err) return errcase <- ctxAll. Done ( ) : logger. Infof ( "Web service Exit.." ) return nil } } , func ( err error ) { cancelAll ( ) } ) } { g. Add ( func ( ) error { err := pm. UpdateManager ( ctxAll) if err != nil { logger. Errorf ( "PointCounterManager.SetMetricsManager.error:%v" , err) } return err} , func ( err error ) { cancelAll ( ) } ) } { g. Add ( func ( ) error { err := pm. SetMetricsManager ( ctxAll) if err != nil { logger. Errorf ( "PointCounterManager.SetMetricsManager.error:%v" , err) } return err} , func ( err error ) { cancelAll ( ) } ) } { g. Add ( func ( ) error { err := lm. SyncManager ( ctxAll, logjobSyncChan) if err != nil { logger. Errorf ( "PointCounterManager.SetMetricsManager.error:%v" , err) } return err} , func ( err error ) { cancelAll ( ) } ) } g. Run ( )
启动metrics的http
因为srv.ListenAndServe方法不便于使用ctx控制,所以通过一个errChan接收它的错误
{
	// Metrics HTTP server actor: the server call blocks and takes no context,
	// so it runs in its own goroutine and reports its result over errchan.
	g.Add(func() error {
		logger.Infof("start web service Listening on address :%v", sConfig.HttpAddr)
		// Buffered so the serving goroutine can still deliver its result and
		// exit after this actor has already returned via ctxAll.Done().
		errchan := make(chan error, 1)
		go func() {
			errchan <- metrics.StartMetricWeb(sConfig.HttpAddr)
		}()
		select {
		case err := <-errchan:
			// The format string must come first; a leading "msg" argument
			// would be used as the format and the real message lost.
			logger.Errorf("Error starting HTTP server.error:%v ", err)
			return err
		case <-ctxAll.Done():
			logger.Infof("Web service Exit..")
			return nil
		}
	}, func(err error) {
		cancelAll()
	})
}
本节重点总结:
- 时序统计的结构体对象
- 时序统计结构体的管理者
- metrics结果打点方法