diff --git a/features/pm/kpi_c_report/controller.go b/features/pm/kpi_c_report/controller.go index e546217c..1a3c5d8f 100644 --- a/features/pm/kpi_c_report/controller.go +++ b/features/pm/kpi_c_report/controller.go @@ -103,7 +103,7 @@ func (k *KpiCReport) GetReport2FE(c *gin.Context) { dbg := db.DB("").Model(&KpiCReport{}).Table(tableName) if querys.NeID != "" { - conditions = append(conditions, "rm_uid = (select n.rm_uid from ne_info n where n.ne_type=? and n.ne_id=? and n.status=1)") + conditions = append(conditions, "rm_uid = (select n.rm_uid from ne_info n where n.ne_type=? and n.ne_id=?)") params = append(params, querys.NeType, querys.NeID) } else { c.JSON(http.StatusBadRequest, services.ErrResp("Not found required parameter NE ID")) diff --git a/features/pm/performance.go b/features/pm/performance.go index bd690c0e..213021f4 100644 --- a/features/pm/performance.go +++ b/features/pm/performance.go @@ -199,7 +199,10 @@ func saveKPIData(kpiReport KpiReport, index int64) int64 { // 推送到ws订阅组 wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent) if neInfo.NeType == "UPF" { - wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+neInfo.NeId, kpiEvent) + // 更新UPF总流量 + upValue := parse.Number(kpiEvent["UPF.03"]) + downValue := parse.Number(kpiEvent["UPF.06"]) + neDataService.NewKpiReport.UPFTodayFlowUpdate(neInfo.RmUID, upValue, downValue, false) } } } @@ -274,7 +277,7 @@ func saveKPIDataC(kpiReport KpiReport, index int64) int64 { neInfo := neService.NewNeInfo.FindByRmuid(kpiData.RmUid) if neInfo.RmUID == kpiData.RmUid { // 推送自定义KPI到ws订阅组 - wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiEvent) + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiEvent) } } return insertId @@ -412,12 +415,9 @@ func PostKPIReportFromNFOld(w http.ResponseWriter, r *http.Request) { if neInfo.RmUID == kpiData.RmUid { // 推送到ws订阅组 - wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent) + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent) // 推送自定义KPI到ws订阅组 - wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent) - if neInfo.NeType == "UPF" { - wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+neInfo.NeId, kpiEvent) - } + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent) } services.ResponseStatusOK204NoContent(w) @@ -465,7 +465,7 @@ type Task struct { } type KpiSetJ struct { Code string `json:"Code"` // 统计编码 如:SMFHA01 - KPIs []string `json:"KPIs` // 指标项集合 ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"] + KPIs []string `json:"KPIs"` // 指标项集合 ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"] } type MeasureTask struct { @@ -475,7 +475,7 @@ type MeasureTask struct { KpiSet []KpiSetJ `json:"KPISet" xorm:"kpi_set"` StartTime string `json:"startTime" xorm:"start_time"` EndTime string `json:"endTime" xorm:"end_time"` - Periods []Period `json:"Periods" xorm:"periods` + Periods []Period `json:"Periods" xorm:"periods"` Schedule []ScheduleJ `json:"Schedule" xorm:"schedule"` GranulOption string `json:"granulOption" xorm:"granul_option"` Status string `json:"status" xorm:"status"` @@ -483,7 +483,7 @@ type MeasureTask struct { Comment string `json:"comment" xorm:"comment"` CreateTime string `json:"createTime" xorm:"create_time"` UpdateTime string `json:"updateTime" xorm:"update_time"` - DeleteTime string `json:"deleteTime xorm:"delete_time"` + DeleteTime string `json:"deleteTime" xorm:"delete_time"` Tasks []Task `json:"Tasks"` NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */ diff --git a/src/framework/database/redis/expand.go b/src/framework/database/redis/expand.go index 8eba226e..85a8076a 100644 --- a/src/framework/database/redis/expand.go +++ b/src/framework/database/redis/expand.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "be.ems/src/framework/logger" "github.com/redis/go-redis/v9" @@ -137,3 +138,38 @@ func SetHash(source, key string, value map[string]any) error { } return nil } + +// IncrBy 累加统计数据 +func IncrBy(source, key, field string, value int64) error { + // 数据源 + rdb := RDB(source) + if rdb == nil { + return fmt.Errorf("redis not client") + } + + // 使用HINCRBY命令进行累加统计 + ctx := context.Background() + err := rdb.HIncrBy(ctx, key, field, value).Err() + if err != nil { + logger.Errorf("redis HIncrBy err %v", err) + return err + } + return nil +} + +// Expire 过期时间设置 +func Expire(source, key string, expiration time.Duration) error { + // 数据源 + rdb := RDB(source) + if rdb == nil { + return fmt.Errorf("redis not client") + } + // 过期时间设置 + ctx := context.Background() + err := rdb.Expire(ctx, key, expiration).Err() + if err != nil { + logger.Errorf("redis HIncrBy err %v", err) + return err + } + return nil +} diff --git a/src/modules/network_data/controller/upf.go b/src/modules/network_data/controller/upf.go index 7cd329b2..72e90efa 100644 --- a/src/modules/network_data/controller/upf.go +++ b/src/modules/network_data/controller/upf.go @@ -57,6 +57,6 @@ func (s UPFController) TotalFlow(c *gin.Context) { return } - data := s.kpiReportService.FindUPFTotalFlow(neInfo.RmUID, querys.Day) - c.JSON(200, resp.OkData(data)) + up, down := s.kpiReportService.UPFTodayFlowFind(neInfo.RmUID, querys.Day) + c.JSON(200, resp.OkData(map[string]int64{"up": up, "down": down})) } diff --git a/src/modules/network_data/network_data.go b/src/modules/network_data/network_data.go index 43a3840b..7c419b4b 100644 --- a/src/modules/network_data/network_data.go +++ b/src/modules/network_data/network_data.go @@ -6,6 +6,7 @@ import ( "be.ems/src/framework/middleware/collectlogs" "be.ems/src/framework/middleware/repeat" "be.ems/src/modules/network_data/controller" + "be.ems/src/modules/network_data/service" "github.com/gin-gonic/gin" ) @@ -14,6 +15,9 @@ import ( func Setup(router *gin.Engine) { logger.Infof("开始加载 ====> network_data 模块路由") + // 启动时需要的初始参数 + InitLoad() + neDataGroup := router.Group("/neData") // 性能统计信息 @@ -349,3 +353,9 @@ func Setup(router *gin.Engine) { ) } } + +// InitLoad 初始参数 +func InitLoad() { + // 启动时,加载UPF上下行流量 + go service.NewKpiReport.UPFTodayFlowLoad() +} diff --git a/src/modules/network_data/service/kpi_report.go b/src/modules/network_data/service/kpi_report.go index 2bd3f922..53134adc 100644 --- a/src/modules/network_data/service/kpi_report.go +++ b/src/modules/network_data/service/kpi_report.go @@ -11,6 +11,7 @@ import ( "be.ems/src/framework/utils/parse" "be.ems/src/modules/network_data/model" "be.ems/src/modules/network_data/repository" + neModel "be.ems/src/modules/network_element/model" ) // 实例化数据层 KpiReport 结构体 @@ -140,55 +141,111 @@ func (r KpiReport) FindTitle(neType string) []model.KpiTitle { return r.kpiReportRepository.SelectKPITitle(neType) } -// FindUPFTotalFlow 查询UPF总流量 N3上行 N6下行 -func (r KpiReport) FindUPFTotalFlow(rmUID string, day int) map[string]int64 { - now := time.Now() +// UPFTodayFlowFind 查询UPF总流量 N3上行 N6下行 +// down * 8 / 1000 / 1000 单位M +func (r KpiReport) UPFTodayFlowFind(rmUID string, day int) (int64, int64) { // 获取当前日期 - endTime := now.UnixMilli() - // 将当前日期前几天数 - beginTime := now.AddDate(0, 0, -day).Truncate(24 * time.Hour).UnixMilli() + now := time.Now() + var upTotal, downTotal int64 - data := map[string]int64{ - "up": 0, - "down": 0, - } + // 查询最近7天的数据 + for i := 0; i <= day; i++ { + dateKey := now.AddDate(0, 0, -i).Format("2006-01-02") + key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, rmUID, dateKey) - // 读取缓存数据 小于2分钟重新缓存 - key := fmt.Sprintf("%s:UPF_FLOW:%s_%d", constants.CACHE_NE_DATA, rmUID, day) - if infoStr, err := redis.Get("", key); err == nil && infoStr != "" { - json.Unmarshal([]byte(infoStr), &data) - if expireSecond, _ := redis.GetExpire("", key); expireSecond > 120 { - return data + // 读取缓存数据 + up, err := redis.GetHash("", key, "up") + if err != nil || up == "" { + up = "0" } + down, err := redis.GetHash("", key, "down") + if err != nil || down == "" { + down = "0" + } + + upTotal += parse.Number(up) + downTotal += parse.Number(down) } - // down * 8 / 1000 / 1000 单位M - rows := r.kpiReportRepository.SelectUPF(rmUID, beginTime, endTime) - for _, row := range rows { - // 解析 JSON 字符串为 map - var kpiValues []map[string]any - err := json.Unmarshal([]byte(row.KpiValues), &kpiValues) + return upTotal, downTotal +} + +// UPFTodayFlow UPF流量今日统计 +func (r KpiReport) UPFTodayFlowUpdate(rmUID string, upValue, downValue int64, rest bool) error { + // 按日期存储统计数据 + dateKey := time.Now().Format("2006-01-02") + key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, rmUID, dateKey) + + // 重置数据 + if rest { + err := redis.SetHash("", key, map[string]any{ + "up": upValue, + "down": downValue, + }) if err != nil { - continue + return err } + // 设置key的过期时间为30天,自动清理旧数据 + err = redis.Expire("", key, 30*24*time.Hour) + if err != nil { + return err + } + } - // 遍历 kpiValues 数组 - for _, v := range kpiValues { - if k, ok := v["kpiId"]; ok { - if k == "UPF.03" { - data["up"] = data["up"] + parse.Number(v["value"]) + // 使用HIncrBy实时累加统计值 + if err := redis.IncrBy("", key, "up", upValue); err != nil { + return err + } + if err := redis.IncrBy("", key, "down", downValue); err != nil { + return err + } + return nil +} + +// UPFTodayFlowLoad UPF上下行数据到redis +func (r KpiReport) UPFTodayFlowLoad() { + cacheKeys, _ := redis.GetKeys("", constants.CACHE_NE_INFO+":UPF:*") + if len(cacheKeys) == 0 { + return + } + + // 今日流量 + now := time.Now() + beginTime := now.Truncate(24 * time.Hour).UnixMilli() + endTime := beginTime + 24*60*60*1000 - 1 + + for _, key := range cacheKeys { + var v neModel.NeInfo + jsonStr, _ := redis.Get("", key) + if len(jsonStr) > 7 { + json.Unmarshal([]byte(jsonStr), &v) + } + if v.NeType == "UPF" && v.RmUID != "" { + // 查询历史数据 + rows := r.kpiReportRepository.SelectUPF(v.RmUID, beginTime, endTime) + var upTotal, downTotal int64 + + // 处理历史数据 + for _, row := range rows { + var kpiValues []map[string]any + if err := json.Unmarshal([]byte(row.KpiValues), &kpiValues); err != nil { + continue } - if k == "UPF.06" { - data["down"] = data["down"] + parse.Number(v["value"]) + + for _, v := range kpiValues { + if k, ok := v["kpiId"]; ok { + if k == "UPF.03" { + upTotal += parse.Number(v["value"]) + } + if k == "UPF.06" { + downTotal += parse.Number(v["value"]) + } + } } } + + // 将历史数据添加到Redis + r.UPFTodayFlowUpdate(v.RmUID, upTotal, downTotal, true) } } - - // 保存到缓存 - if infoJSON, err := json.Marshal(data); err == nil { - redis.SetByExpire("", key, string(infoJSON), time.Duration(10)*time.Minute) - } - - return data } diff --git a/src/modules/ws/processor/upf_total_flow.go b/src/modules/ws/processor/upf_total_flow.go index a0397014..7d83ba79 100644 --- a/src/modules/ws/processor/upf_total_flow.go +++ b/src/modules/ws/processor/upf_total_flow.go @@ -34,11 +34,11 @@ func GetUPFTotalFlow(requestID string, data any) ([]byte, error) { return nil, fmt.Errorf("no matching network element information found") } - dataMap := neDataService.NewKpiReport.FindUPFTotalFlow(neInfo.RmUID, querys.Day) + up, down := neDataService.NewKpiReport.UPFTodayFlowFind(neInfo.RmUID, querys.Day) resultByte, err := json.Marshal(resp.Ok(map[string]any{ "requestId": requestID, - "data": dataMap, + "data": map[string]int64{"up": up, "down": down}, })) return resultByte, err } diff --git a/src/modules/ws/service/ws_send.go b/src/modules/ws/service/ws_send.go index fda2979c..86aa9461 100644 --- a/src/modules/ws/service/ws_send.go +++ b/src/modules/ws/service/ws_send.go @@ -17,11 +17,9 @@ const ( // 组号-信令跟踪Packet 4_taskNo GROUP_TRACE_PACKET = "4_" // 组号-指标通用 10_neType_neId - GROUP_KPI = "10_" - // 组号-指标UPF 12_neId - GROUP_KPI_UPF = "12_" + GROUP_KPI = "10" // 组号-自定义KPI指标 20_neType_neId - GROUP_KPI_C = "20_" + GROUP_KPI_C = "20" // 组号-IMS_CDR会话事件 1005_neId GROUP_IMS_CDR = "1005_" // 组号-SMF_CDR会话事件 1006_neId