From 386e1dcf67300e56fea74a015ac68d40cf8351b6 Mon Sep 17 00:00:00 2001 From: simonzhangsz Date: Tue, 14 May 2024 17:06:39 +0800 Subject: [PATCH] opt: remove insert kpi to gold_kpi table --- features/pm/performance.go | 125 ++++++++++++++++++++++++++++++------- 1 file changed, 104 insertions(+), 21 deletions(-) diff --git a/features/pm/performance.go b/features/pm/performance.go index 6f17fcd4..614bc020 100644 --- a/features/pm/performance.go +++ b/features/pm/performance.go @@ -65,16 +65,16 @@ type GoldKpi struct { } type KpiData struct { - ID int `json:"id" xorm:"pk 'id' '<-' autoincr"` - NEType string `json:"neType" xorm:"ne_type"` - NEName string `json:"neName" xorm:"ne_name"` - RmUid string `json:"rmUid" xorm:"rm_uid"` - Date string `json:"date" xorm:"date"` - StartTime time.Time `json:"startTime" xorm:"start_time"` - EndTime time.Time `json:"endTime" xorm:"end_time"` - Index int `json:"index" xorm:"index"` - Granularity int8 `json:"granularity" xorm:"granularity"` - KPIValues []KPIVal `json:"kpiValues" xorm:"json 'kpi_values'"` + ID int `json:"id" xorm:"pk 'id' '<-' autoincr"` + NEType string `json:"neType" xorm:"ne_type"` + NEName string `json:"neName" xorm:"ne_name"` + RmUid string `json:"rmUid" xorm:"rm_uid"` + Date string `json:"date" xorm:"date"` + StartTime string `json:"startTime" xorm:"start_time"` + EndTime string `json:"endTime" xorm:"end_time"` + Index int `json:"index" xorm:"index"` + Granularity int8 `json:"granularity" xorm:"granularity"` + KPIValues []KPIVal `json:"kpiValues" xorm:"json 'kpi_values'"` //CreatedAt int64 `json:"createdAt" xorm:"created 'created_at'"` CreatedAt int64 `json:"createdAt" xorm:"'created_at'"` } @@ -190,7 +190,7 @@ func GetDateTimeFromTimeString(fmtString string, timeString string) string { return t.Format(global.DateTime) } -// process alarm post message from NFs +// process KPI report post message from NFs func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { log.Debug("PostKPIReportFromNF processing... ") @@ -202,6 +202,91 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { return } + // body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) + body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) + if err != nil { + log.Error("Faile to io.ReadAll: ", err) + services.ResponseNotFound404UriNotExist(w, r) + return + } + //log.Trace("Request body:", string(body)) + kpiReport := new(KpiReport) + _ = json.Unmarshal(body, &kpiReport) + //log.Trace("kpiReport:", kpiReport) + + layout := time.RFC3339Nano + kpiDate := GetDateFromTimeString(layout, kpiReport.Task.Period.StartTime) + kpiIndex, _ := strconv.Atoi(vars["index"]) + startTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime) + endTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime) + // get time granularity from startTime and endTime + seconds, _ := global.GetSecondDuration(startTime, endTime) + var granularity int8 = 60 + if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 { + granularity = int8(seconds) + } + + // 黄金指标事件对象 + kpiEvent := map[string]any{ + // kip_id ... + "neType": kpiReport.Task.NE.NeType, + "neName": kpiReport.Task.NE.NEName, + "startIndex": kpiIndex, + "timeGroup": startTime, + } + // insert into new kpi_report_xxx table + kpiData := new(KpiData) + kpiData.Date = kpiDate + kpiData.Index = kpiIndex + //stime, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.StartTime, time.Local) + //etime, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.EndTime, time.Local) + kpiData.StartTime = startTime + kpiData.EndTime = endTime + kpiData.Granularity = granularity + kpiData.NEName = kpiReport.Task.NE.NEName + kpiData.NEType = kpiReport.Task.NE.NeType + kpiData.RmUid = kpiReport.Task.NE.RmUID + kpiVal := new(KPIVal) + kpiData.CreatedAt = time.Now().UnixMilli() + for _, k := range kpiReport.Task.NE.KPIs { + kpiEvent[k.KPIID] = k.Value // kip_id + + kpiVal.KPIID = k.KPIID + kpiVal.Value = int64(k.Value) + kpiVal.Err = k.Err + kpiData.KPIValues = append(kpiData.KPIValues, *kpiVal) + } + + // insert kpi_report table, no session + tableName := "kpi_report_" + strings.ToLower(kpiReport.Task.NE.NeType) + affected, err := xEngine.Table(tableName).Insert(kpiData) + if err != nil && affected <= 0 { + log.Errorf("Failed to insert %s:%v", tableName, err) + services.ResponseInternalServerError500ProcessError(w, err) + return + } + + // 推送到ws订阅组 + wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI, kpiEvent) + if kpiReport.Task.NE.NeType == "UPF" { + wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI_UPF, kpiEvent) + } + + services.ResponseStatusOK204NoContent(w) +} + +// post kpi report from NEs, insert insto gold_kpi table, discard... +func PostGoldKPIFromNF(w http.ResponseWriter, r *http.Request) { + log.Debug("PostKPIReportFromNF processing... ") + + vars := mux.Vars(r) + apiVer := vars["apiVersion"] + if apiVer != global.ApiVersionV1 { + log.Error("Uri api version is invalid. apiVersion:", apiVer) + services.ResponseNotFound404UriNotExist(w, r) + return + } + // body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) if err != nil { @@ -244,12 +329,10 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { kpiData := new(KpiData) kpiData.Date = goldKpi.Date kpiData.Index = goldKpi.Index - st, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.StartTime, time.Local) - et, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.EndTime, time.Local) - // kpiData.StartTime = goldKpi.StartTime - // kpiData.EndTime = goldKpi.EndTime - kpiData.StartTime = st - kpiData.EndTime = et + //st, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.StartTime, time.Local) + //et, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.EndTime, time.Local) + kpiData.StartTime = goldKpi.StartTime + kpiData.EndTime = goldKpi.EndTime kpiData.Granularity = goldKpi.Granularity kpiData.NEName = goldKpi.NEName kpiData.NEType = goldKpi.NEType @@ -969,9 +1052,9 @@ func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) { layout := time.RFC3339 measurement.Date = GetDateFromTimeString(layout, measurement.StartTime) measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime) - affected, err := session.Table("northbound_pm").Insert(measurement) + affected, err := session.Table("nbi_pm").Insert(measurement) if err != nil && affected <= 0 { - log.Error("Failed to insert northbound_pm:", err) + log.Error("Failed to insert nbi_pm:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } @@ -1052,9 +1135,9 @@ func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) { layout := time.RFC3339 measurement.Date = GetDateFromTimeString(layout, measurement.StartTime) measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime) - affected, err := session.Table("northbound_pm").Insert(measurement) + affected, err := session.Table("nbi_pm").Insert(measurement) if err != nil && affected <= 0 { - log.Error("Failed to insert northbound_pm:", err) + log.Error("Failed to insert nbi_pm:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return }