feat: 优化UPF流量统计逻辑

This commit is contained in:
TsMask
2025-03-03 18:06:14 +08:00
parent 93e6fdb2b8
commit c286e68845
8 changed files with 157 additions and 56 deletions

View File

@@ -103,7 +103,7 @@ func (k *KpiCReport) GetReport2FE(c *gin.Context) {
dbg := db.DB("").Model(&KpiCReport{}).Table(tableName)
if querys.NeID != "" {
conditions = append(conditions, "rm_uid = (select n.rm_uid from ne_info n where n.ne_type=? and n.ne_id=? and n.status=1)")
conditions = append(conditions, "rm_uid = (select n.rm_uid from ne_info n where n.ne_type=? and n.ne_id=?)")
params = append(params, querys.NeType, querys.NeID)
} else {
c.JSON(http.StatusBadRequest, services.ErrResp("Not found required parameter NE ID"))

View File

@@ -199,7 +199,10 @@ func saveKPIData(kpiReport KpiReport, index int64) int64 {
// 推送到ws订阅组
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent)
if neInfo.NeType == "UPF" {
wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+neInfo.NeId, kpiEvent)
// 更新UPF总流量
upValue := parse.Number(kpiEvent["UPF.03"])
downValue := parse.Number(kpiEvent["UPF.06"])
neDataService.NewKpiReport.UPFTodayFlowUpdate(neInfo.RmUID, upValue, downValue, false)
}
}
}
@@ -274,7 +277,7 @@ func saveKPIDataC(kpiReport KpiReport, index int64) int64 {
neInfo := neService.NewNeInfo.FindByRmuid(kpiData.RmUid)
if neInfo.RmUID == kpiData.RmUid {
// 推送自定义KPI到ws订阅组
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiEvent)
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiEvent)
}
}
return insertId
@@ -412,12 +415,9 @@ func PostKPIReportFromNFOld(w http.ResponseWriter, r *http.Request) {
if neInfo.RmUID == kpiData.RmUid {
// 推送到ws订阅组
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent)
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent)
// 推送自定义KPI到ws订阅组
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent)
if neInfo.NeType == "UPF" {
wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+neInfo.NeId, kpiEvent)
}
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent)
}
services.ResponseStatusOK204NoContent(w)
@@ -465,7 +465,7 @@ type Task struct {
}
type KpiSetJ struct {
Code string `json:"Code"` // 统计编码 如SMFHA01
KPIs []string `json:"KPIs` // 指标项集合 ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"]
KPIs []string `json:"KPIs"` // 指标项集合 ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"]
}
type MeasureTask struct {
@@ -475,7 +475,7 @@ type MeasureTask struct {
KpiSet []KpiSetJ `json:"KPISet" xorm:"kpi_set"`
StartTime string `json:"startTime" xorm:"start_time"`
EndTime string `json:"endTime" xorm:"end_time"`
Periods []Period `json:"Periods" xorm:"periods`
Periods []Period `json:"Periods" xorm:"periods"`
Schedule []ScheduleJ `json:"Schedule" xorm:"schedule"`
GranulOption string `json:"granulOption" xorm:"granul_option"`
Status string `json:"status" xorm:"status"`
@@ -483,7 +483,7 @@ type MeasureTask struct {
Comment string `json:"comment" xorm:"comment"`
CreateTime string `json:"createTime" xorm:"create_time"`
UpdateTime string `json:"updateTime" xorm:"update_time"`
DeleteTime string `json:"deleteTime xorm:"delete_time"`
DeleteTime string `json:"deleteTime" xorm:"delete_time"`
Tasks []Task `json:"Tasks"`
NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"
"be.ems/src/framework/logger"
"github.com/redis/go-redis/v9"
@@ -137,3 +138,38 @@ func SetHash(source, key string, value map[string]any) error {
}
return nil
}
// IncrBy 累加统计数据
func IncrBy(source, key, field string, value int64) error {
// 数据源
rdb := RDB(source)
if rdb == nil {
return fmt.Errorf("redis not client")
}
// 使用HINCRBY命令进行累加统计
ctx := context.Background()
err := rdb.HIncrBy(ctx, key, field, value).Err()
if err != nil {
logger.Errorf("redis HIncrBy err %v", err)
return err
}
return nil
}
// Expire 过期时间设置
func Expire(source, key string, expiration time.Duration) error {
// 数据源
rdb := RDB(source)
if rdb == nil {
return fmt.Errorf("redis not client")
}
// 过期时间设置
ctx := context.Background()
err := rdb.Expire(ctx, key, expiration).Err()
if err != nil {
logger.Errorf("redis HIncrBy err %v", err)
return err
}
return nil
}

View File

@@ -57,6 +57,6 @@ func (s UPFController) TotalFlow(c *gin.Context) {
return
}
data := s.kpiReportService.FindUPFTotalFlow(neInfo.RmUID, querys.Day)
c.JSON(200, resp.OkData(data))
up, down := s.kpiReportService.UPFTodayFlowFind(neInfo.RmUID, querys.Day)
c.JSON(200, resp.OkData(map[string]int64{"up": up, "down": down}))
}

View File

@@ -6,6 +6,7 @@ import (
"be.ems/src/framework/middleware/collectlogs"
"be.ems/src/framework/middleware/repeat"
"be.ems/src/modules/network_data/controller"
"be.ems/src/modules/network_data/service"
"github.com/gin-gonic/gin"
)
@@ -14,6 +15,9 @@ import (
func Setup(router *gin.Engine) {
logger.Infof("开始加载 ====> network_data 模块路由")
// 启动时需要的初始参数
InitLoad()
neDataGroup := router.Group("/neData")
// 性能统计信息
@@ -349,3 +353,9 @@ func Setup(router *gin.Engine) {
)
}
}
// InitLoad 初始参数
func InitLoad() {
// 启动时加载UPF上下行流量
go service.NewKpiReport.UPFTodayFlowLoad()
}

View File

@@ -11,6 +11,7 @@ import (
"be.ems/src/framework/utils/parse"
"be.ems/src/modules/network_data/model"
"be.ems/src/modules/network_data/repository"
neModel "be.ems/src/modules/network_element/model"
)
// 实例化数据层 KpiReport 结构体
@@ -140,55 +141,111 @@ func (r KpiReport) FindTitle(neType string) []model.KpiTitle {
return r.kpiReportRepository.SelectKPITitle(neType)
}
// FindUPFTotalFlow 查询UPF总流量 N3上行 N6下行
func (r KpiReport) FindUPFTotalFlow(rmUID string, day int) map[string]int64 {
now := time.Now()
// 获取当前日期
endTime := now.UnixMilli()
// 将当前日期前几天数
beginTime := now.AddDate(0, 0, -day).Truncate(24 * time.Hour).UnixMilli()
data := map[string]int64{
"up": 0,
"down": 0,
}
// 读取缓存数据 小于2分钟重新缓存
key := fmt.Sprintf("%s:UPF_FLOW:%s_%d", constants.CACHE_NE_DATA, rmUID, day)
if infoStr, err := redis.Get("", key); err == nil && infoStr != "" {
json.Unmarshal([]byte(infoStr), &data)
if expireSecond, _ := redis.GetExpire("", key); expireSecond > 120 {
return data
}
}
// UPFTodayFlowFind 查询UPF总流量 N3上行 N6下行
// down * 8 / 1000 / 1000 单位M
rows := r.kpiReportRepository.SelectUPF(rmUID, beginTime, endTime)
for _, row := range rows {
// 解析 JSON 字符串为 map
var kpiValues []map[string]any
err := json.Unmarshal([]byte(row.KpiValues), &kpiValues)
func (r KpiReport) UPFTodayFlowFind(rmUID string, day int) (int64, int64) {
// 获取当前日期
now := time.Now()
var upTotal, downTotal int64
// 查询最近7天的数据
for i := 0; i <= day; i++ {
dateKey := now.AddDate(0, 0, -i).Format("2006-01-02")
key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, rmUID, dateKey)
// 读取缓存数据
up, err := redis.GetHash("", key, "up")
if err != nil || up == "" {
up = "0"
}
down, err := redis.GetHash("", key, "down")
if err != nil || down == "" {
down = "0"
}
upTotal += parse.Number(up)
downTotal += parse.Number(down)
}
return upTotal, downTotal
}
// UPFTodayFlow UPF流量今日统计
func (r KpiReport) UPFTodayFlowUpdate(rmUID string, upValue, downValue int64, rest bool) error {
// 按日期存储统计数据
dateKey := time.Now().Format("2006-01-02")
key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, rmUID, dateKey)
// 重置数据
if rest {
err := redis.SetHash("", key, map[string]any{
"up": upValue,
"down": downValue,
})
if err != nil {
return err
}
// 设置key的过期时间为30天自动清理旧数据
err = redis.Expire("", key, 30*24*time.Hour)
if err != nil {
return err
}
}
// 使用HIncrBy实时累加统计值
if err := redis.IncrBy("", key, "up", upValue); err != nil {
return err
}
if err := redis.IncrBy("", key, "down", downValue); err != nil {
return err
}
return nil
}
// UPFTodayFlowLoad UPF上下行数据到redis
func (r KpiReport) UPFTodayFlowLoad() {
cacheKeys, _ := redis.GetKeys("", constants.CACHE_NE_INFO+":UPF:*")
if len(cacheKeys) == 0 {
return
}
// 今日流量
now := time.Now()
beginTime := now.Truncate(24 * time.Hour).UnixMilli()
endTime := beginTime + 24*60*60*1000 - 1
for _, key := range cacheKeys {
var v neModel.NeInfo
jsonStr, _ := redis.Get("", key)
if len(jsonStr) > 7 {
json.Unmarshal([]byte(jsonStr), &v)
}
if v.NeType == "UPF" && v.RmUID != "" {
// 查询历史数据
rows := r.kpiReportRepository.SelectUPF(v.RmUID, beginTime, endTime)
var upTotal, downTotal int64
// 处理历史数据
for _, row := range rows {
var kpiValues []map[string]any
if err := json.Unmarshal([]byte(row.KpiValues), &kpiValues); err != nil {
continue
}
// 遍历 kpiValues 数组
for _, v := range kpiValues {
if k, ok := v["kpiId"]; ok {
if k == "UPF.03" {
data["up"] = data["up"] + parse.Number(v["value"])
upTotal += parse.Number(v["value"])
}
if k == "UPF.06" {
data["down"] = data["down"] + parse.Number(v["value"])
downTotal += parse.Number(v["value"])
}
}
}
}
// 保存到缓存
if infoJSON, err := json.Marshal(data); err == nil {
redis.SetByExpire("", key, string(infoJSON), time.Duration(10)*time.Minute)
// 将历史数据添加到Redis
r.UPFTodayFlowUpdate(v.RmUID, upTotal, downTotal, true)
}
}
return data
}

View File

@@ -34,11 +34,11 @@ func GetUPFTotalFlow(requestID string, data any) ([]byte, error) {
return nil, fmt.Errorf("no matching network element information found")
}
dataMap := neDataService.NewKpiReport.FindUPFTotalFlow(neInfo.RmUID, querys.Day)
up, down := neDataService.NewKpiReport.UPFTodayFlowFind(neInfo.RmUID, querys.Day)
resultByte, err := json.Marshal(resp.Ok(map[string]any{
"requestId": requestID,
"data": dataMap,
"data": map[string]int64{"up": up, "down": down},
}))
return resultByte, err
}

View File

@@ -17,11 +17,9 @@ const (
// 组号-信令跟踪Packet 4_taskNo
GROUP_TRACE_PACKET = "4_"
// 组号-指标通用 10_neType_neId
GROUP_KPI = "10_"
// 组号-指标UPF 12_neId
GROUP_KPI_UPF = "12_"
GROUP_KPI = "10"
// 组号-自定义KPI指标 20_neType_neId
GROUP_KPI_C = "20_"
GROUP_KPI_C = "20"
// 组号-IMS_CDR会话事件 1005_neId
GROUP_IMS_CDR = "1005_"
// 组号-SMF_CDR会话事件 1006_neId