package service

import (
	"encoding/json"
	"fmt"
	"math"
	"time"

	"be.ems/src/framework/utils/date"
	"be.ems/src/framework/utils/expr"
	"be.ems/src/framework/utils/parse"
	neDataModel "be.ems/src/modules/network_data/model"
	neDataService "be.ems/src/modules/network_data/service"
	neModel "be.ems/src/modules/network_element/model"
	neService "be.ems/src/modules/network_element/service"
	wsService "be.ems/src/modules/ws/service"
	"github.com/tsmask/go-oam"
)

// NewKPI is the service-layer instance of the KPI handler.
var NewKPI = &KPI{
	neInfoService:     neService.NewNeInfo,
	wsService:         wsService.NewWSSend,
	kpiReportService:  neDataService.NewKpiReport,
	kpiCReportService: neDataService.NewKpiCReport,
}

// KPI handles KPI messages: it stores standard and custom KPI records and
// pushes them to the matching websocket subscription groups.
type KPI struct {
	neInfoService     *neService.NeInfo
	wsService         *wsService.WSSend
	kpiReportService  *neDataService.KpiReport
	kpiCReportService *neDataService.KpiCReport
}

// Resolve processes a received KPI report: it validates the report, looks up
// the network element by k.NeUid, and then stores and publishes the standard
// KPI record followed by the custom KPI record.
//
// It returns an error when the report carries no data, when the granularity is
// not positive, when no matching network element exists, or when either record
// cannot be stored.
func (s *KPI) Resolve(k oam.KPI) error {
	if len(k.Data) == 0 {
		return fmt.Errorf("kpi data is nil")
	}
	// Granularity is the divisor of the time-slice index below; a zero value
	// would panic with an integer divide by zero, and a negative one would
	// yield a negative index and a start time after the end time.
	if k.Granularity <= 0 {
		return fmt.Errorf("resolve kpi invalid granularity %d", k.Granularity)
	}

	// The network element must exist and match the reported UID.
	neInfo := s.neInfoService.FindByRmuid(k.NeUid)
	if neInfo.NeType == "" || neInfo.RmUID != k.NeUid {
		return fmt.Errorf("resolve kpi network element does not exist %s", k.NeUid)
	}

	// Time-slice index: seconds elapsed today (local clock at receive time)
	// divided by the granularity.
	now := time.Now()
	secondsOfDay := now.Hour()*3600 + now.Minute()*60 + now.Second()
	index := int64(secondsOfDay) / k.Granularity

	if err := s.saveKPIData(neInfo, k, index); err != nil {
		return err
	}
	return s.saveKPIDataC(neInfo, k, index)
}

// kpiRecordWindow returns the start and end instants of a KPI record.
//
// recordTime above 1e12 is read as Unix milliseconds and above 1e9 as Unix
// seconds; any other value falls back to the current time. The start instant
// is the end instant minus granularity seconds.
func kpiRecordWindow(recordTime, granularity int64) (start, end time.Time) {
	switch {
	case recordTime > 1e12:
		end = time.UnixMilli(recordTime)
	case recordTime > 1e9:
		end = time.Unix(recordTime, 0)
	default:
		end = time.Now()
	}
	start = end.Add(-time.Duration(granularity) * time.Second)
	return start, end
}

// kpiClampPercent converts an expression result to a percentage in [0, 100].
//
// Integer and float results are accepted; NaN, non-positive and non-numeric
// results yield 0, and anything above 100 (including +Inf) yields 100.
func kpiClampPercent(result any) float64 {
	var v float64
	switch n := result.(type) {
	case float64:
		v = n
	case float32:
		v = float64(n)
	case int:
		v = float64(n)
	case int8:
		v = float64(n)
	case int16:
		v = float64(n)
	case int32:
		v = float64(n)
	case int64:
		v = float64(n)
	case uint:
		v = float64(n)
	case uint8:
		v = float64(n)
	case uint16:
		v = float64(n)
	case uint32:
		v = float64(n)
	case uint64:
		v = float64(n)
	default:
		return 0
	}
	if math.IsNaN(v) || v <= 0 {
		return 0
	}
	if v > 100 {
		return 100
	}
	return v
}

