diff --git a/features/pm/performance.go b/features/pm/performance.go index d717e791..1613e3f6 100644 --- a/features/pm/performance.go +++ b/features/pm/performance.go @@ -2,7 +2,6 @@ package pm import ( "encoding/json" - "errors" "fmt" "io" "math" @@ -16,12 +15,13 @@ import ( "be.ems/lib/log" "be.ems/lib/services" "be.ems/restagent/config" - "xorm.io/xorm" + neService "be.ems/src/modules/network_element/service" wsService "be.ems/src/modules/ws/service" "github.com/go-resty/resty/v2" _ "github.com/go-sql-driver/mysql" "github.com/gorilla/mux" + "xorm.io/xorm" ) type Response struct { @@ -226,14 +226,6 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { granularity = int8(seconds) } - // 黄金指标事件对象 - kpiEvent := map[string]any{ - // kip_id ... - "neType": kpiReport.Task.NE.NeType, - "neName": kpiReport.Task.NE.NEName, - "startIndex": kpiIndex, - "timeGroup": startTime, - } // insert into new kpi_report_xxx table kpiData := new(KpiData) kpiData.Date = startTime @@ -248,6 +240,15 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { kpiData.RmUid = kpiReport.Task.NE.RmUID kpiVal := new(KPIVal) kpiData.CreatedAt = time.Now().UnixMilli() + + // 黄金指标事件对象 + kpiEvent := map[string]any{ + // kip_id ... + "neType": kpiReport.Task.NE.NeType, + "neName": kpiReport.Task.NE.NEName, + "startIndex": kpiIndex, + "timeGroup": kpiData.CreatedAt, + } for _, k := range kpiReport.Task.NE.KPIs { kpiEvent[k.KPIID] = k.Value // kip_id @@ -266,15 +267,20 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { return } - // 推送到ws订阅组 - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI, kpiEvent) - if kpiReport.Task.NE.NeType == "UPF" { - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI_UPF, kpiEvent) + // 发送到匹配的网元 + neInfo := neService.NewNeInfoImpl.SelectNeInfoByRmuid(kpiData.RmUid) + if neInfo.RmUID == kpiData.RmUid { + // 推送到ws订阅组 + wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent) + if neInfo.NeType == "UPF" { + wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI_UPF+neInfo.NeId, kpiEvent) + } } services.ResponseStatusOK204NoContent(w) } +// PostGoldKPIFromNF 已废弃 // post kpi report from NEs, insert insto gold_kpi table, discard... func PostGoldKPIFromNF(w http.ResponseWriter, r *http.Request) { log.Debug("PostKPIReportFromNF processing... ") @@ -639,7 +645,7 @@ func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) { return } if neInfo == nil { - err := errors.New(fmt.Sprintf("not found target NE neType=%s, neId=%s", neType, neId)) + err := fmt.Errorf("not found target NE neType=%s, neId=%s", neType, neId) log.Error(err) services.ResponseInternalServerError500ProcessError(w, err) return @@ -692,7 +698,7 @@ func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) { return } default: - err = errors.New(fmt.Sprintf("measure task status must be inactive id=%d", id)) + err = fmt.Errorf("measure task status must be inactive id=%d", id) log.Error("Unable to active measure task:", err) services.ResponseInternalServerError500ProcessError(w, err) return @@ -719,7 +725,7 @@ func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) { services.TransportResponse(w, response.StatusCode(), response.Body()) return } else { - err = errors.New(fmt.Sprintf("failed to active measure task, NF return error status=%v", response.Status())) + err = fmt.Errorf("failed to active measure task, NF return error status=%v", response.Status()) log.Error("Unable to active measure task:", err) services.ResponseInternalServerError500ProcessError(w, err) return @@ -932,8 +938,6 @@ func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) { return } if neInfo == nil { - em := errors.New("Not found NE info in database") - log.Error(em) taskInfo := new(dborm.MeasureTask) taskInfo.Status = dborm.MeasureTaskStatusInactive affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo) @@ -986,7 +990,6 @@ func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) { } services.ResponseWithJson(w, response.StatusCode(), respMsg) - return } type Measurement struct { diff --git a/src/modules/network_data/controller/all_kpi.go b/src/modules/network_data/controller/all_kpi.go index 6131c122..221eb655 100644 --- a/src/modules/network_data/controller/all_kpi.go +++ b/src/modules/network_data/controller/all_kpi.go @@ -3,7 +3,6 @@ package controller import ( "be.ems/src/framework/i18n" "be.ems/src/framework/utils/ctx" - "be.ems/src/framework/utils/date" "be.ems/src/framework/vo/result" "be.ems/src/modules/network_data/model" neDataService "be.ems/src/modules/network_data/service" @@ -37,18 +36,6 @@ func (s *PerfKPIController) GoldKPI(c *gin.Context) { c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) return } - - // 时间格式校验 - startTime := date.ParseStrToDate(querys.StartTime, date.YYYY_MM_DD_HH_MM_SS) - if startTime.IsZero() { - c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) - return - } - endTime := date.ParseStrToDate(querys.EndTime, date.YYYY_MM_DD_HH_MM_SS) - if endTime.IsZero() { - c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) - return - } if querys.Interval < 5 || querys.Interval > 3600 { c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) return diff --git a/src/modules/network_data/model/perf_kpi.go b/src/modules/network_data/model/perf_kpi.go index 108d81f2..be88a063 100644 --- a/src/modules/network_data/model/perf_kpi.go +++ b/src/modules/network_data/model/perf_kpi.go @@ -16,7 +16,7 @@ type GoldKPIQuery struct { NeID string `form:"neId" binding:"required"` StartTime string `form:"startTime" binding:"required"` EndTime string `form:"endTime" binding:"required"` - Interval int64 `form:"interval" binding:"required"` + Interval int64 `form:"interval" binding:"required,oneof=5 60 300 900 1800 3600"` RmUID string `form:"rmUID"` SortField string `form:"sortField" binding:"omitempty,oneof=timeGroup"` SortOrder string `form:"sortOrder" binding:"omitempty,oneof=asc desc"` diff --git a/src/modules/network_data/repository/perf_kpi.go b/src/modules/network_data/repository/perf_kpi.go index d2674155..1a00957f 100644 --- a/src/modules/network_data/repository/perf_kpi.go +++ b/src/modules/network_data/repository/perf_kpi.go @@ -7,9 +7,6 @@ type IPerfKPI interface { // SelectGoldKPI 通过网元指标数据信息 SelectGoldKPI(query model.GoldKPIQuery, kpiIds []string) []map[string]any - // select from new kpi report table, exp. kpi_report_upf - SelectKpiReport(query model.GoldKPIQuery, kpiIds []string) []map[string]any - // SelectGoldKPITitle 网元对应的指标名称 SelectGoldKPITitle(neType string) []model.GoldKPITitle diff --git a/src/modules/network_data/repository/perf_kpi.impl.go b/src/modules/network_data/repository/perf_kpi.impl.go index 80908098..b7ac5bb8 100644 --- a/src/modules/network_data/repository/perf_kpi.impl.go +++ b/src/modules/network_data/repository/perf_kpi.impl.go @@ -17,76 +17,6 @@ type PerfKPIImpl struct{} // SelectGoldKPI 通过网元指标数据信息 func (r *PerfKPIImpl) SelectGoldKPI(query model.GoldKPIQuery, kpiIds []string) []map[string]any { - // 查询条件拼接 - var conditions []string - var params []any - if query.RmUID != "" { - conditions = append(conditions, "gk.rm_uid = ?") - params = append(params, query.RmUID) - } - if query.NeType != "" { - conditions = append(conditions, "gk.ne_type = ?") - params = append(params, query.NeType) - } - if query.StartTime != "" { - conditions = append(conditions, "gk.start_time >= ?") - params = append(params, query.StartTime) - } - if query.EndTime != "" { - conditions = append(conditions, "gk.start_time <= ?") - params = append(params, query.EndTime) - } - // 构建查询条件语句 - whereSql := "" - if len(conditions) > 0 { - whereSql += " where " + strings.Join(conditions, " and ") - } - - // 查询字段列 - timeFormat := "DATE_FORMAT(gk.start_time, '%Y-%m-%d %H:%i:')" - secondGroup := fmt.Sprintf("LPAD(FLOOR(SECOND(gk.start_time) / %d) * %d, 2, '0')", query.Interval, query.Interval) - groupByField := fmt.Sprintf("CONCAT( %s, %s ) AS timeGroup", timeFormat, secondGroup) - if query.Interval > 60 { - minute := query.Interval / 60 - timeFormat = "DATE_FORMAT(gk.start_time, '%Y-%m-%d %H:')" - minuteGroup := fmt.Sprintf("LPAD(FLOOR(MINUTE(gk.start_time) / %d) * %d, 2, '0')", minute, minute) - groupByField = fmt.Sprintf("CONCAT( %s, %s ) AS timeGroup", timeFormat, minuteGroup) - } - var fields = []string{ - groupByField, - "min(CASE WHEN gk.index != '' THEN gk.index ELSE 0 END) AS startIndex", - "min(CASE WHEN gk.ne_type != '' THEN gk.ne_type ELSE 0 END) AS neType", - "min(CASE WHEN gk.ne_name != '' THEN gk.ne_name ELSE 0 END) AS neName", - } - for _, kid := range kpiIds { - // 特殊字段,只取最后一次收到的非0值 - if kid == "AMF.01" || kid == "UDM.01" || kid == "UDM.02" || kid == "UDM.03" || kid == "SMF.01" { - str := fmt.Sprintf("IFNULL(SUBSTRING_INDEX(GROUP_CONCAT( CASE WHEN gk.kpi_id = '%s' and gk.VALUE != 0 THEN gk.VALUE END ), ',', 1), 0) AS '%s'", kid, kid) - fields = append(fields, str) - } else { - str := fmt.Sprintf("sum(CASE WHEN gk.kpi_id = '%s' THEN gk.value ELSE 0 END) AS '%s'", kid, kid) - fields = append(fields, str) - } - } - fieldsSql := strings.Join(fields, ",") - - // 查询数据 - if query.SortField == "" { - query.SortField = "timeGroup" - } - if query.SortOrder == "" { - query.SortOrder = "desc" - } - orderSql := fmt.Sprintf(" order by %s %s", query.SortField, query.SortOrder) - querySql := fmt.Sprintf("SELECT %s FROM gold_kpi gk %s GROUP BY timeGroup %s", fieldsSql, whereSql, orderSql) - results, err := datasource.RawDB("", querySql, params) - if err != nil { - logger.Errorf("query err => %v", err) - } - return results -} - -func (r *PerfKPIImpl) SelectKpiReport(query model.GoldKPIQuery, kpiIds []string) []map[string]any { // 查询条件拼接 var conditions []string var params []any @@ -100,43 +30,15 @@ func (r *PerfKPIImpl) SelectKpiReport(query model.GoldKPIQuery, kpiIds []string) // params = append(params, query.NeType) tableName += strings.ToLower(query.NeType) } - - var dateStr1, dateStr2, timeStr1, timeStr2 string if query.StartTime != "" { - dateStr1 = query.StartTime[:10] - timeStr1 = query.StartTime[11:] + conditions = append(conditions, "gk.created_at >= ?") + params = append(params, query.StartTime) } if query.EndTime != "" { - dateStr2 = query.EndTime[:10] - timeStr2 = query.EndTime[11:] - } - if dateStr1 == dateStr2 && dateStr1 != "" { - conditions = append(conditions, "gk.`date` = ?") - params = append(params, dateStr1) - conditions = append(conditions, "gk.`start_time` >= ?") - params = append(params, timeStr1) - conditions = append(conditions, "gk.`start_time` <= ?") - params = append(params, timeStr2) - } else { - if dateStr1 != "" { - conditions = append(conditions, "(gk.`date` > ? OR (gk.`date` = ? AND gk.`start_time` >= ?))") - params = append(params, dateStr1, dateStr1, timeStr1) - } - if dateStr2 != "" { - conditions = append(conditions, "(gk.`date` < ? OR (gk.`date` = ? AND gk.`start_time` <= ?))") - params = append(params, dateStr2, dateStr2, timeStr2) - } + conditions = append(conditions, "gk.created_at <= ?") + params = append(params, query.EndTime) } - // var dateTimeStr string = "CONCAT(gk.`date`, \" \", gk.start_time)" - // if query.StartTime != "" { - // conditions = append(conditions, dateTimeStr+" >= ?") - // params = append(params, query.StartTime) - // } - // if query.EndTime != "" { - // conditions = append(conditions, dateTimeStr+" <= ?") - // params = append(params, query.EndTime) - // } // 构建查询条件语句 whereSql := "" if len(conditions) > 0 { @@ -144,18 +46,9 @@ func (r *PerfKPIImpl) SelectKpiReport(query model.GoldKPIQuery, kpiIds []string) } // 查询字段列 - var dateTimeStr string = "CONCAT(gk.`date`, \" \", gk.start_time)" - timeFormat := "DATE_FORMAT(" + dateTimeStr + ", '%Y-%m-%d %H:%i:')" - secondGroup := fmt.Sprintf("LPAD(FLOOR(SECOND(gk.start_time) / %d) * %d, 2, '0')", query.Interval, query.Interval) - groupByField := fmt.Sprintf("CONCAT( %s, %s ) AS timeGroup", timeFormat, secondGroup) - if query.Interval > 60 { - minute := query.Interval / 60 - timeFormat = "DATE_FORMAT(" + dateTimeStr + ", '%Y-%m-%d %H:')" - minuteGroup := fmt.Sprintf("LPAD(FLOOR(MINUTE(gk.start_time) / %d) * %d, 2, '0')", minute, minute) - groupByField = fmt.Sprintf("CONCAT( %s, %s ) AS timeGroup", timeFormat, minuteGroup) - } var fields = []string{ - groupByField, + // fmt.Sprintf("FROM_UNIXTIME(FLOOR(gk.created_at / (%d * 1000)) * %d) AS timeGroup", query.Interval, query.Interval), + fmt.Sprintf("CONCAT(FLOOR(gk.created_at / (%d * 1000)) * (%d * 1000)) AS timeGroup", query.Interval, query.Interval), // 时间戳毫秒 "min(CASE WHEN gk.index != '' THEN gk.index ELSE 0 END) AS startIndex", "min(CASE WHEN gk.ne_type != '' THEN gk.ne_type ELSE 0 END) AS neType", "min(CASE WHEN gk.ne_name != '' THEN gk.ne_name ELSE 0 END) AS neName", diff --git a/src/modules/network_data/service/perf_kpi.impl.go b/src/modules/network_data/service/perf_kpi.impl.go index da234fab..a7174faf 100644 --- a/src/modules/network_data/service/perf_kpi.impl.go +++ b/src/modules/network_data/service/perf_kpi.impl.go @@ -31,8 +31,7 @@ func (r *PerfKPIImpl) SelectGoldKPI(query model.GoldKPIQuery) []map[string]any { kpiIds = append(kpiIds, kpiId.KPIID) } - //data := r.perfKPIRepository.SelectGoldKPI(query, kpiIds) - data := r.perfKPIRepository.SelectKpiReport(query, kpiIds) + data := r.perfKPIRepository.SelectGoldKPI(query, kpiIds) if data == nil { return []map[string]any{} } diff --git a/src/modules/ws/service/ws_send.impl.go b/src/modules/ws/service/ws_send.impl.go index 94c23b71..2eea685b 100644 --- a/src/modules/ws/service/ws_send.impl.go +++ b/src/modules/ws/service/ws_send.impl.go @@ -12,10 +12,10 @@ import ( const ( // 组号-其他 GROUP_OTHER = "0" - // 组号-指标 - GROUP_KPI = "10" - // 组号-指标UPF - GROUP_KPI_UPF = "12" + // 组号-指标通用 10_neType_neId + GROUP_KPI = "10_" + // 组号-指标UPF 12_neId + GROUP_KPI_UPF = "12_" // 组号-IMS_CDR会话事件 GROUP_IMS_CDR = "1005" // 组号-SMF_CDR会话事件