package service

import (
	"encoding/json"
	"fmt"
	"time"

	"be.ems/src/framework/constants/cachekey"
	"be.ems/src/framework/redis"
	"be.ems/src/framework/utils/parse"
	"be.ems/src/modules/network_data/model"
	"be.ems/src/modules/network_data/repository"
	neModel "be.ems/src/modules/network_element/model"
)

// NewPerfKPI is the shared service-layer instance of PerfKPI, wired to the
// repository-layer instance.
var NewPerfKPI = &PerfKPI{
	perfKPIRepository: repository.NewPerfKPI,
}

// PerfKPI is the service layer for performance statistics.
type PerfKPI struct {
	perfKPIRepository *repository.PerfKPI // performance statistics data access
}

// upfFlowCacheKey builds the Redis hash key that holds the UPF flow counters
// of one network element (rmUID) for the calendar day of t, using t's own
// location for the date part.
func upfFlowCacheKey(rmUID string, t time.Time) string {
	return fmt.Sprintf("%sUPF_FLOW:%s:%s", cachekey.NE_DATA_KEY, rmUID, t.Format("2006-01-02"))
}

// SelectGoldKPI returns the KPI rows matching query for the KPI ids that are
// registered for query.NeType. It never returns nil: when the repository
// yields nil an empty slice is returned instead.
func (r *PerfKPI) SelectGoldKPI(query model.GoldKPIQuery) []map[string]any {
	// Collect the KPI ids defined for this network element type.
	var kpiIds []string
	for _, title := range r.perfKPIRepository.SelectGoldKPITitle(query.NeType) {
		kpiIds = append(kpiIds, title.KPIID)
	}

	rows := r.perfKPIRepository.SelectGoldKPI(query, kpiIds)
	if rows == nil {
		return []map[string]any{}
	}
	return rows
}

// SelectGoldKPITitle returns the KPI titles defined for the given network
// element type.
func (r *PerfKPI) SelectGoldKPITitle(neType string) []model.GoldKPITitle {
	return r.perfKPIRepository.SelectGoldKPITitle(neType)
}

// UPFTodayFlowFind sums the cached UPF flow counters (N3 uplink, N6 downlink)
// of the network element rmUID.
//
// The loop bound is inclusive, so today plus the previous `day` days are
// summed: day=0 reads only today's bucket, day=6 reads seven buckets.
// A bucket that is missing, empty or unreadable counts as zero.
// It returns the uplink total followed by the downlink total.
func (r PerfKPI) UPFTodayFlowFind(rmUID string, day int) (int64, int64) {
	now := time.Now()
	var upTotal, downTotal int64

	for i := 0; i <= day; i++ {
		key := upfFlowCacheKey(rmUID, now.AddDate(0, 0, -i))

		// Read the cached counters; fall back to "0" on any miss or error.
		up, err := redis.GetHash("", key, "up")
		if err != nil || up == "" {
			up = "0"
		}
		down, err := redis.GetHash("", key, "down")
		if err != nil || down == "" {
			down = "0"
		}

		upTotal += parse.Number(up)
		downTotal += parse.Number(down)
	}
	return upTotal, downTotal
}

// UPFTodayFlowUpdate updates today's UPF flow counters of the network element
// rmUID in Redis.
//
// When rest is true the counters are overwritten with upValue/downValue and
// the key is given a 30-day expiry. When rest is false upValue/downValue are
// added to the existing counters.
//
// It returns the first Redis error encountered, or nil on success.
func (r PerfKPI) UPFTodayFlowUpdate(rmUID string, upValue, downValue int64, rest bool) error {
	// Counters are stored per calendar day.
	key := upfFlowCacheKey(rmUID, time.Now())

	if rest {
		// Reset: overwrite both counters with the supplied totals.
		if err := redis.SetHash("", key, map[string]any{
			"up":   upValue,
			"down": downValue,
		}); err != nil {
			return err
		}
		// Expire the key after 30 days so old buckets are cleaned up.
		// Return here: falling through to the increments below would add the
		// same values a second time and double the stored totals.
		return redis.Expire("", key, 30*24*time.Hour)
	}

	// Accumulate onto the running totals.
	// NOTE(review): no expiry is applied on this path, so a bucket that is
	// first created by an increment keeps whatever TTL Redis gives it by
	// default — confirm whether that is intended.
	if err := redis.IncrBy("", key, "up", upValue); err != nil {
		return err
	}
	return redis.IncrBy("", key, "down", downValue)
}

// UPFTodayFlowLoad seeds today's UPF flow counters in Redis from the
// historical totals held by the repository.
//
// It scans the cached network element entries matching NE_KEY+"UPF:*", and
// for every entry that decodes to a UPF with a non-empty RmUID it queries the
// totals for the current local calendar day and writes them with
// UPFTodayFlowUpdate in reset mode. Entries that cannot be read or decoded
// are skipped.
func (r PerfKPI) UPFTodayFlowLoad() {
	cacheKeys, err := redis.GetKeys("", cachekey.NE_KEY+"UPF:*")
	if err != nil || len(cacheKeys) == 0 {
		return
	}

	// Today's window in local time, as Unix milliseconds. time.Truncate works
	// on absolute time since the zero time (UTC), so it would not line up with
	// the locally formatted date used in the Redis key.
	now := time.Now()
	dayStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
	beginTime := dayStart.UnixMilli()
	endTime := dayStart.AddDate(0, 0, 1).UnixMilli() - 1

	for _, key := range cacheKeys {
		jsonStr, err := redis.Get("", key)
		// Payloads of 7 bytes or fewer are treated as holding no element data.
		if err != nil || len(jsonStr) <= 7 {
			continue
		}
		var ne neModel.NeInfo
		if err := json.Unmarshal([]byte(jsonStr), &ne); err != nil {
			continue
		}
		if ne.NeType != "UPF" || ne.RmUID == "" {
			continue
		}

		// Query the historical totals for today.
		// Original note: down * 8 / 1000 / 1000 gives the value in M —
		// TODO confirm the unit returned by SelectUPFTotalFlow.
		info := r.perfKPIRepository.SelectUPFTotalFlow("UPF", ne.RmUID, fmt.Sprint(beginTime), fmt.Sprint(endTime))
		// A key that is present with a nil value is normalised to zero.
		if val, ok := info["up"]; ok && val == nil {
			info["up"] = 0
		}
		if val, ok := info["down"]; ok && val == nil {
			info["down"] = 0
		}
		upTotal := parse.Number(info["up"])
		downTotal := parse.Number(info["down"])

		// Best effort: this function has no error return, so a failed write
		// for one element must not stop the remaining elements.
		_ = r.UPFTodayFlowUpdate(ne.RmUID, upTotal, downTotal, true)
	}
}