feat: support add redis queue with alarm/nbi_pm/nbi_kpi, support generate all nbi pm data

This commit is contained in:
simon
2025-08-11 17:10:13 +08:00
parent d4923d008c
commit 46ccc0ab83
26 changed files with 14059 additions and 41 deletions

View File

@@ -5,7 +5,10 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
@@ -340,9 +343,9 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
}
// 推送到redis队列
err = redisqueue.AddNbiPMQueue(kpiData.NEType, strconv.Itoa(kpiData.ID))
err = redisqueue.AddNbiKpiQueue(kpiData.NEType, strconv.Itoa(kpiData.ID))
if err != nil {
log.Warn("Failed to AddNbiPMQueue:", err)
log.Warn("Failed to AddNbiKpiQueue:", err)
}
services.ResponseStatusOK204NoContent(w)
@@ -933,37 +936,449 @@ func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
}
type Measurement struct {
Id int `json:"-" xorm:"pk 'id' autoincr"`
Date string `json:"-" xorm:"date"`
Index int `json:"Index"` // 1天中测量时间粒度(如15分钟)的切片索引: 0~95
Timestamp string `json:"TimeStamp" xorm:"-"`
NeName string `json:"NeName"` // UserLabel
RmUID string `json:"RmUID" xorm:"rm_uid"`
NeType string `json:"NeType"` // 网元类型
PmVersion string `json:"PmVersion"` // 性能数据版本号
Dn string `json:"Dn"` // (???)网元标识, 如:RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456
Period string `json:"Period"` // 测量时间粒度选项5/15/30/60
// Date is the measurement date in YYYY-MM-DD format (internal use only, omitted from JSON).
Date string `json:"-" xorm:"date"`
Index int `json:"Index"` // Index of 15-minute intervals (time granularity) in a day, range: 0~95
// Timestamp represents the measurement time in Unix milliseconds, populated from measurement time.
NeName string `json:"NeName"` // 网元用户标签(UserLabel),用于显示和唯一标识网元名称
// RmUID is the unique identifier for the network element (unique per NE, used for database joins).
RmUID string `json:"RmUID" xorm:"rm_uid"`
// Dn is the Network Element Distinguished Name (DN), uniquely identifies the network element.
// Format example: "RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456"
// Dn为网元唯一标识(DN),格式示例:"RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456"
Dn string `json:"Dn" xorm:"dn"`
// TimeZone specifies the time zone for the measurement, e.g., "Asia/Shanghai" or "+08:00".
TimeZone string `json:"TimeZone"`
NeType string `json:"NeType"` // 网元类型
StartTime string `json:"StartTime"`
// List of KPIs grouped by ObjectType.
Datas []Data `json:"Datas"`
}
type KPIValue struct {
Name string `json:"Name"` // 单个的写"Total", 或者指标项有多个测量项如Dnn的名称写对应的Dnn"cmnet"/"ims"
Name string `json:"Name"` // "Total" is used for aggregate values; other names (e.g., "cmnet", "ims") are used for specific measurement items such as DNN names.
Value int64 `json:"Value"`
}
type KPI struct {
KPIID string `json:"KPIID"` // 指标项, 如: SMF.AttCreatePduSession._Dnn
KPIID string `json:"KPIID"`
KPIValues []KPIValue `json:"KPIValues"`
}
type Data struct {
ObjectType string `json:"ObjectType"` // 网络资源类别名称, Pm指标项列表中为空间粒度 如SmfFunction
KPIs []KPI `json:"KPIs"` // 指标项, 如: SMF.AttCreatePduSession._Dnn
}
// 添加配置结构体
// type PMConfig struct {
// SchemaPath string `json:"schemaPath"` // schema文件路径
// RandomMin int `json:"randomMin"` // 随机数最小值
// RandomMax int `json:"randomMax"` // 随机数最大值
// MergeMode string `json:"mergeMode"` // 合并模式: "merge"(合并), "none"(不合并), "schema"(仅schema随机值)
// }
// 定义合并模式常量
const (
MergeModeNone = "none" // 不合并,仅使用网元发送的数据
MergeModeMerge = "merge" // 合并模式,优先使用网元数据,缺失部分用随机值补充
MergeModeSchema = "schema" // 仅使用schema数据全部用随机值
)
// 全局配置
// SchemaPath should be configured per environment; by default, it uses "/home/simon/omc.git/be.ems/config/schema".
// You can override this by setting the environment variable EMS_PM_SCHEMA_PATH.
// var pmConfig = PMConfig{
// SchemaPath: func() string {
// if envPath := os.Getenv("EMS_PM_SCHEMA_PATH"); envPath != "" {
// return envPath
// }
// return "/home/simon/omc.git/be.ems/config/schema"
// }(),
// RandomMin: 1,
// RandomMax: 16,
// MergeMode: MergeModeSchema,
// }
// var pmConfig = PMConfig{
// SchemaPath: config.GetNbiPmConfig().SchemaPath,
// RandomMin: config.GetNbiPmConfig().RandomMin,
// RandomMax: config.GetNbiPmConfig().RandomMax,
// MergeMode: config.GetNbiPmConfig().MergeMode,
// }
// schema数据结构
type SchemaKPIValue struct {
Name string `json:"Name"`
Value int `json:"Value"`
}
type SchemaKPI struct {
KPIID string `json:"KPIID"`
KPIValues []SchemaKPIValue `json:"KPIValues"`
}
type SchemaObject struct {
ObjectType string `json:"ObjectType"`
KPIs []SchemaKPI `json:"KPIs"`
}
type SchemaData []SchemaObject
// 读取schema文件
func loadSchemaData(neType string) (*SchemaData, error) {
schemaFile := filepath.Join(config.GetNbiPmConfig().SchemaPath, strings.ToLower(neType)+"-nbi-pm-schema.json")
if _, err := os.Stat(schemaFile); os.IsNotExist(err) {
log.Warnf("Schema file not found: %s", schemaFile)
return nil, nil // 文件不存在时返回 nil不是错误
}
data, err := os.ReadFile(schemaFile)
if err != nil {
return nil, fmt.Errorf("failed to read schema file: %v", err)
}
var schema SchemaData
if err := json.Unmarshal(data, &schema); err != nil {
return nil, fmt.Errorf("failed to parse schema file: %v", err)
}
return &schema, nil
}
// 生成随机值
func generateRandomKPIValue() int64 {
return int64(rand.Intn(config.GetNbiPmConfig().RandomMax-config.GetNbiPmConfig().RandomMin+1) +
config.GetNbiPmConfig().RandomMin)
}
// 合并网元数据和schema数据
func mergeWithSchema(measurement *dborm.NorthboundPm, schema *SchemaData) {
if schema == nil {
return
}
switch config.GetNbiPmConfig().MergeMode {
case MergeModeNone:
// 不合并,直接返回
log.Debug("Merge mode is 'none', skipping schema merge")
return
case MergeModeSchema:
// 仅使用schema数据清空原有数据全部用随机值
log.Debug("Merge mode is 'schema', replacing all data with schema random values")
measurement.Datas = nil
generateSchemaOnlyData(measurement, schema)
return
case MergeModeMerge:
// 合并模式,优先使用网元数据,缺失部分用随机值补充
log.Debug("Merge mode is 'merge', merging NE data with schema")
mergeNeDataWithSchema(measurement, schema)
return
default:
log.Warnf("Unknown merge mode: %s, using default merge mode", config.GetNbiPmConfig().MergeMode)
mergeNeDataWithSchema(measurement, schema)
}
}
// 仅使用schema数据生成随机值
func generateSchemaOnlyData(measurement *dborm.NorthboundPm, schema *SchemaData) {
for _, schemaObj := range *schema {
newData := struct {
ObjectType string `json:"ObjectType" xorm:"object_type"`
PmDatas []struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
} `json:"KPIs" xorm:"pm_datas"`
}{
ObjectType: schemaObj.ObjectType,
PmDatas: []struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
}{},
}
// 遍历schema中的KPI
for _, schemaKPI := range schemaObj.KPIs {
newKPI := struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
}{
PmName: schemaKPI.KPIID,
SubDatas: []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
}{},
}
// 为每个KPI值生成随机数
for _, schemaValue := range schemaKPI.KPIValues {
randomValue := generateRandomKPIValue()
newSubData := struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
}{
SN: schemaValue.Name,
SV: randomValue,
}
newKPI.SubDatas = append(newKPI.SubDatas, newSubData)
log.Debugf("Generated schema random value for %s.%s: %d",
schemaKPI.KPIID, schemaValue.Name, randomValue)
}
if len(newKPI.SubDatas) > 0 {
newData.PmDatas = append(newData.PmDatas, newKPI)
}
}
if len(newData.PmDatas) > 0 {
measurement.Datas = append(measurement.Datas, newData)
log.Debugf("Created ObjectType with schema data: %s (%d KPIs)",
schemaObj.ObjectType, len(newData.PmDatas))
}
}
log.Debugf("Schema-only mode: generated %d object types", len(measurement.Datas))
}
// 合并网元数据和schema数据原有逻辑
func mergeNeDataWithSchema(measurement *dborm.NorthboundPm, schema *SchemaData) {
// 创建网元已有数据的映射,用于快速查找
neDataMap := make(map[string]map[string]map[string]int64) // ObjectType -> KPIID -> Name -> Value
for _, data := range measurement.Datas {
if neDataMap[data.ObjectType] == nil {
neDataMap[data.ObjectType] = make(map[string]map[string]int64)
}
for _, pmData := range data.PmDatas {
if neDataMap[data.ObjectType][pmData.PmName] == nil {
neDataMap[data.ObjectType][pmData.PmName] = make(map[string]int64)
}
for _, subData := range pmData.SubDatas {
neDataMap[data.ObjectType][pmData.PmName][subData.SN] = subData.SV
}
}
}
log.Debugf("Original measurement data contains %d object types", len(measurement.Datas))
// 遍历schema补充缺失的数据
for _, schemaObj := range *schema {
// 查找或创建对应的ObjectType
var targetData *struct {
ObjectType string `json:"ObjectType" xorm:"object_type"`
PmDatas []struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
} `json:"KPIs" xorm:"pm_datas"`
}
for i := range measurement.Datas {
if measurement.Datas[i].ObjectType == schemaObj.ObjectType {
targetData = &measurement.Datas[i]
break
}
}
// 如果没找到对应的ObjectType创建新的
if targetData == nil {
newData := struct {
ObjectType string `json:"ObjectType" xorm:"object_type"`
PmDatas []struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
} `json:"KPIs" xorm:"pm_datas"`
}{
ObjectType: schemaObj.ObjectType,
PmDatas: []struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
}{},
}
measurement.Datas = append(measurement.Datas, newData)
targetData = &measurement.Datas[len(measurement.Datas)-1]
log.Debugf("Created new ObjectType: %s", schemaObj.ObjectType)
}
// 遍历schema中的KPI
for _, schemaKPI := range schemaObj.KPIs {
// 查找是否已存在该KPI
var targetKPI *struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
}
for i := range targetData.PmDatas {
if targetData.PmDatas[i].PmName == schemaKPI.KPIID {
targetKPI = &targetData.PmDatas[i]
break
}
}
// 如果没找到对应的KPI创建新的
if targetKPI == nil {
newKPI := struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
}{
PmName: schemaKPI.KPIID,
SubDatas: []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
}{},
}
targetData.PmDatas = append(targetData.PmDatas, newKPI)
targetKPI = &targetData.PmDatas[len(targetData.PmDatas)-1]
log.Debugf("Created new KPI: %s", schemaKPI.KPIID)
}
// 遍历schema中的KPI值
for _, schemaValue := range schemaKPI.KPIValues {
// 检查是否已存在该值
exists := false
for i := range targetKPI.SubDatas {
if targetKPI.SubDatas[i].SN == schemaValue.Name {
exists = true
// 如果网元没有提供该值,使用随机值
if neDataMap[schemaObj.ObjectType] == nil ||
neDataMap[schemaObj.ObjectType][schemaKPI.KPIID] == nil ||
neDataMap[schemaObj.ObjectType][schemaKPI.KPIID][schemaValue.Name] == 0 {
targetKPI.SubDatas[i].SV = generateRandomKPIValue()
log.Debugf("Updated KPI %s.%s with random value: %d",
schemaKPI.KPIID, schemaValue.Name, targetKPI.SubDatas[i].SV)
}
break
}
}
// 如果不存在,添加新的值
if !exists {
var value int64
// 优先使用网元发送的值
if neDataMap[schemaObj.ObjectType] != nil &&
neDataMap[schemaObj.ObjectType][schemaKPI.KPIID] != nil &&
neDataMap[schemaObj.ObjectType][schemaKPI.KPIID][schemaValue.Name] != 0 {
value = neDataMap[schemaObj.ObjectType][schemaKPI.KPIID][schemaValue.Name]
log.Debugf("Using NE provided value for %s.%s: %d",
schemaKPI.KPIID, schemaValue.Name, value)
} else {
value = generateRandomKPIValue()
log.Debugf("Generated random value for %s.%s: %d",
schemaKPI.KPIID, schemaValue.Name, value)
}
newSubData := struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
}{
SN: schemaValue.Name,
SV: value,
}
targetKPI.SubDatas = append(targetKPI.SubDatas, newSubData)
}
}
}
}
}
// Add this function before mergeNeDataWithSchema
func mergeDuplicateObjectTypes(measurement *dborm.NorthboundPm) {
if len(measurement.Datas) <= 1 {
return
}
// Create a map to group data by ObjectType
objectTypeMap := make(map[string]*struct {
ObjectType string `json:"ObjectType" xorm:"object_type"`
PmDatas []struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
} `json:"KPIs" xorm:"pm_datas"`
})
// Group data by ObjectType
for _, data := range measurement.Datas {
if existingData, exists := objectTypeMap[data.ObjectType]; exists {
// Merge PmDatas - check for duplicate KPIs
kpiMap := make(map[string]*struct {
PmName string `json:"KPIID" xorm:"pm_name"`
SubDatas []struct {
SN string `json:"Name" xorm:"sn"`
SV int64 `json:"Value" xorm:"sv"`
} `json:"KPIValues" xorm:"sub_datas"`
})
// Index existing KPIs
for i := range existingData.PmDatas {
kpiMap[existingData.PmDatas[i].PmName] = &existingData.PmDatas[i]
}
// Merge new KPIs
for _, newPmData := range data.PmDatas {
if existingKpi, exists := kpiMap[newPmData.PmName]; exists {
// Merge SubDatas for existing KPI
subDataMap := make(map[string]bool)
for _, subData := range existingKpi.SubDatas {
subDataMap[subData.SN] = true
}
// Add new SubDatas that don't exist
for _, newSubData := range newPmData.SubDatas {
if !subDataMap[newSubData.SN] {
existingKpi.SubDatas = append(existingKpi.SubDatas, newSubData)
}
}
} else {
// Add new KPI
existingData.PmDatas = append(existingData.PmDatas, newPmData)
}
}
} else {
// Create new entry for this ObjectType
objectTypeMap[data.ObjectType] = &data
}
}
// Replace measurement.Datas with merged data
measurement.Datas = nil
for _, mergedData := range objectTypeMap {
measurement.Datas = append(measurement.Datas, *mergedData)
}
log.Debugf("Merged duplicate ObjectTypes: original count %d, merged count %d",
len(measurement.Datas), len(objectTypeMap))
}
// process measurement post message from NFs
func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
log.Debug("PostMeasurementFromNF processing... ")
@@ -983,11 +1398,12 @@ func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
return
}
//log.Debug("Request body:", string(body))
// measurement := new(dborm.NorthboundPm)
measurement := new(dborm.NorthboundPm)
_ = json.Unmarshal(body, &measurement)
//log.Debug("measurement:", measurement)
//neInfo := new(dborm.NeInfo)
// Merge duplicate ObjectTypes before processing
mergeDuplicateObjectTypes(measurement)
neInfo, err := dborm.XormGetNeInfoByRmUID(measurement.NeType, measurement.RmUID)
if err != nil {
log.Error("dborm.XormGetNeInfo is failed:", err)
@@ -1002,13 +1418,28 @@ func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
measurement.NeName = neInfo.NeName
measurement.Dn = neInfo.Dn
session := dborm.DbClient.XEngine.NewSession()
// 加载schema数据并进行合并
if config.GetNbiPmConfig().MergeMode != MergeModeNone {
log.Debugf("Loading schema for neType: %s (mode: %s)", measurement.NeType, config.GetNbiPmConfig().MergeMode)
schema, err := loadSchemaData(measurement.NeType)
if err != nil {
log.Warnf("Failed to load schema for %s: %v", measurement.NeType, err)
} else if schema != nil {
log.Debugf("Successfully loaded schema for %s, processing with mode: %s", measurement.NeType, config.GetNbiPmConfig().MergeMode)
mergeWithSchema(measurement, schema)
log.Debug("Data processing completed")
} else {
log.Debugf("No schema file found for %s, using original data", measurement.NeType)
}
}
session := xEngine.NewSession()
defer session.Close()
// layout := global.DateTime
layout := time.RFC3339
measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
affected, err := session.Table("nbi_pm").Insert(measurement)
if err != nil && affected <= 0 {
log.Error("Failed to insert nbi_pm:", err)
@@ -1016,6 +1447,12 @@ func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
return
}
// 推送到redis队列
err = redisqueue.AddNbiPMQueue(strconv.Itoa(measurement.Id))
if err != nil {
log.Warn("Failed to AddNbiPMQueue:", err)
}
services.ResponseStatusOK204NoContent(w)
}
@@ -1023,12 +1460,6 @@ func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
log.Debug("GetMeasurementFromNF processing... ")
// _, err := services.CheckFrontValidRequest(w, r)
// if err != nil {
// log.Error("Request error:", err)
// return
// }
vars := mux.Vars(r)
apiVer := vars["apiVersion"]
if apiVer != global.ApiVersionV1 {
@@ -1052,9 +1483,7 @@ func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
}
log.Debugf("neType: %s neId:%s", neType, neIds)
//var neInfo *dborm.NeInfo
neInfo := new(dborm.NeInfo)
neInfo, err := dborm.XormGetNeInfo(neType, neIds[0])
if err != nil {
log.Error("dborm.XormGetNeInfo is failed:", err)
@@ -1081,10 +1510,26 @@ func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
switch response.StatusCode() {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
log.Debug("response:", response)
// measurement := new(dborm.NorthboundPm)
measurement := new(dborm.NorthboundPm)
_ = json.Unmarshal(response.Body(), &measurement)
log.Debug("measurement:", measurement)
// Merge duplicate ObjectTypes before processing
mergeDuplicateObjectTypes(measurement)
// Load schema and merge if needed
if config.GetNbiPmConfig().MergeMode != MergeModeNone {
log.Debugf("Loading schema for neType: %s (mode: %s)", measurement.NeType, config.GetNbiPmConfig().MergeMode)
schema, err := loadSchemaData(measurement.NeType)
if err != nil {
log.Warnf("Failed to load schema for %s: %v", measurement.NeType, err)
} else if schema != nil {
log.Debugf("Successfully loaded schema for %s, processing with mode: %s", measurement.NeType, config.GetNbiPmConfig().MergeMode)
mergeWithSchema(measurement, schema)
log.Debug("Data processing completed")
} else {
log.Debugf("No schema file found for %s, using original data", measurement.NeType)
}
}
session := dborm.DbClient.XEngine.NewSession()
defer session.Close()