当前位置: 首页 > news >正文

29.5 日志消费组和日志正则处理对象AnalysPoint

本节重点介绍 :

  • 日志正则消费分析对象
  • 日志消费者组存在的意义和对应的方法
  • 定义正则分析结果对象AnalysPoint
  • 编写正则处理方法

日志消费组和日志正则处理对象AnalysPoint

日志正则消费分析对象

consumer对象

  • 位置 consumer\consumer.go
package consumerimport ("bytes""github.com/toolkits/pkg/logger""log2metrics/strategy""math""regexp""sort""strconv""time"
)//单个Consumer对象
type Consumer struct {FilePath     stringClose        chan struct{}Stream       chan stringCounterQueue chan *AnalysPointMark         string //标记该worker信息,方便打log及上报自监控指标, 追查问题Analyzing    bool   //标记当前Worker状态是否在分析中,还是空闲状态Stra         *strategy.Strategy
}

启动和停止

func (c *Consumer) Start() {go func() {c.Work()}()
}func (c *Consumer) Stop() {close(c.Close)
}

核心的work方法

  • 启动一个 统计的任务协程
  • 核心方法为,从c.Stream接收每行的日志,然后调用 analysis方法进行分析
func (c *Consumer) Work() {logger.Infof("worker starting...[%s]", c.Mark)var anaCnt, anaSwp int64analysClose := make(chan int)// 统计的任务go func() {for {//休眠10sselect {case <-analysClose:returncase <-time.After(time.Second * 10):}a := anaCntlogger.Debugf("[mark:%v]analysis %d line in last 10s", c.Mark, a-anaSwp)anaSwp = a}}()for {select {case line := <-c.Stream:c.Analyzing = trueanaCnt = anaCnt + 1c.analysis(line)c.Analyzing = falsecase <-c.Close:analysClose <- 0return}}
}

日志正则处理函数 analysis

  • 可以先使用简单的日志打印代替,如果能打印说明流程没问题
