feat: 更新多个模块以支持新的数据结构和日志格式

This commit is contained in:
TsMask
2025-02-20 10:08:27 +08:00
parent 045a2b6b01
commit f3c33b31ac
272 changed files with 13246 additions and 15885 deletions

View File

@@ -0,0 +1,194 @@
package service
import (
"encoding/json"
"fmt"
"sort"
"time"
"be.ems/src/framework/constants"
"be.ems/src/framework/database/redis"
"be.ems/src/framework/utils/parse"
"be.ems/src/modules/network_data/model"
"be.ems/src/modules/network_data/repository"
)
// 实例化数据层 KpiReport 结构体
var NewKpiReport = &KpiReport{
kpiReportRepository: repository.NewKpiReport,
}
// KpiReport 指标统计 服务层处理
type KpiReport struct {
kpiReportRepository *repository.KpiReport // 指标数据信息
}
// FindData 通过网元指标数据信息
func (s KpiReport) FindData(query model.KPIQuery) []map[string]any {
// 原始数据
rows := s.kpiReportRepository.SelectKPI(query)
if len(rows) <= 0 {
return []map[string]any{}
}
kpiIdsHas := false
kpiIds := []string{}
// 处理数据
arr := []map[string]any{}
for _, row := range rows {
// 解析 JSON 字符串为 map
var kpiValues []map[string]any
err := json.Unmarshal([]byte(row.KpiValues), &kpiValues)
if err != nil {
continue
}
item := map[string]any{
"neType": row.NeType,
"neName": row.NeName,
"rmUID": row.RmUid,
"startIndex": row.Index,
"timeGroup": row.CreatedAt,
}
// 遍历 kpiValues 数组
for _, v := range kpiValues {
kpiId := "-"
if k, ok := v["kpiId"]; ok {
kpiId = fmt.Sprint(k)
}
item[kpiId] = v["value"]
}
arr = append(arr, item)
// 添加指标ID
if !kpiIdsHas {
for _, v := range kpiValues {
kpiId := "-"
if k, ok := v["kpiId"]; ok {
kpiId = fmt.Sprint(k)
}
kpiIds = append(kpiIds, kpiId)
}
kpiIdsHas = true
}
}
// 时间密度分钟 数值单位秒 5分钟的传入300秒
timeInterval := query.Interval
// 创建一个map来存储按时间段合并后的数据
timeGroup := make(map[int64][]map[string]any)
// 遍历每个数据项
for _, v := range arr {
itemTime := parse.Number(v["timeGroup"])
// 计算时间戳的x分钟时间段使用秒并除以x分钟
timeMinute := itemTime / 1000 / timeInterval * timeInterval
// 合并到对应的时间段
timeGroup[timeMinute] = append(timeGroup[timeMinute], v)
}
// 时间组合输出
data := []map[string]any{}
for _, records := range timeGroup {
if len(records) <= 0 {
continue
}
// 转换为具体时间显示(根据需要可以格式化显示)
// timeStr := time.Unix(k, 0).Format("2006-01-02 15:04:05")
// fmt.Printf("Time Group: %s records: %d\n", timeStr, len(records))
startItem := records[len(records)-1] // 取最后一条数据也是最开始startIndex
if len(records) >= 2 { // 最后一条数据不参与计算
for _, record := range records[:len(records)-1] {
// fmt.Printf(" - startIndex: %v, Value: %v\n", record["startIndex"], record["timeGroup"])
// 遍历kpiIds数组对lastRecord赋值
for _, kpiId := range kpiIds {
if v, ok := record[kpiId]; ok {
// 特殊字段只取一次收到的非0值
if kpiId == "AMF.01" || kpiId == "UDM.01" || kpiId == "UDM.02" || kpiId == "UDM.03" || kpiId == "SMF.01" {
// startItem[kpiId] = parse.Number(v)
continue // startIndex的值不累加不取最后
} else {
value := parse.Number(startItem[kpiId])
startItem[kpiId] = value + parse.Number(v)
}
}
}
}
}
data = append(data, startItem)
}
// 按时间排序
sort.SliceStable(data, func(i, j int) bool {
vi := parse.Number(data[i]["timeGroup"])
vj := parse.Number(data[j]["timeGroup"])
if query.SortOrder == "asc" {
return vi < vj // asc
}
return vi > vj // desc
})
return data
}
// Insert 新增信息
func (s KpiReport) Insert(param model.KpiReport) int64 {
return s.kpiReportRepository.Insert(param)
}
// FindTitle 网元对应的指标名称
func (r KpiReport) FindTitle(neType string) []model.KpiTitle {
return r.kpiReportRepository.SelectKPITitle(neType)
}
// FindUPFTotalFlow 查询UPF总流量 N3上行 N6下行
func (r KpiReport) FindUPFTotalFlow(rmUID string, day int) map[string]int64 {
now := time.Now()
// 获取当前日期
endTime := now.UnixMilli()
// 将当前日期前几天数
beginTime := now.AddDate(0, 0, -day).Truncate(24 * time.Hour).UnixMilli()
data := map[string]int64{
"up": 0,
"down": 0,
}
// 读取缓存数据 小于2分钟重新缓存
key := fmt.Sprintf("%s:UPF_FLOW:%s_%d", constants.CACHE_NE_DATA, rmUID, day)
if infoStr, err := redis.Get("", key); err == nil && infoStr != "" {
json.Unmarshal([]byte(infoStr), &data)
if expireSecond, _ := redis.GetExpire("", key); expireSecond > 120 {
return data
}
}
// down * 8 / 1000 / 1000 单位M
rows := r.kpiReportRepository.SelectUPF(rmUID, beginTime, endTime)
for _, row := range rows {
// 解析 JSON 字符串为 map
var kpiValues []map[string]any
err := json.Unmarshal([]byte(row.KpiValues), &kpiValues)
if err != nil {
continue
}
// 遍历 kpiValues 数组
for _, v := range kpiValues {
if k, ok := v["kpiId"]; ok {
if k == "UPF.03" {
data["up"] = data["up"] + parse.Number(v["value"])
}
if k == "UPF.06" {
data["down"] = data["down"] + parse.Number(v["value"])
}
}
}
}
// 保存到缓存
if infoJSON, err := json.Marshal(data); err == nil {
redis.SetByExpire("", key, string(infoJSON), time.Duration(10)*time.Minute)
}
return data
}