// saveKPIData stores the standard KPI record for neInfo and pushes it to the
// websocket subscription group of that network element.
//
// Every KPI title configured for the network element type gets an entry; titles
// missing from k.Data keep the value 0. For UPF elements the "UPF.03" and
// "UPF.06" values are additionally passed to the daily flow update.
// It returns an error when the values cannot be marshalled or the insert fails.
func (s *KPI) saveKPIData(neInfo neModel.NeInfo, k oam.KPI, index int64) error {
	// Record time handling.
	startTime, endTime := kpiRecordWindow(k.RecordTime, k.Granularity)
	recordDate := date.ParseDateToStr(endTime, "2006-01-02")
	recordEndTime := date.ParseDateToStr(endTime, "15:04:05")
	recordStartTime := date.ParseDateToStr(startTime, "15:04:05")

	// Build the KPI values JSON, one entry per configured title.
	kpiTitles := s.kpiReportService.FindTitle(neInfo.NeType)
	kpiValues := make([]map[string]any, 0, len(kpiTitles))
	for _, kt := range kpiTitles {
		item := map[string]any{
			"kpiId": kt.KpiId,
			"value": 0,
			"err":   "",
		}
		// Use the reported value when the report contains this KPI.
		if v, ok := k.Data[kt.KpiId]; ok {
			item["value"] = v
		}
		kpiValues = append(kpiValues, item)
	}
	kpiValuesByte, err := json.Marshal(kpiValues)
	if err != nil {
		return err
	}

	// KPI record.
	// NOTE(review): CreatedAt keeps the raw k.RecordTime, while the date/time
	// strings use the normalised record time (which falls back to now) —
	// confirm the intended unit and fallback with the consumers.
	kpiData := neDataModel.KpiReport{
		NeType:      neInfo.NeType,
		NeName:      neInfo.NeName,
		RmUid:       neInfo.RmUID,
		Date:        recordDate,
		StartTime:   recordStartTime,
		EndTime:     recordEndTime,
		Index:       index,
		Granularity: k.Granularity,
		KpiValues:   string(kpiValuesByte),
		CreatedAt:   k.RecordTime,
	}
	insertId := s.kpiReportService.Insert(kpiData)
	if insertId <= 0 {
		return fmt.Errorf("add kpi data fail")
	}
	kpiData.ID = insertId

	// Event payload: record metadata plus one key per KPI id.
	data := map[string]any{
		"neType":     kpiData.NeType,
		"neName":     kpiData.NeName,
		"rmUID":      kpiData.RmUid,
		"startIndex": kpiData.Index,
		"timeGroup":  kpiData.CreatedAt,
	}
	for _, item := range kpiValues {
		data[fmt.Sprint(item["kpiId"])] = item["value"]
	}

	// Push to the websocket subscription group.
	s.wsService.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), data)

	// Update the UPF total flow.
	if neInfo.NeType == "UPF" {
		upValue := parse.Number(data["UPF.03"])
		downValue := parse.Number(data["UPF.06"])
		s.kpiReportService.UPFTodayFlowUpdate(neInfo.RmUID, upValue, downValue)
	}
	return nil
}

// saveKPIDataC stores the custom KPI record for neInfo and pushes it to the
// websocket subscription group of that network element.
//
// Each custom KPI title is evaluated from its expression with the reported
// k.Data as variables. A failed evaluation records value 0 and the error text
// in the item; results of titles whose unit is "%" are clamped to [0, 100].
// It returns an error when the values cannot be marshalled or the insert fails.
func (s *KPI) saveKPIDataC(neInfo neModel.NeInfo, k oam.KPI, index int64) error {
	// Record time handling.
	startTime, endTime := kpiRecordWindow(k.RecordTime, k.Granularity)
	recordDate := date.ParseDateToStr(endTime, "2006-01-02")
	recordEndTime := date.ParseDateToStr(endTime, "15:04:05")
	recordStartTime := date.ParseDateToStr(startTime, "15:04:05")

	kpiCTitles := s.kpiCReportService.FindTitle(neInfo.NeType)
	kpiValues := make([]map[string]any, 0, len(kpiCTitles))

	// Expression environment: every reported KPI value keyed by its id.
	kpiExprEnv := make(map[string]any, len(k.Data))
	for id, value := range k.Data {
		kpiExprEnv[id] = value
	}

	// Evaluate each custom KPI.
	for _, title := range kpiCTitles {
		item := map[string]any{
			"kpiId": title.KpiId,
			"value": 0,
			"err":   "",
		}
		exprStr, exprEnv := expr.ParseExprEnv(title.Expression, kpiExprEnv)
		result, err := expr.Eval(exprStr, exprEnv)
		switch {
		case err != nil:
			item["err"] = err.Error()
		case title.Unit == "%":
			item["value"] = kpiClampPercent(result)
		default:
			item["value"] = result
		}
		kpiValues = append(kpiValues, item)
	}
	kpiValuesByte, err := json.Marshal(kpiValues)
	if err != nil {
		return err
	}

	// Custom KPI record.
	// NOTE(review): CreatedAt keeps the raw k.RecordTime, while the date/time
	// strings use the normalised record time (which falls back to now) —
	// confirm the intended unit and fallback with the consumers.
	kpiCData := neDataModel.KpiCReport{
		NeType:      neInfo.NeType,
		NeName:      neInfo.NeName,
		RmUid:       neInfo.RmUID,
		Date:        recordDate,
		StartTime:   recordStartTime,
		EndTime:     recordEndTime,
		Index:       index,
		Granularity: k.Granularity,
		KpiValues:   string(kpiValuesByte),
		CreatedAt:   k.RecordTime,
	}
	insertId := s.kpiCReportService.Insert(kpiCData)
	if insertId <= 0 {
		return fmt.Errorf("add kpic data fail")
	}
	kpiCData.ID = insertId

	// Event payload: record metadata plus one key per KPI id.
	data := map[string]any{
		"neType":     kpiCData.NeType,
		"neName":     kpiCData.NeName,
		"rmUID":      kpiCData.RmUid,
		"startIndex": kpiCData.Index,
		"timeGroup":  kpiCData.CreatedAt,
	}
	for _, item := range kpiValues {
		data[fmt.Sprint(item["kpiId"])] = item["value"]
	}

	// Push to the websocket subscription group.
	s.wsService.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), data)
	return nil
}