func (c *Consumer) analysis(line string) {logger.Infof("[mark:%v]start analysis %v", c.Mark, line)//c.producer(line)}

日志消费者组

作用

  • 因为正则匹配比较消耗资源,速度较慢
  • 所以一个消费者不够用,所以要抽象消费者组容纳多个消费者

代码

  • 位置 consumer\group.go
package consumerimport ("fmt""github.com/toolkits/pkg/logger""log2metrics/common""log2metrics/strategy"
)//Consumer组
type ConsumerGroup struct {ConsumerNum        intConsumers          []*Consumer
}func NewConsumerGroup(filePath string, stream chan string, stra *strategy.Strategy, cq chan *AnalysPoint) *ConsumerGroup {consumerNum := common.LogConsumerNumcg := &ConsumerGroup{ConsumerNum: consumerNum,Consumers:   make([]*Consumer, 0),}logger.Infof("new worker group, [file:%s][consumer_num:%d]", filePath, consumerNum)for i := 0; i < cg.ConsumerNum; i++ {mark := fmt.Sprintf("[consumer][file:%s][num:%d/%d]", filePath, i, consumerNum)c := Consumer{}c.CounterQueue = cqc.Stra = strac.Close = make(chan struct{})c.FilePath = filePathc.Stream = streamc.Mark = markc.Analyzing = falsecg.Consumers = append(cg.Consumers, &c)}return cg
}func (cg *ConsumerGroup) Start() {for _, consumer := range cg.Consumers {consumer.Start()}
}func (cg *ConsumerGroup) Stop() {for _, consumer := range cg.Consumers {consumer.Stop()}
}
解读一下
  • 根据配置的组中消费者数量,创建消费者
  • stream是接收日志reader信息的chan
  • cq是分析结果后传输 结果的chan,对象是AnalysPoint

初始化job

  • 位置logjob\perjob.go
func (lj *LogJob) start(cq chan *consumer.AnalysPoint) {fPath := lj.Stra.FilePathcache := make(chan string, common.LogQueueSize)//启动readerr, err := reader.NewReader(fPath, cache)if err != nil {return}lj.r = r//启动workercg := consumer.NewConsumerGroup(fPath, cache, lj.Stra, cq)cg.Start()lj.c = cg//启动readergo r.Start()logger.Infof("Create job success [filePath:%s][sid:%d]", fPath, lj.Stra.ID)
}func (lj *LogJob) stop() {lj.c.Stop() //先stop consumerlj.r.Stop()logger.Infof("stop job success [filePath:%s][sid:%d]", lj.Stra.FilePath, lj.Stra.ID)
}

定义正则分析结果对象

  • 位置 consumer\consumer.go
//从worker往计算部分推的Point
type AnalysPoint struct {Value           float64           // 数字的正则,cnt是 NaN,其余是对应的数字MetricsName     string            // metrics的名字,用作后续匹配使用LogFunc         string            // 计算的方法,cnt、avg、max、minSortLabelString string            // 标签排序后的结果LabelMap        map[string]string // 标签的map
}

编写正则处理方法

func (c *Consumer) producer(line string) {defer func() {if err := recover(); err != nil {logger.Errorf("%s[producer panic] : %v", c.Mark, err)}}()//处理用户正则var patternReg *regexp.Regexpvar value = math.NaN()var err errorpatternReg = c.Stra.PatternRegv := patternReg.FindStringSubmatch(line)var vString stringif len(v) == 0 {//  正则匹配失败return}logger.Debug("[mark:%v][line:%v][reg_res:%v]", c.Mark, line, v)/*patternReg.FindStringSubmatch(line) 的结果vlen=0 说明 正则没匹配中,应该丢弃这行len=1 说明 正则匹配中了,但是小括号分组没匹配到len>1 说明 正则匹配中了,小括号分组也匹配到*/if len(v) > 1 {// 用户正则的第一个 小括号分组 ()vString = v[1]} else {vString = ""}value, err = strconv.ParseFloat(vString, 64)if err != nil {value = math.NaN()}//处理tag 正则labelMap := map[string]string{}for tagk, regTag := range c.Stra.TagRegs {labelMap[tagk] = ""t := regTag.FindStringSubmatch(line)if t != nil && len(t) > 1 {labelMap[tagk] = t[1]}}ret := &AnalysPoint{LabelMap:        labelMap,Value:           value,SortLabelString: SortedTags(labelMap),MetricsName:     c.Stra.MetricName,LogFunc:         c.Stra.Func,}c.CounterQueue <- ret
}

处理日志主正则

  • patternReg.FindStringSubmatch(line) 的结果v
  • len=0 说明 正则没匹配中,应该丢弃这行
  • len=1 说明 正则匹配中了,但是小括号分组没匹配到
  • len>1 说明 正则匹配中了,小括号分组也匹配到
	//处理用户正则var patternReg *regexp.Regexpvar value = math.NaN()var err errorpatternReg = c.Stra.PatternRegv := patternReg.FindStringSubmatch(line)var vString stringif len(v) == 0 {//  正则匹配失败return}logger.Debug("[mark:%v][line:%v][reg_res:%v]", c.Mark, line, v)/*patternReg.FindStringSubmatch(line) 的结果vlen=0 说明 正则没匹配中,应该丢弃这行len=1 说明 正则匹配中了,但是小括号分组没匹配到len>1 说明 正则匹配中了,小括号分组也匹配到*/

设置value

  • 将正则匹配的结果做float64转行,如果失败就设置一个NaN
	value, err = strconv.ParseFloat(vString, 64)if err != nil {value = math.NaN()}

处理标签的正则

//处理tag 正则labelMap := map[string]string{}for tagk, regTag := range c.Stra.TagRegs {labelMap[tagk] = ""t := regTag.FindStringSubmatch(line)if t != nil && len(t) > 1 {labelMap[tagk] = t[1]}}
  • code=404 和code=200 是两个series,因为标签不一致
  • 所以需要一个标签排序的方法
func SortedTags(tags map[string]string) string {if tags == nil {return ""}size := len(tags)if size == 0 {return ""}ret := new(bytes.Buffer)if size == 1 {for k, v := range tags {ret.WriteString(k)ret.WriteString("=")ret.WriteString(v)}return ret.String()}keys := make([]string, size)i := 0for k := range tags {keys[i] = ki++}sort.Strings(keys)for j, key := range keys {ret.WriteString(key)ret.WriteString("=")ret.WriteString(tags[key])if j != size-1 {ret.WriteString(",")}}return ret.String()
}

构造正则分析的结果,塞入chan中

ret := &AnalysPoint{LabelMap:        labelMap,Value:           value,SortLabelString: SortedTags(labelMap),MetricsName:     c.Stra.MetricName,LogFunc:         c.Stra.Func,}c.CounterQueue <- ret

本节重点总结 :

  • 日志正则消费分析对象
  • 日志消费者组存在的意义和对应的方法
  • 定义正则分析结果对象AnalysPoint
  • 编写正则处理方法

http://www.mrgr.cn/news/66867.html

相关文章:

  • Redis应用(6)接口限流
  • Linux云计算 |【第五阶段】PROJECT3-DAY1
  • springboot职业推荐系统-计算机设计毕业源码12908
  • C# 独立线程
  • 损失函数的分类
  • 大数据与智能算法助力金融市场分析:正大的技术创新探索
  • 2023下半年上午(1~11)
  • 数据库概论实验一
  • 【云岚到家】-day09-1-项目迁移6-秒杀抢购介绍
  • Spark SQL大数据分析快速上手-DataFrame应用体验
  • 【Orange Pi 设备】window11主机下使用VNC可视化控制RK3566
  • python之dict
  • mac上的一些实用工具
  • 【5.10】指针算法-快慢指针将有序链表转二叉搜索树
  • 基于YOLOv8 Web的安全帽佩戴识别检测系统的研究和设计,数据集+训练结果+Web源码
  • 一文彻底搞懂大模型 - Dify(Agent + RAG)
  • 会议室有了智能中控系统价值,会议效率和效果还不飞升。
  • 自动化运维
  • 前端面筋(持续更新)
  • GESP4级考试语法知识(算法概论(一))
  • 会话技术 Cookie和Session对象
  • golang安装,常用框架安装,记忆点
  • 2024系统架构师---论软件系统架构风格论文
  • Elasticsearch与Redis的Netty冲突
  • flink 内存配置(四):内存调优和问题处理
  • mysql5安全审计