package service import ( "encoding/json" "fmt" "time" "be.ems/src/framework/constants/cachekey" "be.ems/src/framework/database/redis" "be.ems/src/framework/utils/parse" "be.ems/src/modules/network_data/model" "be.ems/src/modules/network_data/repository" neModel "be.ems/src/modules/network_element/model" ) // 实例化数据层 PerfKPI 结构体 var NewPerfKPI = &PerfKPI{ perfKPIRepository: repository.NewPerfKPI, } // PerfKPI 性能统计 服务层处理 type PerfKPI struct { perfKPIRepository *repository.PerfKPI // 性能统计数据信息 } // SelectGoldKPI 通过网元指标数据信息 func (r *PerfKPI) SelectGoldKPI(query model.GoldKPIQuery) []map[string]any { // 获取数据指标id var kpiIds []string kpiTitles := r.perfKPIRepository.SelectGoldKPITitle(query.NeType) for _, kpiId := range kpiTitles { kpiIds = append(kpiIds, kpiId.KPIID) } data := r.perfKPIRepository.SelectGoldKPI(query, kpiIds) if data == nil { return []map[string]any{} } return data } // SelectGoldKPITitle 网元对应的指标名称 func (r *PerfKPI) SelectGoldKPITitle(neType string) []model.GoldKPITitle { return r.perfKPIRepository.SelectGoldKPITitle(neType) } // UPFTodayFlowFind 查询UPF总流量 N3上行 N6下行 // day 统计天数 func (r PerfKPI) UPFTodayFlowFind(rmUID string, day int) (int64, int64) { // 获取当前日期 now := time.Now() var upTotal, downTotal int64 // 查询最近day天的数据 for i := 0; i <= day; i++ { dateKey := now.AddDate(0, 0, -i).Format("2006-01-02") key := fmt.Sprintf("%sUPF_FLOW:%s:%s", cachekey.NE_DATA_KEY, rmUID, dateKey) // 读取缓存数据 up, err := redis.GetHash("", key, "up") if err != nil || up == "" { up = "0" } down, err := redis.GetHash("", key, "down") if err != nil || down == "" { down = "0" } upTotal += parse.Number(up) downTotal += parse.Number(down) } return upTotal, downTotal } // UPFTodayFlow UPF流量今日统计 func (r PerfKPI) UPFTodayFlowUpdate(rmUID string, upValue, downValue int64) error { // 按日期存储统计数据 dateKey := time.Now().Format("2006-01-02") key := fmt.Sprintf("%sUPF_FLOW:%s:%s", cachekey.NE_DATA_KEY, rmUID, dateKey) // 使用HIncrBy实时累加统计值 if err := redis.IncrBy("", key, "up", upValue); err != nil { return err } if err := redis.IncrBy("", key, "down", downValue); err != nil { return err } return nil } // UPFTodayFlowLoad UPF上下行数据到redis // day 统计天数 func (r PerfKPI) UPFTodayFlowLoad(day int) { cacheKeys, _ := redis.GetKeys("", cachekey.NE_KEY+"UPF:*") if len(cacheKeys) == 0 { return } now := time.Now() for _, key := range cacheKeys { var v neModel.NeInfo jsonStr, _ := redis.Get("", key) if len(jsonStr) > 7 { json.Unmarshal([]byte(jsonStr), &v) } if v.NeType == "UPF" && v.RmUID != "" { // 查询最近day天的数据 for i := 0; i <= day; i++ { dateKey := now.AddDate(0, 0, -i).Format("2006-01-02") key := fmt.Sprintf("%sUPF_FLOW:%s:%s", cachekey.NE_DATA_KEY, v.RmUID, dateKey) // 根据传入天数计算时间范围 beginTime := now.AddDate(0, 0, -i).Truncate(24 * time.Hour).UnixMilli() endTime := beginTime + 24*60*60*1000 - 1 // 查询历史数据 // down * 8 / 1000 / 1000 单位M info := r.perfKPIRepository.SelectUPFTotalFlow("UPF", v.RmUID, fmt.Sprint(beginTime), fmt.Sprint(endTime)) if v, ok := info["up"]; ok && v == nil { info["up"] = 0 } if v, ok := info["down"]; ok && v == nil { info["down"] = 0 } upTotal := parse.Number(info["up"]) downTotal := parse.Number(info["down"]) err := redis.SetHash("", key, map[string]any{ "up": upTotal, "down": downTotal, }) if err != nil { continue } // 设置key的过期时间为30天,自动清理旧数据 daySub := (30 - i) * 24 err = redis.Expire("", key, time.Duration(daySub)*time.Hour) if err != nil { continue } } } } }