diff --git a/src/framework/redis/expand.go b/src/framework/redis/expand.go new file mode 100644 index 00000000..85a8076a --- /dev/null +++ b/src/framework/redis/expand.go @@ -0,0 +1,175 @@ +package redis + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "be.ems/src/framework/logger" + "github.com/redis/go-redis/v9" +) + +// 连接Redis实例 +func ConnectPush(source string, rdb *redis.Client) { + if rdb == nil { + delete(rdbMap, source) + return + } + rdbMap[source] = rdb +} + +// 批量获得缓存数据 [key]result +func GetHashBatch(source string, keys []string) (map[string]map[string]string, error) { + result := make(map[string]map[string]string, 0) + if len(keys) == 0 { + return result, fmt.Errorf("not keys") + } + + // 数据源 + rdb := RDB(source) + if rdb == nil { + return result, fmt.Errorf("redis not client") + } + + // 创建一个有限的并发控制信号通道 + sem := make(chan struct{}, 10) + var wg sync.WaitGroup + var mt sync.Mutex + batchSize := 1000 + total := len(keys) + if total < batchSize { + batchSize = total + } + + for i := 0; i < total; i += batchSize { + wg.Add(1) + go func(start int) { + ctx := context.Background() + // 并发控制,限制同时执行的 Goroutine 数量 + sem <- struct{}{} + defer func() { + <-sem + ctx.Done() + wg.Done() + }() + + // 检查索引是否越界 + end := start + batchSize + if end > total { + end = total + } + pipe := rdb.Pipeline() + for _, key := range keys[start:end] { + pipe.HGetAll(ctx, key) + } + + cmds, err := pipe.Exec(ctx) + if err != nil { + logger.Errorf("Failed to get hash batch exec err: %v", err) + return + } + + // 将结果添加到 result map 并发访问 + mt.Lock() + defer mt.Unlock() + + // 处理命令结果 + for _, cmd := range cmds { + if cmd.Err() != nil { + logger.Errorf("Failed to get hash batch cmds err: %v", cmd.Err()) + continue + } + // 将结果转换为 *redis.StringStringMapCmd 类型 + rcmd, ok := cmd.(*redis.MapStringStringCmd) + if !ok { + logger.Errorf("Failed to get hash batch type err: %v", cmd.Err()) + continue + } + + key := "-" + args := rcmd.Args() + if len(args) > 0 { + key = fmt.Sprint(args[1]) + } + + result[key] = rcmd.Val() + } + }(i) + } + + wg.Wait() + return result, nil +} + +// GetHash 获得缓存数据 +func GetHash(source, key, field string) (string, error) { + // 数据源 + rdb := RDB(source) + if rdb == nil { + return "", fmt.Errorf("redis not client") + } + + ctx := context.Background() + v, err := rdb.HGet(ctx, key, field).Result() + if errors.Is(err, redis.Nil) { + return "", fmt.Errorf("no key field") + } + if err != nil { + return "", err + } + return v, nil +} + +// SetHash 设置缓存数据 +func SetHash(source, key string, value map[string]any) error { + // 数据源 + rdb := RDB(source) + if rdb == nil { + return fmt.Errorf("redis not client") + } + + ctx := context.Background() + err := rdb.HSet(ctx, key, value).Err() + if err != nil { + logger.Errorf("redis HSet err %v", err) + return err + } + return nil +} + +// IncrBy 累加统计数据 +func IncrBy(source, key, field string, value int64) error { + // 数据源 + rdb := RDB(source) + if rdb == nil { + return fmt.Errorf("redis not client") + } + + // 使用HINCRBY命令进行累加统计 + ctx := context.Background() + err := rdb.HIncrBy(ctx, key, field, value).Err() + if err != nil { + logger.Errorf("redis HIncrBy err %v", err) + return err + } + return nil +} + +// Expire 过期时间设置 +func Expire(source, key string, expiration time.Duration) error { + // 数据源 + rdb := RDB(source) + if rdb == nil { + return fmt.Errorf("redis not client") + } + // 过期时间设置 + ctx := context.Background() + err := rdb.Expire(ctx, key, expiration).Err() + if err != nil { + logger.Errorf("redis HIncrBy err %v", err) + return err + } + return nil +} diff --git a/src/framework/redis/redis.go b/src/framework/redis/redis.go index 88e5cdd0..2a9e25b3 100644 --- a/src/framework/redis/redis.go +++ b/src/framework/redis/redis.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "sync" "time" "be.ems/src/framework/config" @@ -31,15 +30,6 @@ if tonumber(current) == 1 then end return tonumber(current);`) -// 连接Redis实例 -func ConnectPush(source string, rdb *redis.Client) { - if rdb == nil { - delete(rdbMap, source) - return - } - rdbMap[source] = rdb -} - // 连接Redis实例 func Connect() { ctx := context.Background() @@ -237,105 +227,6 @@ func Get(source, key string) (string, error) { return value, nil } -// 获得缓存数据Hash -func GetHash(source, key string) (map[string]string, error) { - // 数据源 - rdb := DefaultRDB() - if source != "" { - rdb = RDB(source) - } - - ctx := context.Background() - value, err := rdb.HGetAll(ctx, key).Result() - if err == redis.Nil || err != nil { - return map[string]string{}, err - } - return value, nil -} - -// 批量获得缓存数据 [key]result -func GetHashBatch(source string, keys []string) (map[string]map[string]string, error) { - result := make(map[string]map[string]string, 0) - if len(keys) == 0 { - return result, fmt.Errorf("not keys") - } - - // 数据源 - rdb := DefaultRDB() - if source != "" { - rdb = RDB(source) - } - - // 创建一个有限的并发控制信号通道 - sem := make(chan struct{}, 10) - var wg sync.WaitGroup - var mt sync.Mutex - batchSize := 1000 - total := len(keys) - if total < batchSize { - batchSize = total - } - - for i := 0; i < total; i += batchSize { - wg.Add(1) - go func(start int) { - ctx := context.Background() - // 并发控制,限制同时执行的 Goroutine 数量 - sem <- struct{}{} - defer func() { - <-sem - ctx.Done() - wg.Done() - }() - - // 检查索引是否越界 - end := start + batchSize - if end > total { - end = total - } - pipe := rdb.Pipeline() - for _, key := range keys[start:end] { - pipe.HGetAll(ctx, key) - } - - cmds, err := pipe.Exec(ctx) - if err != nil { - logger.Errorf("Failed to get hash batch exec err: %v", err) - return - } - - // 将结果添加到 result map 并发访问 - mt.Lock() - defer mt.Unlock() - - // 处理命令结果 - for _, cmd := range cmds { - if cmd.Err() != nil { - logger.Errorf("Failed to get hash batch cmds err: %v", cmd.Err()) - continue - } - // 将结果转换为 *redis.StringStringMapCmd 类型 - rcmd, ok := cmd.(*redis.MapStringStringCmd) - if !ok { - logger.Errorf("Failed to get hash batch type err: %v", cmd.Err()) - continue - } - - key := "-" - args := rcmd.Args() - if len(args) > 0 { - key = fmt.Sprint(args[1]) - } - - result[key] = rcmd.Val() - } - }(i) - } - - wg.Wait() - return result, nil -} - // 判断是否存在 func Has(source string, keys ...string) (bool, error) { // 数据源 diff --git a/src/modules/monitor/controller/sys_cache.go b/src/modules/monitor/controller/sys_cache.go index 06c4c897..6743d0cb 100644 --- a/src/modules/monitor/controller/sys_cache.go +++ b/src/modules/monitor/controller/sys_cache.go @@ -159,7 +159,7 @@ func (s *SysCacheController) ClearCacheSafe(c *gin.Context) { model.NewSysCacheNames(i18n.TKey(language, "cache.name.ne_data"), cachekey.NE_DATA_KEY), } for _, v := range caches { - cacheKeys, err := redis.GetKeys("", v.CacheName+":*") + cacheKeys, err := redis.GetKeys("", v.CacheName+"*") if err != nil { continue } diff --git a/src/modules/network_data/controller/upf.go b/src/modules/network_data/controller/upf.go index fcd48f3b..c2c2ec1a 100644 --- a/src/modules/network_data/controller/upf.go +++ b/src/modules/network_data/controller/upf.go @@ -56,7 +56,6 @@ func (s *UPFController) TotalFlow(c *gin.Context) { return } - data := s.perfKPIService.SelectUPFTotalFlow(neInfo.NeType, neInfo.RmUID, querys.Day) - - c.JSON(200, result.OkData(data)) + up, down := s.perfKPIService.UPFTodayFlowFind(neInfo.RmUID, querys.Day) + c.JSON(200, result.OkData(map[string]int64{"up": up, "down": down})) } diff --git a/src/modules/network_data/network_data.go b/src/modules/network_data/network_data.go index 6f7cf295..018818d0 100644 --- a/src/modules/network_data/network_data.go +++ b/src/modules/network_data/network_data.go @@ -6,6 +6,7 @@ import ( "be.ems/src/framework/middleware/collectlogs" "be.ems/src/framework/middleware/repeat" "be.ems/src/modules/network_data/controller" + "be.ems/src/modules/network_data/service" "github.com/gin-gonic/gin" ) @@ -14,6 +15,9 @@ import ( func Setup(router *gin.Engine) { logger.Infof("开始加载 ====> network_data 模块路由") + // 启动时需要的初始参数 + InitLoad() + neDataGroup := router.Group("/neData") // 性能统计信息 @@ -318,3 +322,9 @@ func Setup(router *gin.Engine) { ) } } + +// InitLoad 初始参数 +func InitLoad() { + // 启动时,加载UPF上下行流量 + go service.NewPerfKPI.UPFTodayFlowLoad() +} diff --git a/src/modules/network_data/service/all_perf_kpi.go b/src/modules/network_data/service/all_perf_kpi.go index b4aef0c4..74b58b95 100644 --- a/src/modules/network_data/service/all_perf_kpi.go +++ b/src/modules/network_data/service/all_perf_kpi.go @@ -7,8 +7,10 @@ import ( "be.ems/src/framework/constants/cachekey" "be.ems/src/framework/redis" + "be.ems/src/framework/utils/parse" "be.ems/src/modules/network_data/model" "be.ems/src/modules/network_data/repository" + neModel "be.ems/src/modules/network_element/model" ) // 实例化数据层 PerfKPI 结构体 @@ -42,38 +44,100 @@ func (r *PerfKPI) SelectGoldKPITitle(neType string) []model.GoldKPITitle { return r.perfKPIRepository.SelectGoldKPITitle(neType) } -// SelectUPFTotalFlow 查询UPF总流量 N3上行 N6下行 -func (r *PerfKPI) SelectUPFTotalFlow(neType, rmUID string, day int) map[string]any { - now := time.Now() +// UPFTodayFlowFind 查询UPF总流量 N3上行 N6下行 +func (r PerfKPI) UPFTodayFlowFind(rmUID string, day int) (int64, int64) { // 获取当前日期 - endDate := fmt.Sprint(now.UnixMilli()) - // 将当前日期前几天数 - startDate := fmt.Sprint(now.AddDate(0, 0, -day).Truncate(24 * time.Hour).UnixMilli()) + now := time.Now() + var upTotal, downTotal int64 - var info map[string]any + // 查询最近7天的数据 + for i := 0; i <= day; i++ { + dateKey := now.AddDate(0, 0, -i).Format("2006-01-02") + key := fmt.Sprintf("%sUPF_FLOW:%s:%s", cachekey.NE_DATA_KEY, rmUID, dateKey) - // 读取缓存数据 小于2分钟重新缓存 - key := fmt.Sprintf("%sUPF:totalFlow:%s_%d", cachekey.NE_DATA_KEY, rmUID, day) - infoStr, _ := redis.Get("", key) - if infoStr != "" { - json.Unmarshal([]byte(infoStr), &info) - expireSecond, _ := redis.GetExpire("", key) - if expireSecond > 120 { - return info + // 读取缓存数据 + up, err := redis.GetHash("", key, "up") + if err != nil || up == "" { + up = "0" + } + down, err := redis.GetHash("", key, "down") + if err != nil || down == "" { + down = "0" + } + + upTotal += parse.Number(up) + downTotal += parse.Number(down) + } + + return upTotal, downTotal +} + +// UPFTodayFlow UPF流量今日统计 +func (r PerfKPI) UPFTodayFlowUpdate(rmUID string, upValue, downValue int64, rest bool) error { + // 按日期存储统计数据 + dateKey := time.Now().Format("2006-01-02") + key := fmt.Sprintf("%sUPF_FLOW:%s:%s", cachekey.NE_DATA_KEY, rmUID, dateKey) + + // 重置数据 + if rest { + err := redis.SetHash("", key, map[string]any{ + "up": upValue, + "down": downValue, + }) + if err != nil { + return err + } + // 设置key的过期时间为30天,自动清理旧数据 + err = redis.Expire("", key, 30*24*time.Hour) + if err != nil { + return err } } - // down * 8 / 1000 / 1000 单位M - info = r.perfKPIRepository.SelectUPFTotalFlow(neType, rmUID, startDate, endDate) - if v, ok := info["up"]; ok && v == nil { - info["up"] = 0 - } - if v, ok := info["down"]; ok && v == nil { - info["down"] = 0 - } - // 保存到缓存 - infoJSON, _ := json.Marshal(info) - redis.SetByExpire("", key, string(infoJSON), time.Duration(10)*time.Minute) - - return info + // 使用HIncrBy实时累加统计值 + if err := redis.IncrBy("", key, "up", upValue); err != nil { + return err + } + if err := redis.IncrBy("", key, "down", downValue); err != nil { + return err + } + return nil +} + +// UPFTodayFlowLoad UPF上下行数据到redis +func (r PerfKPI) UPFTodayFlowLoad() { + cacheKeys, _ := redis.GetKeys("", cachekey.NE_KEY+"UPF:*") + if len(cacheKeys) == 0 { + return + } + + // 今日流量 + now := time.Now() + beginTime := now.Truncate(24 * time.Hour).UnixMilli() + endTime := beginTime + 24*60*60*1000 - 1 + + for _, key := range cacheKeys { + var v neModel.NeInfo + jsonStr, _ := redis.Get("", key) + if len(jsonStr) > 7 { + json.Unmarshal([]byte(jsonStr), &v) + } + if v.NeType == "UPF" && v.RmUID != "" { + // 查询历史数据 + // down * 8 / 1000 / 1000 单位M + info := r.perfKPIRepository.SelectUPFTotalFlow("UPF", v.RmUID, fmt.Sprint(beginTime), fmt.Sprint(endTime)) + if v, ok := info["up"]; ok && v == nil { + info["up"] = 0 + } + if v, ok := info["down"]; ok && v == nil { + info["down"] = 0 + } + + upTotal := parse.Number(info["up"]) + downTotal := parse.Number(info["down"]) + + // 将历史数据添加到Redis + r.UPFTodayFlowUpdate(v.RmUID, upTotal, downTotal, true) + } + } } diff --git a/src/modules/ws/processor/upf_total_flow.go b/src/modules/ws/processor/upf_total_flow.go index 0186dcd3..7c219e3f 100644 --- a/src/modules/ws/processor/upf_total_flow.go +++ b/src/modules/ws/processor/upf_total_flow.go @@ -34,11 +34,11 @@ func GetUPFTotalFlow(requestID string, data any) ([]byte, error) { return nil, fmt.Errorf("no matching network element information found") } - dataMap := neDataService.NewPerfKPI.SelectUPFTotalFlow(neInfo.NeType, neInfo.RmUID, querys.Day) + up, down := neDataService.NewPerfKPI.UPFTodayFlowFind(neInfo.RmUID, querys.Day) resultByte, err := json.Marshal(result.Ok(map[string]any{ "requestId": requestID, - "data": dataMap, + "data": map[string]int64{"up": up, "down": down}, })) return resultByte, err }