352 lines
9.9 KiB
Go
352 lines
9.9 KiB
Go
package service
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"sort"
|
||
"time"
|
||
|
||
"be.ems/src/framework/constants"
|
||
"be.ems/src/framework/database/redis"
|
||
"be.ems/src/framework/utils/parse"
|
||
"be.ems/src/modules/network_data/model"
|
||
"be.ems/src/modules/network_data/repository"
|
||
neModel "be.ems/src/modules/network_element/model"
|
||
)
|
||
|
||
// 实例化数据层 KpiReport 结构体
|
||
var NewKpiReport = &KpiReport{
|
||
kpiReportRepository: repository.NewKpiReport,
|
||
}
|
||
|
||
// KpiReport 指标统计 服务层处理
|
||
type KpiReport struct {
|
||
kpiReportRepository *repository.KpiReport // 指标数据信息
|
||
}
|
||
|
||
// FindData 通过网元指标数据信息
|
||
func (s KpiReport) FindData(query model.KPIQuery) []map[string]any {
|
||
// 原始数据
|
||
rows := s.kpiReportRepository.SelectKPI(query)
|
||
if len(rows) <= 0 {
|
||
return []map[string]any{}
|
||
}
|
||
|
||
kpiIdsHas := false
|
||
kpiIds := []string{}
|
||
// 处理数据
|
||
arr := []map[string]any{}
|
||
for _, row := range rows {
|
||
// 解析 JSON 字符串为 map
|
||
var kpiValues []map[string]any
|
||
err := json.Unmarshal([]byte(row.KpiValues), &kpiValues)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
|
||
item := map[string]any{
|
||
"neType": row.NeType,
|
||
"neName": row.NeName,
|
||
"rmUID": row.RmUid,
|
||
"startIndex": row.Index,
|
||
"timeGroup": row.CreatedAt,
|
||
}
|
||
|
||
// 遍历 kpiValues 数组
|
||
for _, v := range kpiValues {
|
||
kpiId := "-"
|
||
if k, ok := v["kpiId"]; ok {
|
||
kpiId = fmt.Sprint(k)
|
||
}
|
||
item[kpiId] = v["value"]
|
||
}
|
||
|
||
arr = append(arr, item)
|
||
|
||
// 添加指标ID
|
||
if !kpiIdsHas {
|
||
for _, v := range kpiValues {
|
||
kpiId := "-"
|
||
if k, ok := v["kpiId"]; ok {
|
||
kpiId = fmt.Sprint(k)
|
||
}
|
||
kpiIds = append(kpiIds, kpiId)
|
||
}
|
||
kpiIdsHas = true
|
||
}
|
||
}
|
||
|
||
// 时间密度分钟 数值单位秒 5分钟的传入300秒
|
||
timeInterval := query.Interval
|
||
// 创建一个map来存储按时间段合并后的数据
|
||
timeGroup := make(map[int64][]map[string]any)
|
||
// 遍历每个数据项
|
||
for _, v := range arr {
|
||
itemTime := parse.Number(v["timeGroup"])
|
||
// 计算时间戳的x分钟时间段(使用秒并除以x分钟)
|
||
timeMinute := itemTime / 1000 / timeInterval * timeInterval
|
||
// 合并到对应的时间段
|
||
timeGroup[timeMinute] = append(timeGroup[timeMinute], v)
|
||
}
|
||
// 时间组合输出
|
||
data := []map[string]any{}
|
||
for _, records := range timeGroup {
|
||
if len(records) <= 0 {
|
||
continue
|
||
}
|
||
// 转换为具体时间显示(根据需要可以格式化显示)
|
||
// timeStr := time.Unix(k, 0).Format("2006-01-02 15:04:05")
|
||
// fmt.Printf("Time Group: %s records: %d\n", timeStr, len(records))
|
||
startItem := records[len(records)-1] // 取最后一条数据也是最开始startIndex
|
||
if len(records) >= 2 { // 最后一条数据不参与计算
|
||
for _, record := range records[:len(records)-1] {
|
||
// fmt.Printf(" - startIndex: %v, Value: %v\n", record["startIndex"], record["timeGroup"])
|
||
// 遍历kpiIds数组对lastRecord赋值
|
||
for _, kpiId := range kpiIds {
|
||
if v, ok := record[kpiId]; ok {
|
||
// 特殊字段,只取一次收到的非0值
|
||
if kpiId == "AMF.01" || kpiId == "UDM.01" || kpiId == "UDM.02" || kpiId == "UDM.03" || kpiId == "SMF.01" {
|
||
// startItem[kpiId] = parse.Number(v)
|
||
continue // startIndex的值不累加不取最后
|
||
} else {
|
||
value := parse.Number(startItem[kpiId])
|
||
startItem[kpiId] = value + parse.Number(v)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
data = append(data, startItem)
|
||
}
|
||
|
||
// 按时间排序
|
||
sort.SliceStable(data, func(i, j int) bool {
|
||
vi := parse.Number(data[i]["timeGroup"])
|
||
vj := parse.Number(data[j]["timeGroup"])
|
||
if query.SortOrder == "asc" {
|
||
return vi < vj // asc
|
||
}
|
||
return vi > vj // desc
|
||
})
|
||
return data
|
||
}
|
||
|
||
// Insert 新增信息
|
||
func (s KpiReport) Insert(param model.KpiReport) int64 {
|
||
return s.kpiReportRepository.Insert(param)
|
||
}
|
||
|
||
// FindTitle 网元对应的指标名称
|
||
func (r KpiReport) FindTitle(neType string) []model.KpiTitle {
|
||
return r.kpiReportRepository.SelectKPITitle(neType)
|
||
}
|
||
|
||
// FindByPage 根据条件分页查询
|
||
func (r KpiReport) TitleFindByPage(query map[string]string) ([]model.KpiTitle, int64) {
|
||
return r.kpiReportRepository.TitleSelectByPage(query)
|
||
}
|
||
|
||
// TitleFind 查询信息
|
||
func (r KpiReport) TitleFind(param model.KpiTitle) []model.KpiTitle {
|
||
return r.kpiReportRepository.TitleSelect(param)
|
||
}
|
||
|
||
// TitleUpdate 更新信息
|
||
func (r KpiReport) TitleUpdate(param model.KpiTitle) int64 {
|
||
return r.kpiReportRepository.TitleUpdate(param)
|
||
}
|
||
|
||
// TitleDeleteByIds 批量删除信息
|
||
func (r KpiReport) TitleDeleteByIds(ids []int64) (int64, error) {
|
||
rows := r.kpiReportRepository.TitleDeleteByIds(ids)
|
||
if rows > 0 {
|
||
return rows, nil
|
||
}
|
||
// 删除信息失败!
|
||
return 0, fmt.Errorf("delete fail")
|
||
}
|
||
|
||
// TitleInsert 新增信息
|
||
func (r KpiReport) TitleInsert(param model.KpiTitle) int64 {
|
||
return r.kpiReportRepository.TitleInsert(param)
|
||
}
|
||
|
||
// UPFTodayFlowFind 查询UPF总流量 N3上行 N6下行
|
||
// day 统计天数
|
||
// down * 8 / 1000 / 1000 单位M
|
||
func (r KpiReport) UPFTodayFlowFind(rmUID string, day int) (int64, int64) {
|
||
// 获取当前日期
|
||
now := time.Now()
|
||
var upTotal, downTotal int64
|
||
|
||
// 查询最近day天的数据
|
||
for i := 0; i <= day; i++ {
|
||
dateKey := now.AddDate(0, 0, -i).Format("2006-01-02")
|
||
key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, rmUID, dateKey)
|
||
|
||
// 读取缓存数据
|
||
up, err := redis.GetHash("", key, "up")
|
||
if err != nil || up == "" {
|
||
up = "0"
|
||
}
|
||
down, err := redis.GetHash("", key, "down")
|
||
if err != nil || down == "" {
|
||
down = "0"
|
||
}
|
||
|
||
upTotal += parse.Number(up)
|
||
downTotal += parse.Number(down)
|
||
}
|
||
|
||
return upTotal, downTotal
|
||
}
|
||
|
||
// UPFTodayFlow UPF流量今日统计
|
||
func (r KpiReport) UPFTodayFlowUpdate(rmUID string, upValue, downValue int64) error {
|
||
// 按日期存储统计数据
|
||
dateKey := time.Now().Format("2006-01-02")
|
||
key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, rmUID, dateKey)
|
||
|
||
// 使用HIncrBy实时累加统计值
|
||
if err := redis.IncrBy("", key, "up", upValue); err != nil {
|
||
return err
|
||
}
|
||
if err := redis.IncrBy("", key, "down", downValue); err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// UPFTodayFlowLoad UPF上下行数据到redis
|
||
// day 统计天数
|
||
func (r KpiReport) UPFTodayFlowLoad(day int) {
|
||
cacheKeys, _ := redis.GetKeys("", constants.CACHE_NE_INFO+":UPF:*")
|
||
if len(cacheKeys) == 0 {
|
||
return
|
||
}
|
||
|
||
now := time.Now()
|
||
for _, key := range cacheKeys {
|
||
var v neModel.NeInfo
|
||
jsonStr, _ := redis.Get("", key)
|
||
if len(jsonStr) > 7 {
|
||
json.Unmarshal([]byte(jsonStr), &v)
|
||
}
|
||
if v.NeType == "UPF" && v.RmUID != "" {
|
||
// 查询最近day天的数据
|
||
for i := 0; i <= day; i++ {
|
||
dateKey := now.AddDate(0, 0, -i).Format("2006-01-02")
|
||
key := fmt.Sprintf("%s:UPF_FLOW:%s:%s", constants.CACHE_NE_DATA, v.RmUID, dateKey)
|
||
// 根据传入天数计算时间范围
|
||
beginTime := now.AddDate(0, 0, -i).Truncate(24 * time.Hour).UnixMilli()
|
||
endTime := beginTime + 24*60*60*1000 - 1
|
||
// 查询历史数据
|
||
rows := r.kpiReportRepository.SelectUPF(v.RmUID, beginTime, endTime)
|
||
var upTotal, downTotal int64
|
||
|
||
// 处理历史数据
|
||
for _, row := range rows {
|
||
var kpiValues []map[string]any
|
||
if err := json.Unmarshal([]byte(row.KpiValues), &kpiValues); err != nil {
|
||
continue
|
||
}
|
||
|
||
for _, v := range kpiValues {
|
||
if k, ok := v["kpiId"]; ok {
|
||
if k == "UPF.03" {
|
||
upTotal += parse.Number(v["value"])
|
||
}
|
||
if k == "UPF.06" {
|
||
downTotal += parse.Number(v["value"])
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
err := redis.SetHash("", key, map[string]any{
|
||
"up": upTotal,
|
||
"down": downTotal,
|
||
})
|
||
if err != nil {
|
||
continue
|
||
}
|
||
// 设置key的过期时间为30天,自动清理旧数据
|
||
daySub := (30 - i) * 24
|
||
err = redis.Expire("", key, time.Duration(daySub)*time.Hour)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// IMSBusyHour IMS忙时流量统计
|
||
// SCSCF.06呼叫尝试次数 SCSCF.09呼叫成功次数
|
||
func (r KpiReport) IMSBusyHour(rmUID string, timestamp int64) []map[string]any {
|
||
t := time.UnixMilli(timestamp)
|
||
beginTime := t
|
||
endTime := t
|
||
// 检查时分秒是否都为零
|
||
if t.Hour() == 0 && t.Minute() == 0 && t.Second() == 0 {
|
||
// 获取当天起始时间(00:00:00)
|
||
beginTime = t.Truncate(time.Hour)
|
||
// 计算当天结束时间(23:59:59)
|
||
endTime = beginTime.Add(23*time.Hour + 59*time.Minute + 59*time.Second)
|
||
} else {
|
||
// 起始时间:当前小时的 00 分 00 秒
|
||
beginTime = t.Truncate(time.Hour)
|
||
// 结束时间:当前小时的 59 分 59 秒 999 毫秒
|
||
endTime = beginTime.Add(time.Hour - time.Millisecond)
|
||
}
|
||
// 转换为毫秒级时间戳
|
||
rows := r.kpiReportRepository.SelectIMS(rmUID, beginTime.UnixMilli(), endTime.UnixMilli())
|
||
|
||
// 创建一个map来存储按时间段合并后的数据
|
||
timeGroup := make(map[int64]map[string]int64)
|
||
// 遍历每个数据项
|
||
for _, row := range rows {
|
||
// 将毫秒时间戳转换为小时级时间戳(保留到小时的起始毫秒)
|
||
timeHour := row.CreatedAt / 3600000 * 3600000 // 1小时 = 3600000毫秒
|
||
|
||
// 解析 JSON 字符串为 map
|
||
var kpiValues []map[string]any
|
||
err := json.Unmarshal([]byte(row.KpiValues), &kpiValues)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
|
||
var callAttempts, callCompletions int64
|
||
for _, v := range kpiValues {
|
||
if k, ok := v["kpiId"]; ok {
|
||
if k == "SCSCF.06" {
|
||
callAttempts = parse.Number(v["value"])
|
||
}
|
||
if k == "SCSCF.09" {
|
||
callCompletions = parse.Number(v["value"])
|
||
}
|
||
}
|
||
}
|
||
// 合并到对应的小时段
|
||
if _, exists := timeGroup[timeHour]; !exists {
|
||
timeGroup[timeHour] = map[string]int64{
|
||
"callAttempts": 0,
|
||
"callCompletions": 0,
|
||
}
|
||
}
|
||
timeGroup[timeHour]["callAttempts"] += callAttempts
|
||
timeGroup[timeHour]["callCompletions"] += callCompletions
|
||
}
|
||
|
||
// 时间组合输出
|
||
data := make([]map[string]any, 0, len(timeGroup))
|
||
for hour, sums := range timeGroup {
|
||
data = append(data, map[string]any{
|
||
"timeGroup": fmt.Sprintf("%d", hour),
|
||
"callAttempts": sums["callAttempts"],
|
||
"callCompletions": sums["callCompletions"],
|
||
})
|
||
}
|
||
return data
|
||
}
|