package service import ( "encoding/json" "fmt" "sort" "time" "be.ems/src/framework/constants" "be.ems/src/framework/database/redis" "be.ems/src/framework/utils/parse" neModel "be.ems/src/modules/ne/model" "be.ems/src/modules/ne_data/model" "be.ems/src/modules/ne_data/repository" ) // 实例化数据层 KpiReport 结构体 var NewKpiReport = &KpiReport{ kpiReportRepository: repository.NewKpiReport, } // KpiReport 指标统计 服务层处理 type KpiReport struct { kpiReportRepository *repository.KpiReport // 指标数据信息 } // FindData 通过网元指标数据信息 func (s KpiReport) FindData(query model.KPIQuery) []map[string]any { // 原始数据 rows := s.kpiReportRepository.SelectKPI(query) if len(rows) <= 0 { return []map[string]any{} } kpiIdsHas := false kpiIds := []string{} // 处理数据 arr := []map[string]any{} for _, row := range rows { // 解析 JSON 字符串为 map var kpiValues []map[string]any err := json.Unmarshal([]byte(row.KpiValues), &kpiValues) if err != nil { continue } item := map[string]any{ "neType": row.NeType, "neName": row.NeName, "rmUID": row.RmUid, "startIndex": row.Index, "timeGroup": row.CreatedAt, } // 遍历 kpiValues 数组 for _, v := range kpiValues { kpiId := "-" if k, ok := v["kpiId"]; ok { kpiId = fmt.Sprint(k) } item[kpiId] = v["value"] } arr = append(arr, item) // 添加指标ID if !kpiIdsHas { for _, v := range kpiValues { kpiId := "-" if k, ok := v["kpiId"]; ok { kpiId = fmt.Sprint(k) } kpiIds = append(kpiIds, kpiId) } kpiIdsHas = true } } // 时间密度分钟 数值单位秒 5分钟的传入300秒 timeInterval := query.Interval // 创建一个map来存储按时间段合并后的数据 timeGroup := make(map[int64][]map[string]any) // 遍历每个数据项 for _, v := range arr { itemTime := parse.Number(v["timeGroup"]) // 计算时间戳的x分钟时间段(使用秒并除以x分钟) timeMinute := itemTime / 1000 / timeInterval * timeInterval // 合并到对应的时间段 timeGroup[timeMinute] = append(timeGroup[timeMinute], v) } // 时间组合输出 data := []map[string]any{} for _, records := range timeGroup { if len(records) <= 0 { continue } // 转换为具体时间显示(根据需要可以格式化显示) // timeStr := time.Unix(k, 0).Format("2006-01-02 15:04:05") // fmt.Printf("Time Group: %s records: %d\n", timeStr, len(records)) startItem := records[len(records)-1] // 取最后一条数据也是最开始startIndex if len(records) >= 2 { // 最后一条数据不参与计算 for _, record := range records[:len(records)-1] { // fmt.Printf(" - startIndex: %v, Value: %v\n", record["startIndex"], record["timeGroup"]) // 遍历kpiIds数组对lastRecord赋值 for _, kpiId := range kpiIds { if v, ok := record[kpiId]; ok { // 特殊字段,只取一次收到的非0值 if kpiId == "AMF.01" || kpiId == "UDM.01" || kpiId == "UDM.02" || kpiId == "UDM.03" || kpiId == "SMF.01" { // startItem[kpiId] = parse.Number(v) continue // startIndex的值不累加不取最后 } else { value := parse.Number(startItem[kpiId]) startItem[kpiId] = value + parse.Number(v) } } } } } data = append(data, startItem) } // 按时间排序 sort.SliceStable(data, func(i, j int) bool { vi := parse.Number(data[i]["timeGroup"]) vj := parse.Number(data[j]["timeGroup"]) if query.SortOrder == "asc" { return vi < vj // asc } return vi > vj // desc }) return data } // Insert 新增信息 func (s KpiReport) Insert(param model.KpiReport) int64 { return s.kpiReportRepository.Insert(param) } // FindTitle 网元对应的指标名称 func (r KpiReport) FindTitle(neType string) []model.KpiTitle { return r.kpiReportRepository.SelectKPITitle(neType) } // FindByPage 根据条件分页查询 func (r KpiReport) TitleFindByPage(query map[string]string) ([]model.KpiTitle, int64) { return r.kpiReportRepository.TitleSelectByPage(query) } // TitleFind 查询信息 func (r KpiReport) TitleFind(param model.KpiTitle) []model.KpiTitle { return r.kpiReportRepository.TitleSelect(param) } // TitleUpdate 更新信息 func (r KpiReport) TitleUpdate(param model.KpiTitle) int64 { return r.kpiReportRepository.TitleUpdate(param) } // TitleDeleteByIds 批量删除信息 func (r KpiReport) TitleDeleteByIds(ids []int64) (int64, error) { rows := r.kpiReportRepository.TitleDeleteByIds(ids) if rows > 0 { return rows, nil } // 删除信息失败! return 0, fmt.Errorf("delete fail") } // TitleInsert 新增信息 func (r KpiReport) TitleInsert(param model.KpiTitle) int64 { return r.kpiReportRepository.TitleInsert(param) } // UPFTodayFlowFind 查询UPF总流量 N3上行 N6下行 // day 统计天数 // down * 8 / 1000 / 1000 单位M func (r KpiReport) UPFTodayFlowFind(rmUID string, day int) (int64, int64) { // 获取当前日期 now := time.Now() var upTotal, downTotal int64 // 查询最近day天的数据 for i := 0; i <= day; i++ { dateKey := now.AddDate(0, 0, -i).Format("2006-01-02") key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, rmUID, dateKey) // 读取缓存数据 up, err := redis.GetHash("", key, "up") if err != nil || up == "" { up = "0" } down, err := redis.GetHash("", key, "down") if err != nil || down == "" { down = "0" } upTotal += parse.Number(up) downTotal += parse.Number(down) } return upTotal, downTotal } // UPFTodayFlow UPF流量今日统计 func (r KpiReport) UPFTodayFlowUpdate(rmUID string, upValue, downValue int64) error { // 按日期存储统计数据 dateKey := time.Now().Format("2006-01-02") key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, rmUID, dateKey) // 使用HIncrBy实时累加统计值 if err := redis.IncrBy("", key, "up", upValue); err != nil { return err } if err := redis.IncrBy("", key, "down", downValue); err != nil { return err } return nil } // UPFTodayFlowLoad UPF上下行数据到redis // day 统计天数 func (r KpiReport) UPFTodayFlowLoad(day int) { cacheKeys, _ := redis.GetKeys("", constants.CACHE_NE_INFO+":UPF:*") if len(cacheKeys) == 0 { return } now := time.Now() for _, key := range cacheKeys { var v neModel.NeInfo jsonStr, _ := redis.Get("", key) if len(jsonStr) > 7 { json.Unmarshal([]byte(jsonStr), &v) } if v.NeType == "UPF" && v.RmUID != "" { // 查询最近day天的数据 for i := 0; i <= day; i++ { dateKey := now.AddDate(0, 0, -i).Format("2006-01-02") key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, v.RmUID, dateKey) // 根据传入天数计算时间范围 beginTime := now.AddDate(0, 0, -i).Truncate(24 * time.Hour).UnixMilli() endTime := beginTime + 24*60*60*1000 - 1 // 查询历史数据 rows := r.kpiReportRepository.SelectUPF(v.RmUID, beginTime, endTime) var upTotal, downTotal int64 // 处理历史数据 for _, row := range rows { var kpiValues []map[string]any if err := json.Unmarshal([]byte(row.KpiValues), &kpiValues); err != nil { continue } for _, v := range kpiValues { if k, ok := v["kpiId"]; ok { if k == "UPF.03" { upTotal += parse.Number(v["value"]) } if k == "UPF.06" { downTotal += parse.Number(v["value"]) } } } } err := redis.SetHash("", key, map[string]any{ "up": upTotal, "down": downTotal, }) if err != nil { continue } // 设置key的过期时间为30天,自动清理旧数据 daySub := (30 - i) * 24 err = redis.Expire("", key, time.Duration(daySub)*time.Hour) if err != nil { continue } } } } } // IMSBusyHour IMS忙时流量统计 // SCSCF.06呼叫尝试次数 SCSCF.09呼叫成功次数 func (r KpiReport) IMSBusyHour(rmUID string, timestamp int64) []map[string]any { t := time.UnixMilli(timestamp) beginTime := t endTime := t // 检查时分秒是否都为零 if t.Hour() == 0 && t.Minute() == 0 && t.Second() == 0 { // 获取当天起始时间(00:00:00) beginTime = t.Truncate(time.Hour) // 计算当天结束时间(23:59:59) endTime = beginTime.Add(23*time.Hour + 59*time.Minute + 59*time.Second) } else { // 起始时间:当前小时的 00 分 00 秒 beginTime = t.Truncate(time.Hour) // 结束时间:当前小时的 59 分 59 秒 999 毫秒 endTime = beginTime.Add(time.Hour - time.Millisecond) } // 转换为毫秒级时间戳 rows := r.kpiReportRepository.SelectIMS(rmUID, beginTime.UnixMilli(), endTime.UnixMilli()) // 创建一个map来存储按时间段合并后的数据 timeGroup := make(map[int64]map[string]int64) // 遍历每个数据项 for _, row := range rows { // 将毫秒时间戳转换为小时级时间戳(保留到小时的起始毫秒) timeHour := row.CreatedAt / 3600000 * 3600000 // 1小时 = 3600000毫秒 // 解析 JSON 字符串为 map var kpiValues []map[string]any err := json.Unmarshal([]byte(row.KpiValues), &kpiValues) if err != nil { continue } var callAttempts, callCompletions int64 for _, v := range kpiValues { if k, ok := v["kpiId"]; ok { if k == "SCSCF.06" { callAttempts = parse.Number(v["value"]) } if k == "SCSCF.09" { callCompletions = parse.Number(v["value"]) } } } // 合并到对应的小时段 if _, exists := timeGroup[timeHour]; !exists { timeGroup[timeHour] = map[string]int64{ "callAttempts": 0, "callCompletions": 0, } } timeGroup[timeHour]["callAttempts"] += callAttempts timeGroup[timeHour]["callCompletions"] += callCompletions } // 时间组合输出 data := make([]map[string]any, 0, len(timeGroup)) for hour, sums := range timeGroup { data = append(data, map[string]any{ "timeGroup": fmt.Sprintf("%d", hour), "callAttempts": sums["callAttempts"], "callCompletions": sums["callCompletions"], }) } return data } // 定义结构体用于存储话务量值和对应的时间 type TrafficData struct { Time int64 `json:"time"` // 时间戳(毫秒) Value float64 `json:"value"` // 话务量值 } // IMSBusyWeek IMS忙时流量统计 周 func (r KpiReport) IMSBusyWeek(rmUID string, weekStart, weekEnd int64) map[string]any { weekStartTime := time.UnixMilli(weekStart) weekEndTime := time.UnixMilli(weekEnd) // 1. 获取一周内每小时的呼叫数据 // 转换为毫秒级时间戳 rows := r.kpiReportRepository.SelectIMS(rmUID, weekStartTime.UnixMilli(), weekEndTime.UnixMilli()) // 创建一个map来存储按时间段合并后的数据 timeGroup := make(map[int64]map[string]int64) // 遍历每个数据项 for _, row := range rows { // 将毫秒时间戳转换为小时级时间戳(保留到小时的起始毫秒) timeHour := row.CreatedAt / 3600000 * 3600000 // 1小时 = 3600000毫秒 // 解析 JSON 字符串为 map var kpiValues []map[string]any err := json.Unmarshal([]byte(row.KpiValues), &kpiValues) if err != nil { continue } var callAttempts, callCompletions int64 for _, v := range kpiValues { if k, ok := v["kpiId"]; ok { if k == "SCSCF.06" { callAttempts = parse.Number(v["value"]) } if k == "SCSCF.09" { callCompletions = parse.Number(v["value"]) } } } // 合并到对应的小时段 if _, exists := timeGroup[timeHour]; !exists { timeGroup[timeHour] = map[string]int64{ "callAttempts": 0, "callCompletions": 0, } } timeGroup[timeHour]["callAttempts"] += callAttempts timeGroup[timeHour]["callCompletions"] += callCompletions } // 时间组合输出 data := make([]map[string]any, 0, len(timeGroup)) for hour, sums := range timeGroup { data = append(data, map[string]any{ "timeGroup": fmt.Sprintf("%d", hour), "callAttempts": sums["callAttempts"], "callCompletions": sums["callCompletions"], }) } if len(data) == 0 { return map[string]any{ "busyHourAverageBHCA": 0, "busyHourAverageBHCC": 0, "topFourHoursBHCA": []float64{}, "topFourHoursBHCC": []float64{}, "totalHours": 0, } } // 2. 分离BHCA和BHCC数据,并按降序排序 var bhcaData []TrafficData var bhccData []TrafficData for _, row := range data { // 获取时间戳 timeValue := int64(0) if t, ok := row["timeGroup"]; ok { timeValue = parse.Number(t) } // 处理BHCA数据 if value, ok := row["callAttempts"]; ok { bhcaVal := parse.Number(value) bhcaData = append(bhcaData, TrafficData{ Time: timeValue, Value: float64(bhcaVal), }) } // 处理BHCC数据 if value, ok := row["callCompletions"]; ok { bhccVal := parse.Number(value) bhccData = append(bhccData, TrafficData{ Time: timeValue, Value: float64(bhccVal), }) } } // 按降序排序(值大的在前) sort.Slice(bhcaData, func(i, j int) bool { return bhcaData[i].Value > bhcaData[j].Value }) sort.Slice(bhccData, func(i, j int) bool { return bhccData[i].Value > bhccData[j].Value }) // 3. 取前四个最高值并计算平均值 topFourBHCA := getTopFourTrafficData(bhcaData) topFourBHCC := getTopFourTrafficData(bhccData) avgBHCA := calculateTrafficDataAverage(topFourBHCA) avgBHCC := calculateTrafficDataAverage(topFourBHCC) // 4. 返回结果 return map[string]any{ "busyHourAverageBHCA": avgBHCA, "busyHourAverageBHCC": avgBHCC, "topFourHoursBHCA": topFourBHCA, "topFourHoursBHCC": topFourBHCC, "totalHours": len(data), } } // 辅助函数:获取前四个最高值的TrafficData func getTopFourTrafficData(data []TrafficData) []TrafficData { if len(data) == 0 { return []TrafficData{} } // 最多取前四个值 maxCount := 4 if len(data) < maxCount { maxCount = len(data) } return data[:maxCount] } // 辅助函数:计算TrafficData的平均值 func calculateTrafficDataAverage(data []TrafficData) float64 { if len(data) == 0 { return 0 } var sum float64 = 0 for _, v := range data { sum += v.Value } return sum / float64(len(data)) }