Files
be.ems/features/pm/performance.go

1555 lines
52 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package pm
import (
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"be.ems/features/nbi/redisqueue"
"be.ems/features/pm/kpi_c_report"
"be.ems/features/pm/kpi_c_title"
"be.ems/lib/config"
"be.ems/lib/dborm"
evaluate "be.ems/lib/eval"
"be.ems/lib/global"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/utils/parse"
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
"github.com/go-resty/resty/v2"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/mux"
"xorm.io/xorm"
)
// Response is a generic JSON envelope with a single "data" field.
type Response struct {
	Data interface{} `json:"data"`
}

// KpiReport mirrors the KPI report body POSTed by a network function:
// a timestamp plus one reporting period and the KPI values of one NE.
type KpiReport struct {
	Timestamp string `json:"TimeStamp"`
	Task      struct {
		Period struct {
			StartTime string `json:"StartTime"`
			EndTime   string `json:"EndTime"`
		} `json:"Period"`
		NE struct {
			NEName string `json:"NEName"`
			RmUID  string `json:"rmUID"`
			NeType string `json:"NeType"`
			KPIs   []struct {
				KPIID string `json:"KPIID"`
				Value int64  `json:"Value"`
				Err   string `json:"Err"`
			} `json:"KPIs"`
		} `json:"NE"`
	} `json:"Task"`
}
// GoldKpi is one flattened "golden KPI" row: a single KPI value of one
// NE for one reporting interval.
type GoldKpi struct {
	// Id int `json:"-" xorm:"pk 'id' autoincr"`
	Date        string `json:"date" xorm:"date"`
	Index       int    `json:"index"`       // interval index within the day
	Granularity int8   `json:"granularity"` // interval length in seconds
	StartTime   string `json:"startTime"`
	EndTime     string `json:"endTime"`
	NEName      string `json:"neName" xorm:"ne_name"`
	RmUid       string `json:"rmUid" xorm:"rm_uid"`
	NEType      string `json:"neType" xorm:"ne_type"`
	KpiId       string `json:"kpiId" xorm:"kpi_id"`
	Value       int64  `json:"value"`
	Error       string `json:"error"`
	Timestamp   string `json:"timestamp"`
}

// KpiData is one kpi_report_<netype> row: all KPI values of one NE for
// one reporting interval, stored as a JSON column.
type KpiData struct {
	ID          int      `json:"id" xorm:"pk 'id' '<-' autoincr"`
	NEType      string   `json:"neType" xorm:"ne_type"`
	NEName      string   `json:"neName" xorm:"ne_name"`
	RmUid       string   `json:"rmUid" xorm:"rm_uid"`
	Date        string   `json:"date" xorm:"date"`
	StartTime   string   `json:"startTime" xorm:"start_time"`
	EndTime     string   `json:"endTime" xorm:"end_time"`
	Index       int      `json:"index" xorm:"index"`
	Granularity int8     `json:"granularity" xorm:"granularity"`
	KPIValues   []KPIVal `json:"kpiValues" xorm:"json 'kpi_values'"`
	//CreatedAt int64 `json:"createdAt" xorm:"created 'created_at'"`
	CreatedAt int64 `json:"createdAt" xorm:"'created_at'"` // Unix milliseconds, set by the writer
}

// KPIVal is a single KPI value inside KpiData.KPIValues.
type KPIVal struct {
	KPIID string `json:"kpi_id" xorm:"kpi_id"`
	Value int64  `json:"value" xorm:"value"`
	Err   string `json:"err" xorm:"err"`
}
var (
	// Performance-management endpoints under the default URI prefix.
	PerformanceUri   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
	MeasureTaskUri   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
	MeasureReportUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
	// MeasureReportFmt is a Sprintf template (%s = element type) used to
	// build the notify URL an NF reports back to.
	MeasureReportFmt = config.DefaultUriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
	MeasurementUri   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
	UriMeasureTask   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
	// Same endpoints under the custom URI prefix.
	CustomPerformanceUri   = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
	CustomMeasureTaskUri   = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
	CustomMeasureReportUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
	CustomMeasureReportFmt = config.UriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
	CustomMeasurementUri   = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
	CustomUriMeasureTask   = config.UriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
)
// xEngine is the package-level xorm engine shared by the PM handlers,
// set by InitDbClient / XormConnectDatabase.
var xEngine *xorm.Engine

// DatabaseClient bundles a database connection configuration together
// with the xorm engine created from it.
type DatabaseClient struct {
	dbType            string        // driver name, e.g. "mysql"
	dbUrl             string        // DSN including credentials — do not log verbatim
	dbConnMaxLifetime time.Duration // 0 = connections are reused forever
	dbMaxIdleConns    int           // 0 = driver default
	dbMaxOpenConns    int           // 0 = unlimited
	IsShowSQL         bool          // echo SQL statements (enabled at TRACE log level)
	XEngine           *xorm.Engine
}

// DbClient is the shared client initialised by InitDbClient.
var DbClient DatabaseClient
// InitDbClient initialises the shared DbClient from the given
// connection parameters, creates its xorm engine and mirrors it into
// the package-level xEngine used by the PM handlers. SQL echoing is
// enabled when the log level is TRACE. It returns a non-nil error when
// the engine cannot be created.
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName, dbParam string) error {
	DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s",
		dbUser, dbPassword, dbHost, dbPort, dbName, dbParam)
	DbClient.dbType = dbType
	// Zero values keep the driver defaults: connections live forever
	// and the pool is unbounded.
	DbClient.dbConnMaxLifetime = 0
	DbClient.dbMaxIdleConns = 0
	DbClient.dbMaxOpenConns = 0
	if log.GetLevel() == log.LOG_TRACE {
		DbClient.IsShowSQL = true
	}
	// Mask the password instead of logging the full DSN (the previous
	// code logged DbClient.dbUrl verbatim, leaking credentials).
	log.Debugf("dbType:%s dbUrl:%s:******@tcp(%s:%s)/%s?%s",
		dbType, dbUser, dbHost, dbPort, dbName, dbParam)
	var err error
	DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl)
	if err != nil {
		log.Error("Failed to connect database:", err)
		return err
	}
	DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime)
	DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns)
	DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns)
	// Keep the ORM's notion of time zone aligned with the local zone
	// (required for consistent DATETIME handling).
	DbClient.XEngine.DatabaseTZ = time.Local
	DbClient.XEngine.TZLocation = time.Local
	if DbClient.IsShowSQL {
		DbClient.XEngine.ShowSQL(true)
	}
	xEngine = DbClient.XEngine
	return nil
}
// XormConnectDatabase opens an xorm engine for the given MySQL-style
// connection parameters, stores it in the package-level xEngine and
// returns it. The password is masked in the debug log line, and SQL
// echoing is enabled when the log level is TRACE.
func XormConnectDatabase(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) (*xorm.Engine, error) {
	const dsnOptions = "charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local"
	dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s", dbUser, dbPassword, dbHost, dbPort, dbName, dsnOptions)
	log.Debugf("dbType:%s Connect to:%s:******@tcp(%s:%s)/%s?%s",
		dbType, dbUser, dbHost, dbPort, dbName, dsnOptions)
	var err error
	// Create the xorm engine and publish it via the package-level var.
	xEngine, err = xorm.NewEngine(dbType, dsn)
	if err != nil {
		log.Error("Failed to connect database:", err)
		return nil, err
	}
	if log.GetLevel() == log.LOG_TRACE {
		xEngine.ShowSQL(true)
	}
	return xEngine, nil
}
// GetDateFromTimeString parses timeString according to the layout
// fmtString in the local time zone and returns only the date part,
// formatted as "2006-01-02". Parse errors are ignored, so invalid
// input yields the zero time's date "0001-01-01".
func GetDateFromTimeString(fmtString string, timeString string) string {
	parsed, _ := time.ParseInLocation(fmtString, timeString, time.Local)
	return parsed.Format(time.DateOnly)
}
// GetDateTimeFromTimeString parses timeString according to the layout
// fmtString in the local time zone and re-formats it using the
// project-wide global.DateTime layout. Parse errors are ignored, so
// invalid input is rendered from the zero time.
func GetDateTimeFromTimeString(fmtString string, timeString string) string {
	parsed, _ := time.ParseInLocation(fmtString, timeString, time.Local)
	return parsed.Format(global.DateTime)
}
// PostKPIReportFromNF handles the periodic KPI report POSTed by a
// network function. It validates the URI api version, decodes the
// KpiReport body, stores the values in the per-NE-type table
// kpi_report_<netype>, evaluates and stores the custom (KPI-C)
// expressions, pushes the raw and custom KPI events to the matching
// websocket subscription groups, and enqueues the record id on the
// NBI redis queue. Answers 204 No Content on success.
func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostKPIReportFromNF processing... ")
	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Failed to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	kpiReport := new(KpiReport)
	// Reject malformed bodies instead of silently inserting zero values
	// (the unmarshal error used to be discarded).
	if err := json.Unmarshal(body, kpiReport); err != nil {
		log.Error("Failed to json.Unmarshal KpiReport:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	layout := time.RFC3339Nano
	kpiIndex, _ := strconv.Atoi(vars["index"])
	startTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime)
	endTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime)
	// Derive the time granularity (seconds) from the reporting period;
	// fall back to 60 when the duration is zero or out of int8 range.
	seconds, _ := global.GetSecondDuration(startTime, endTime)
	var granularity int8 = 60
	if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 {
		granularity = int8(seconds)
	}
	// Build the row for the per-NE-type kpi_report_<netype> table.
	kpiData := new(KpiData)
	kpiData.Date = startTime
	kpiData.Index = kpiIndex
	kpiData.StartTime = startTime
	kpiData.EndTime = endTime
	kpiData.Granularity = granularity
	kpiData.NEName = kpiReport.Task.NE.NEName
	kpiData.NEType = kpiReport.Task.NE.NeType
	kpiData.RmUid = kpiReport.Task.NE.RmUID
	kpiData.CreatedAt = time.Now().UnixMilli()
	// Golden KPI event pushed to the websocket subscription group.
	kpiEvent := map[string]any{
		"neType":     kpiReport.Task.NE.NeType,
		"neName":     kpiReport.Task.NE.NEName,
		"rmUID":      kpiReport.Task.NE.RmUID,
		"startIndex": kpiIndex,
		"timeGroup":  kpiData.CreatedAt,
	}
	// kpiValMap feeds the custom-KPI expression evaluator.
	kpiValMap := map[string]any{}
	for _, k := range kpiReport.Task.NE.KPIs {
		kpiEvent[k.KPIID] = k.Value
		kpiData.KPIValues = append(kpiData.KPIValues, KPIVal{KPIID: k.KPIID, Value: k.Value, Err: k.Err})
		kpiValMap[k.KPIID] = k.Value
	}
	kpiValMap["granularity"] = kpiData.Granularity
	// Insert into the per-NE-type report table (no explicit session).
	// NOTE: the previous `err != nil && affected <= 0` check could
	// silently drop insert errors; any error is now fatal.
	tableName := "kpi_report_" + strings.ToLower(kpiReport.Task.NE.NeType)
	if _, err := xEngine.Table(tableName).Insert(kpiData); err != nil {
		log.Errorf("Failed to insert %s:%v", tableName, err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	report := kpi_c_report.KpiCReport{
		NeType:      &kpiData.NEType,
		NeName:      &kpiData.NEName,
		RmUID:       &kpiData.RmUid,
		Date:        kpiData.Date,
		StartTime:   &kpiData.StartTime,
		EndTime:     &kpiData.EndTime,
		Index:       int16(kpiData.Index),
		Granularity: &kpiData.Granularity,
	}
	// Resolve the NE this report belongs to.
	neInfo := neService.NewNeInfo.SelectNeInfoByRmuid(kpiData.RmUid)
	// Use only the date part for the event's timeGroup; guard against a
	// Date shorter than "YYYY-MM-DD" (the previous [:10] could panic).
	dateOnly := kpiData.Date
	if len(dateOnly) > 10 {
		dateOnly = dateOnly[:10]
	}
	// Custom KPI event pushed to the FE websocket group.
	kpiCEvent := map[string]any{
		"neType":      kpiData.NEType,
		"neId":        neInfo.NeId,
		"neName":      kpiData.NEName,
		"rmUID":       kpiData.RmUid,
		"startIndex":  kpiData.Index,
		"timeGroup":   dateOnly + " " + kpiData.EndTime,
		"createdAt":   kpiData.CreatedAt,
		"granularity": kpiData.Granularity,
	}
	// Evaluate every active custom-KPI expression against the raw
	// values; evaluation errors are recorded per KPI, not fatal.
	kpiCList := kpi_c_title.GetActiveKPICList(kpiData.NEType)
	for _, k := range kpiCList {
		result, err := evaluate.CalcExpr(*k.Expression, kpiValMap)
		kpiCVal := kpi_c_report.KpiCVal{KPIID: *k.KpiID}
		if err != nil {
			kpiCVal.Value = 0.0
			kpiCVal.Err = err.Error()
		} else {
			kpiCVal.Value = result
		}
		report.KpiValues = append(report.KpiValues, kpiCVal)
		kpiCEvent[kpiCVal.KPIID] = kpiCVal.Value
	}
	// Persist the custom KPI report.
	kpi_c_report.InsertKpiCReport(kpiData.NEType, report)
	if neInfo.RmUID == kpiData.RmUid {
		// Push golden KPIs to the websocket subscription group.
		wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent)
		// Push custom KPIs to their own subscription group.
		wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent)
		if neInfo.NeType == "UPF" {
			wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+"_"+neInfo.NeId, kpiEvent)
			// Update today's accumulated UPF traffic (UPF.03 = up,
			// UPF.06 = down — presumably; confirm against the KPI list).
			upValue := parse.Number(kpiEvent["UPF.03"])
			downValue := parse.Number(kpiEvent["UPF.06"])
			neDataService.NewPerfKPI.UPFTodayFlowUpdate(neInfo.RmUID, upValue, downValue)
		}
	}
	// Enqueue the stored record id for northbound consumers; a queue
	// failure is logged but not fatal.
	if err := redisqueue.AddNbiKpiQueue(kpiData.NEType, strconv.Itoa(kpiData.ID)); err != nil {
		log.Warn("Failed to AddNbiKpiQueue:", err)
	}
	services.ResponseStatusOK204NoContent(w)
}
// MeasureTask is the payload pushed to an NF to activate measurement
// tasks; NotifyUrl tells the NF where to POST its MeasureReport, e.g.
// "http://<ems-host>:<port>/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport".
type MeasureTask struct {
	Tasks     []Task `json:"Tasks"`
	NotifyUrl string `json:"NotifyUrl"`
}

// Task describes one measurement task definition sent to an NF.
type Task struct {
	Id        int    `json:"Id"`
	StartTime string `json:"StartTime"`
	EndTime   string `json:"EndTime"`
	Schedule  struct {
		// Type is the schedule kind, "Weekly" or "Monthly"; when empty
		// the task is bounded only by StartTime and EndTime.
		Type string `json:"Type"`
		// Days: Weekly uses [0,1,...,6] with Sunday = 0; Monthly uses
		// [1,2,...,31].
		Days []int `json:"Days"`
		// Periods are daily windows; each boundary is midnight plus an
		// integer multiple of the measurement granularity
		// (dborm.Period has Start/End fields).
		Periods []dborm.Period `json:"Periods"`
	} `json:"Schedule"`
	// GranulOption is the measurement granularity option: 15M/30M/60M/24H.
	GranulOption string `json:"GranulOption"`
	// KPISet groups KPI ids under a statistics code such as "SMFHA01",
	// e.g. KPIs ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"].
	KPISet []dborm.KpiSetJ `json:"KPISet"`
}
// MeasureReport mirrors the measurement report body POSTed by an NF
// for one measure task.
type MeasureReport struct {
	Id        int    `json:"Id"`        // measure task id
	TimeStamp string `json:"TimeStamp"` // Unix seconds as a decimal string
	NeName    string `json:"NeName"`
	RmUID     string `json:"rmUID"`
	NeType    string `json:"NeType"`
	Report    struct {
		Period struct {
			StartTime string `json:"StartTime"`
			EndTime   string `json:"EndTime"`
		} `json:"Period"`
		Datas []struct {
			Code string `json:"Code"` // statistics code, e.g. "SMFHA01"
			KPIs []struct {
				KPIID     string `json:"KPIID"` // KPI item, e.g. "SMF.AttCreatePduSession._Dnn"
				KPIValues []struct {
					// Name is "Total" for a single value, or the name of
					// the measured item (e.g. a DNN such as "cmnet"/"ims").
					Name  string `json:"Name"`
					Value int64  `json:"Value"`
				} `json:"KPIValues"`
			} `json:"KPIs"`
		} `json:"Datas"`
	} `json:"Report"`
}

// MeasureData is one flattened measure_data row: a single KPI value of
// one NE for one reporting period.
type MeasureData struct {
	// Id int `json:"id" xorm:"pk 'id' autoincr"`
	Id           int    `json:"id" xorm:"-"` // not persisted
	Date         string `json:"date" xorm:"date"`
	TaskId       int    `json:"taskId"`
	NeType       string `json:"neType" xorm:"ne_type"`
	NeName       string `json:"neName" xorm:"ne_name"`
	RmUid        string `json:"rmUid" xorm:"rm_uid"`
	GranulOption string `json:"granulOption" xorm:"granul_option"`
	StartTime    string `json:"startTime"`
	EndTime      string `json:"endTime"`
	KpiCode      string `json:"kpiCode" xorm:"kpi_code"` // statistics code
	KpiId        string `json:"kpiId" xorm:"kpi_id"`
	KpiExt       string `json:"kpiExt" xorm:"kpi_ext"` // measured item name, e.g. a DNN
	Value        int64  `json:"value"`
	Timestamp    string `json:"timestamp"`
}
// PostMeasureReportFromNF stores a measurement report POSTed by an NF.
// Each (code, kpi, kpi-value) triple from the report body becomes one
// row in measure_data. Answers 204 No Content on success.
func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasureReportFromNF processing... ")
	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Failed to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	log.Debug("Request body:", string(body))
	measureReport := new(MeasureReport)
	// Reject malformed bodies instead of silently inserting zero values
	// (the unmarshal error used to be discarded).
	if err := json.Unmarshal(body, measureReport); err != nil {
		log.Error("Failed to json.Unmarshal MeasureReport:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	log.Debug("measureReport:", measureReport)
	session := xEngine.NewSession()
	defer session.Close()
	// measureData is reused for every row; per-row fields are set below.
	measureData := new(MeasureData)
	layout := global.DateTime
	measureData.Date = GetDateFromTimeString(layout, measureReport.Report.Period.StartTime)
	measureData.TaskId = measureReport.Id
	measureData.StartTime = measureReport.Report.Period.StartTime
	measureData.EndTime = measureReport.Report.Period.EndTime
	measureData.NeType = measureReport.NeType
	measureData.NeName = measureReport.NeName
	measureData.RmUid = measureReport.RmUID
	// The task's granularity option is looked up from measure_task.
	measureData.GranulOption, _ = dborm.XormGetSingleCol("measure_task", "granul_option", fmt.Sprintf("id=%d", measureReport.Id))
	t, _ := strconv.ParseInt(measureReport.TimeStamp, 10, 64)
	timestamp := time.Unix(t, 0)
	log.Debug("timestamp:", timestamp.Format(layout))
	measureData.Timestamp = timestamp.Format(layout)
	log.Debug("Datas:", measureReport.Report.Datas)
	// insertRow persists the current measureData snapshot; it reports
	// false after writing an error response so the caller can stop.
	// NOTE: the previous `err != nil && affected <= 0` check could
	// silently drop insert errors; any error is now fatal.
	insertRow := func() bool {
		log.Debug("measureData:", measureData)
		if _, err := session.Insert(measureData); err != nil {
			log.Error("Failed to insert measure_data:", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return false
		}
		return true
	}
	for _, d := range measureReport.Report.Datas {
		measureData.KpiCode = d.Code
		log.Debug("KPIs:", d.KPIs)
		for _, k := range d.KPIs {
			measureData.KpiId = k.KPIID
			log.Debug("KPIValues:", k.KPIValues)
			if len(k.KPIValues) != 0 {
				for _, v := range k.KPIValues {
					measureData.KpiExt = v.Name
					measureData.Value = v.Value
					if !insertRow() {
						return
					}
				}
			} else {
				// No per-item values reported: record a zero row. Reset
				// KpiExt so a name from the previous KPI cannot leak in.
				measureData.KpiExt = ""
				measureData.Value = 0
				if !insertRow() {
					return
				}
			}
		}
	}
	services.ResponseStatusOK204NoContent(w)
}
// PostMeasureTaskToNF activates measurement tasks on the target NFs.
// For every task id in the "id" query parameter it loads the task from
// the database, builds the MeasureTask payload and POSTs (inactive
// task) or PUTs (suspended task) it to every matching NE; an empty
// NeIds list means "all NEs of this type". On NF success the local
// task status is set to active. Answers 204 when all tasks were
// pushed; forwards the NF response on rejection.
func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasureTaskToNF processing... ")
	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)
	var response *resty.Response
	client := resty.New()
	measureTask := new(MeasureTask)
	measureTask.Tasks = make([]Task, 1)
	for _, taskId := range taskIds {
		id, _ := strconv.Atoi(taskId)
		task, err := dborm.GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Table Task:", task)
		// Reset the slot so values from a previous iteration (e.g. an
		// earlier task's Schedule) cannot leak into this payload.
		measureTask.Tasks[0] = Task{}
		measureTask.Tasks[0].Id = task.Id
		measureTask.Tasks[0].StartTime = task.StartTime
		measureTask.Tasks[0].EndTime = task.EndTime
		if len(task.Schedule) >= 1 {
			measureTask.Tasks[0].Schedule.Type = task.Schedule[0].Type
			measureTask.Tasks[0].Schedule.Days = task.Schedule[0].Days
		}
		measureTask.Tasks[0].Schedule.Periods = task.Periods
		measureTask.Tasks[0].GranulOption = task.GranulOption
		measureTask.Tasks[0].KPISet = task.KpiSet
		ips, err := global.GetIps()
		if err != nil {
			log.Error("Failed to get local IP:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		log.Debug("ips:", ips)
		// The NF reports back to this EMS instance at the measureReport
		// endpoint for its element type.
		measureTask.NotifyUrl = global.SetNotifyUrl(ips[0], config.GetYamlConfig().Rest[0].Port, fmt.Sprintf(MeasureReportFmt, neType))
		log.Debug("Measure Task to NF:", measureTask)
		// An empty NeIds list means "all NEs of this type".
		if len(task.NeIds) == 0 {
			var neInfos []dborm.NeInfo
			err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			for _, neInfo := range neInfos {
				task.NeIds = append(task.NeIds, neInfo.NeId)
			}
		}
		for _, neId := range task.NeIds {
			var err error
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfo:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			if neInfo == nil {
				err := fmt.Errorf("not found target NE neType=%s, neId=%s", neType, neId)
				log.Error(err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: POST ", requestURI2NF)
			switch task.Status {
			case dborm.MeasureTaskStatusInactive:
				// New task: POST the full definition to the NF.
				body, _ := json.Marshal(measureTask)
				log.Debug("body: ", string(body))
				log.Debug("User-Agent: ", config.GetDefaultUserAgent())
				response, err = client.R().
					EnableTrace().
					SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
					SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
					SetBody(body).
					SetContentLength(true).
					Post(requestURI2NF)
				if err != nil {
					log.Error("Post to NF failed:", err)
					services.ResponseInternalServerError500NFConnectRefused(w)
					return
				}
				log.Debug("response info: ")
				log.Debug("Status Code:", response.StatusCode())
				log.Debug("Status:", response.Status())
				log.Debug("Proto:", response.Proto())
				log.Debug("Time:", response.Time())
				log.Debug("Received At:", response.ReceivedAt())
				log.Debug("Size:", response.Size())
			case dborm.MeasureTaskStatusSuspend:
				// Suspended task: PUT re-activates it on the NF.
				body, _ := json.Marshal(measureTask)
				log.Debug("body: ", string(body))
				response, err = client.R().
					EnableTrace().
					SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
					SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
					SetBody(body).
					SetContentLength(true).
					Put(requestURI2NF)
				if err != nil {
					log.Error("Put to NF failed:", err)
					services.ResponseInternalServerError500NFConnectRefused(w)
					return
				}
			default:
				err = fmt.Errorf("measure task status must be inactive id=%d", id)
				log.Error("Unable to active measure task:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			log.Debug("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				// NF accepted the task: mark it active locally.
				taskInfo := new(dborm.MeasureTask)
				taskInfo.Status = dborm.MeasureTaskStatusActive
				taskInfo.CreateTime = time.Now().Format(time.DateTime)
				affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
				if err != nil {
					log.Error("dborm.XormUpdateTableById is failed:", err)
					services.ResponseInternalServerError500DatabaseOperationFailed(w)
					return
				} else if affected <= 0 {
					log.Info("Not record affected in measure_task")
				}
			default:
				// NF rejected the task: forward its response verbatim.
				// (The previous code had an unreachable else branch that
				// dereferenced a nil response; response is always
				// non-nil here because transport errors return earlier.)
				log.Error("NF return failure to active measure task")
				log.Info("response body:", string(response.Body()))
				services.TransportResponse(w, response.StatusCode(), response.Body())
				return
			}
		}
	}
	services.ResponseStatusOK204NoContent(w)
}
// PutMeasureTaskToNF is a placeholder for updating a measurement task
// on an NF; it currently performs no action and answers 200 with a
// null body.
func PutMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	services.ResponseStatusOK200Null(w)
}
// DeleteMeasureTaskToNF deletes measurement tasks on the target NFs
// and marks them deleted locally. A task with no NE bindings, an
// unknown NE, or an unreachable NF is still marked deleted locally so
// stale tasks cannot get stuck. Answers 204 on the happy paths; when
// an NF returns a non-2xx status its decoded body is forwarded under
// the "error" key.
func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("DeleteMeasureTaskToNF processing... ")
	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)
	var response *resty.Response
	respMsg := make(map[string]interface{})
	// markDeleted flags the task as deleted in measure_task; it reports
	// false after writing an error response so the caller can stop.
	markDeleted := func(id int) bool {
		taskInfo := new(dborm.MeasureTask)
		taskInfo.Status = dborm.MeasureTaskStatusDeleted
		affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
		if err != nil {
			log.Error("dborm.XormUpdateTableById is failed:", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return false
		} else if affected <= 0 {
			log.Info("Not record affected in measure_task")
		}
		return true
	}
	for _, taskId := range taskIds {
		id, _ := strconv.Atoi(taskId)
		task, err := dborm.GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Measure Task:", task)
		// An empty NeIds list means "all NEs of this type".
		if len(task.NeIds) == 0 {
			var neInfos []dborm.NeInfo
			err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			for _, neInfo := range neInfos {
				task.NeIds = append(task.NeIds, neInfo.NeId)
			}
		}
		log.Debug("neIds:", task.NeIds)
		if len(task.NeIds) == 0 {
			// No target NE at all: just mark the task deleted locally.
			log.Warn("Not found target NE in the measure task")
			if markDeleted(id) {
				services.ResponseStatusOK204NoContent(w)
			}
			return
		}
		for _, neId := range task.NeIds {
			var err error
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("dborm.XormGetNeInfo is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			}
			if neInfo == nil {
				// NE no longer exists: mark the task deleted locally.
				if markDeleted(id) {
					services.ResponseStatusOK204NoContent(w)
				}
				return
			}
			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: DELETE ", requestURI2NF)
			client := resty.New()
			response, err = client.R().
				EnableTrace().
				SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
				SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
				Delete(requestURI2NF)
			if err != nil {
				// Unreachable NF must not block deletion: mark deleted
				// locally and report success.
				log.Error("Failed to resty delete:", err)
				if markDeleted(id) {
					services.ResponseStatusOK204NoContent(w)
				}
				return
			}
			log.Info("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				if markDeleted(id) {
					services.ResponseStatusOK204NoContent(w)
				}
				return
			default:
				// Collect the NF error body; remaining NEs and tasks
				// are still attempted.
				log.Info("response body:", string(response.Body()))
				body := new(map[string]interface{})
				_ = json.Unmarshal(response.Body(), &body)
				respMsg["error"] = body
			}
		}
	}
	// Guard: response is nil when taskIds is empty or no NF was
	// contacted — the previous code dereferenced it unconditionally.
	if response == nil {
		services.ResponseStatusOK204NoContent(w)
		return
	}
	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}
// PatchMeasureTaskToNF suspends measurement tasks on the target NFs
// and marks them inactive locally. A task with no NE bindings or an
// unknown NE is marked inactive without contacting any NF. Answers
// with the last NF status and any collected NF error bodies; 204 when
// there was nothing to do.
func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PatchMeasureTaskToNF processing... ")
	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)
	var response *resty.Response
	respMsg := make(map[string]interface{})
	// markInactive flags the task as inactive in measure_task; it
	// reports false after writing an error response.
	markInactive := func(id int) bool {
		taskInfo := new(dborm.MeasureTask)
		taskInfo.Status = dborm.MeasureTaskStatusInactive
		affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
		if err != nil {
			log.Error("dborm.XormUpdateTableById is failed:", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return false
		} else if affected <= 0 {
			log.Info("Not record affected in measure_task")
		}
		return true
	}
	for _, taskId := range taskIds {
		id, _ := strconv.Atoi(taskId)
		task, err := dborm.GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Measure Task:", task)
		// An empty NeIds list means "all NEs of this type".
		if len(task.NeIds) == 0 {
			var neInfos []dborm.NeInfo
			err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			for _, neInfo := range neInfos {
				task.NeIds = append(task.NeIds, neInfo.NeId)
			}
		}
		if len(task.NeIds) == 0 {
			// No target NE: just mark the task inactive locally.
			if markInactive(id) {
				services.ResponseStatusOK204NoContent(w)
			}
			return
		}
		for _, neId := range task.NeIds {
			var err error
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("dborm.XormGetNeInfo is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			}
			if neInfo == nil {
				// NE no longer exists: mark the task inactive locally.
				if markInactive(id) {
					services.ResponseStatusOK204NoContent(w)
				}
				return
			}
			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: PATCH ", requestURI2NF)
			client := resty.New()
			response, err = client.R().
				EnableTrace().
				SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
				SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
				Patch(requestURI2NF)
			if err != nil {
				log.Error("Patch to NF failed:", err)
				services.ResponseInternalServerError500NFConnectRefused(w)
				return
			}
			log.Debug("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				// NF accepted the suspend: continue with the next NE.
				if !markInactive(id) {
					return
				}
			default:
				// Collect the NF error body and keep going.
				log.Debug("response body:", string(response.Body()))
				body := new(map[string]interface{})
				_ = json.Unmarshal(response.Body(), &body)
				respMsg["error"] = body
			}
		}
	}
	// Guard: response is nil when no NF was contacted — the previous
	// code dereferenced it unconditionally.
	if response == nil {
		services.ResponseStatusOK204NoContent(w)
		return
	}
	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}
// Measurement is one NBI performance-measurement snapshot of a single
// network element.
type Measurement struct {
	// Date is the measurement date in YYYY-MM-DD format (internal use only, omitted from JSON).
	Date  string `json:"-" xorm:"date"`
	Index int    `json:"Index"` // Index of 15-minute intervals (time granularity) in a day, range: 0~95
	// NeName is the NE's user label (UserLabel), used for display and to
	// uniquely identify the NE by name.
	NeName string `json:"NeName"`
	// RmUID is the unique identifier for the network element (unique per NE, used for database joins).
	RmUID string `json:"RmUID" xorm:"rm_uid"`
	// Dn is the Network Element Distinguished Name (DN), uniquely identifies the network element.
	// Format example: "RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456"
	Dn string `json:"Dn" xorm:"dn"`
	// TimeZone specifies the time zone for the measurement, e.g., "Asia/Shanghai" or "+08:00".
	TimeZone  string `json:"TimeZone"`
	NeType    string `json:"NeType"` // network element type
	StartTime string `json:"StartTime"`
	// Datas lists the KPIs grouped by ObjectType.
	Datas []Data `json:"Datas"`
}

// KPIValue is one named measurement value.
type KPIValue struct {
	Name  string `json:"Name"` // "Total" is used for aggregate values; other names (e.g., "cmnet", "ims") are used for specific measurement items such as DNN names.
	Value int64  `json:"Value"`
}

// KPI is one KPI id with its per-item values.
type KPI struct {
	KPIID     string     `json:"KPIID"`
	KPIValues []KPIValue `json:"KPIValues"`
}

// Data groups KPIs under one network resource category.
type Data struct {
	// ObjectType is the network resource category name (the spatial
	// granularity in the PM KPI list), e.g. "SmfFunction".
	ObjectType string `json:"ObjectType"`
	KPIs       []KPI  `json:"KPIs"` // KPI items, e.g. "SMF.AttCreatePduSession._Dnn"
}
// 添加配置结构体
// type PMConfig struct {
// SchemaPath string `json:"schemaPath"` // schema文件路径
// RandomMin int `json:"randomMin"` // 随机数最小值
// RandomMax int `json:"randomMax"` // 随机数最大值
// MergeMode string `json:"mergeMode"` // 合并模式: "merge"(合并), "none"(不合并), "schema"(仅schema随机值)
// }
// Merge-mode constants controlling how NE-reported measurement data is
// combined with schema data.
const (
	MergeModeNone   = "none"   // no merging: use only the data sent by the NE
	MergeModeMerge  = "merge"  // prefer NE data, fill missing entries with random values
	MergeModeSchema = "schema" // ignore NE data, use random values for every schema entry
)
// 全局配置
// SchemaPath should be configured per environment; by default, it uses "/home/simon/omc.git/be.ems/config/schema".
// You can override this by setting the environment variable EMS_PM_SCHEMA_PATH.
// var pmConfig = PMConfig{
// SchemaPath: func() string {
// if envPath := os.Getenv("EMS_PM_SCHEMA_PATH"); envPath != "" {
// return envPath
// }
// return "/home/simon/omc.git/be.ems/config/schema"
// }(),
// RandomMin: 1,
// RandomMax: 16,
// MergeMode: MergeModeSchema,
// }
// var pmConfig = PMConfig{
// SchemaPath: config.GetNbiPmConfig().SchemaPath,
// RandomMin: config.GetNbiPmConfig().RandomMin,
// RandomMax: config.GetNbiPmConfig().RandomMax,
// MergeMode: config.GetNbiPmConfig().MergeMode,
// }
// Schema data structures: the on-disk JSON layout of the per-NE-type
// "*-nbi-pm-schema.json" files loaded by loadSchemaData.

// SchemaKPIValue is one named value slot declared by the schema.
type SchemaKPIValue struct {
	Name  string `json:"Name"`
	Value int    `json:"Value"`
}

// SchemaKPI declares one KPI and the value names it should carry.
type SchemaKPI struct {
	KPIID     string           `json:"KPIID"`
	KPIValues []SchemaKPIValue `json:"KPIValues"`
}

// SchemaObject declares the KPIs expected for one ObjectType.
type SchemaObject struct {
	ObjectType string      `json:"ObjectType"`
	KPIs       []SchemaKPI `json:"KPIs"`
}

// SchemaData is the top-level schema document: a list of object declarations.
type SchemaData []SchemaObject
// loadSchemaData loads the NBI PM schema for the given NE type from
// <SchemaPath>/<netype>-nbi-pm-schema.json (lower-cased NE type).
// It returns (nil, nil) when no schema file exists — callers treat a missing
// schema as "nothing to merge", not as an error.
func loadSchemaData(neType string) (*SchemaData, error) {
	schemaFile := filepath.Join(config.GetNbiPmConfig().SchemaPath, strings.ToLower(neType)+"-nbi-pm-schema.json")
	// Read directly and inspect the error instead of Stat-then-Read: the
	// previous two-step check raced with the file being removed in between.
	data, err := os.ReadFile(schemaFile)
	if err != nil {
		if os.IsNotExist(err) {
			log.Warnf("Schema file not found: %s", schemaFile)
			return nil, nil // missing file is not an error
		}
		return nil, fmt.Errorf("failed to read schema file: %w", err)
	}
	var schema SchemaData
	if err := json.Unmarshal(data, &schema); err != nil {
		return nil, fmt.Errorf("failed to parse schema file: %w", err)
	}
	return &schema, nil
}
// generateRandomKPIValue returns a pseudo-random KPI value in the configured
// inclusive range [RandomMin, RandomMax].
func generateRandomKPIValue() int64 {
	cfg := config.GetNbiPmConfig()
	span := cfg.RandomMax - cfg.RandomMin + 1
	return int64(cfg.RandomMin + rand.Intn(span))
}
// mergeWithSchema combines the NE-reported measurement with the schema
// according to the configured merge mode. A nil schema is a no-op.
func mergeWithSchema(measurement *dborm.NorthboundPm, schema *SchemaData) {
	if schema == nil {
		return
	}
	mode := config.GetNbiPmConfig().MergeMode
	switch mode {
	case MergeModeNone:
		// Leave the NE data untouched.
		log.Debug("Merge mode is 'none', skipping schema merge")
	case MergeModeSchema:
		// Discard the NE data and rebuild everything from random schema values.
		log.Debug("Merge mode is 'schema', replacing all data with schema random values")
		measurement.Datas = nil
		generateSchemaOnlyData(measurement, schema)
	case MergeModeMerge:
		// Prefer NE data; fill gaps with schema-derived random values.
		log.Debug("Merge mode is 'merge', merging NE data with schema")
		mergeNeDataWithSchema(measurement, schema)
	default:
		log.Warnf("Unknown merge mode: %s, using default merge mode", mode)
		mergeNeDataWithSchema(measurement, schema)
	}
}
// generateSchemaOnlyData populates measurement.Datas purely from the schema,
// assigning a fresh random value to every declared KPI value slot. The NE's
// own data is expected to have been cleared by the caller (MergeModeSchema).
// The anonymous struct literals below must mirror the element type of
// dborm.NorthboundPm.Datas exactly, which is why they are spelled out inline.
func generateSchemaOnlyData(measurement *dborm.NorthboundPm, schema *SchemaData) {
	for _, schemaObj := range *schema {
		newData := struct {
			ObjectType string `json:"ObjectType" xorm:"object_type"`
			PmDatas    []struct {
				PmName   string `json:"KPIID" xorm:"pm_name"`
				SubDatas []struct {
					SN string `json:"Name" xorm:"sn"`
					SV int64  `json:"Value" xorm:"sv"`
				} `json:"KPIValues" xorm:"sub_datas"`
			} `json:"KPIs" xorm:"pm_datas"`
		}{
			ObjectType: schemaObj.ObjectType,
			PmDatas: []struct {
				PmName   string `json:"KPIID" xorm:"pm_name"`
				SubDatas []struct {
					SN string `json:"Name" xorm:"sn"`
					SV int64  `json:"Value" xorm:"sv"`
				} `json:"KPIValues" xorm:"sub_datas"`
			}{},
		}
		// Walk the KPIs declared by the schema for this ObjectType.
		for _, schemaKPI := range schemaObj.KPIs {
			newKPI := struct {
				PmName   string `json:"KPIID" xorm:"pm_name"`
				SubDatas []struct {
					SN string `json:"Name" xorm:"sn"`
					SV int64  `json:"Value" xorm:"sv"`
				} `json:"KPIValues" xorm:"sub_datas"`
			}{
				PmName: schemaKPI.KPIID,
				SubDatas: []struct {
					SN string `json:"Name" xorm:"sn"`
					SV int64  `json:"Value" xorm:"sv"`
				}{},
			}
			// Generate one random value per declared KPI value name.
			for _, schemaValue := range schemaKPI.KPIValues {
				randomValue := generateRandomKPIValue()
				newSubData := struct {
					SN string `json:"Name" xorm:"sn"`
					SV int64  `json:"Value" xorm:"sv"`
				}{
					SN: schemaValue.Name,
					SV: randomValue,
				}
				newKPI.SubDatas = append(newKPI.SubDatas, newSubData)
				log.Debugf("Generated schema random value for %s.%s: %d",
					schemaKPI.KPIID, schemaValue.Name, randomValue)
			}
			// Skip KPIs that declare no value slots at all.
			if len(newKPI.SubDatas) > 0 {
				newData.PmDatas = append(newData.PmDatas, newKPI)
			}
		}
		// Skip ObjectTypes that produced no KPIs.
		if len(newData.PmDatas) > 0 {
			measurement.Datas = append(measurement.Datas, newData)
			log.Debugf("Created ObjectType with schema data: %s (%d KPIs)",
				schemaObj.ObjectType, len(newData.PmDatas))
		}
	}
	log.Debugf("Schema-only mode: generated %d object types", len(measurement.Datas))
}
// mergeNeDataWithSchema merges NE-reported data with the schema declaration
// (MergeModeMerge): every KPI value slot declared by the schema ends up
// present in measurement.Datas, preferring the NE-provided value and falling
// back to a random value when the NE did not supply one.
//
// NOTE(review): a value of 0 from the NE is treated as "not provided" (see the
// `== 0` / `!= 0` checks below) and is replaced with a random value — confirm
// a legitimate zero measurement is not meant to be preserved.
func mergeNeDataWithSchema(measurement *dborm.NorthboundPm, schema *SchemaData) {
	// Build a lookup of the NE's existing values: ObjectType -> KPIID -> Name -> Value.
	neDataMap := make(map[string]map[string]map[string]int64)
	for _, data := range measurement.Datas {
		if neDataMap[data.ObjectType] == nil {
			neDataMap[data.ObjectType] = make(map[string]map[string]int64)
		}
		for _, pmData := range data.PmDatas {
			if neDataMap[data.ObjectType][pmData.PmName] == nil {
				neDataMap[data.ObjectType][pmData.PmName] = make(map[string]int64)
			}
			for _, subData := range pmData.SubDatas {
				neDataMap[data.ObjectType][pmData.PmName][subData.SN] = subData.SV
			}
		}
	}
	log.Debugf("Original measurement data contains %d object types", len(measurement.Datas))
	// Walk the schema and fill in anything the NE did not report.
	for _, schemaObj := range *schema {
		// Find the matching ObjectType entry, or create it.
		// (The anonymous struct type must mirror the element type of
		// dborm.NorthboundPm.Datas exactly.)
		var targetData *struct {
			ObjectType string `json:"ObjectType" xorm:"object_type"`
			PmDatas    []struct {
				PmName   string `json:"KPIID" xorm:"pm_name"`
				SubDatas []struct {
					SN string `json:"Name" xorm:"sn"`
					SV int64  `json:"Value" xorm:"sv"`
				} `json:"KPIValues" xorm:"sub_datas"`
			} `json:"KPIs" xorm:"pm_datas"`
		}
		for i := range measurement.Datas {
			if measurement.Datas[i].ObjectType == schemaObj.ObjectType {
				targetData = &measurement.Datas[i]
				break
			}
		}
		// No matching ObjectType: append a new one and point at it.
		// The pointer is taken right after the append, and Datas is not
		// appended to again until the next schemaObj iteration re-resolves it,
		// so the pointer stays valid for this iteration.
		if targetData == nil {
			newData := struct {
				ObjectType string `json:"ObjectType" xorm:"object_type"`
				PmDatas    []struct {
					PmName   string `json:"KPIID" xorm:"pm_name"`
					SubDatas []struct {
						SN string `json:"Name" xorm:"sn"`
						SV int64  `json:"Value" xorm:"sv"`
					} `json:"KPIValues" xorm:"sub_datas"`
				} `json:"KPIs" xorm:"pm_datas"`
			}{
				ObjectType: schemaObj.ObjectType,
				PmDatas: []struct {
					PmName   string `json:"KPIID" xorm:"pm_name"`
					SubDatas []struct {
						SN string `json:"Name" xorm:"sn"`
						SV int64  `json:"Value" xorm:"sv"`
					} `json:"KPIValues" xorm:"sub_datas"`
				}{},
			}
			measurement.Datas = append(measurement.Datas, newData)
			targetData = &measurement.Datas[len(measurement.Datas)-1]
			log.Debugf("Created new ObjectType: %s", schemaObj.ObjectType)
		}
		// Walk the KPIs declared by the schema for this ObjectType.
		for _, schemaKPI := range schemaObj.KPIs {
			// Find the matching KPI entry, or create it.
			var targetKPI *struct {
				PmName   string `json:"KPIID" xorm:"pm_name"`
				SubDatas []struct {
					SN string `json:"Name" xorm:"sn"`
					SV int64  `json:"Value" xorm:"sv"`
				} `json:"KPIValues" xorm:"sub_datas"`
			}
			for i := range targetData.PmDatas {
				if targetData.PmDatas[i].PmName == schemaKPI.KPIID {
					targetKPI = &targetData.PmDatas[i]
					break
				}
			}
			// No matching KPI: append a new one and point at it (same
			// pointer-validity reasoning as for targetData above).
			if targetKPI == nil {
				newKPI := struct {
					PmName   string `json:"KPIID" xorm:"pm_name"`
					SubDatas []struct {
						SN string `json:"Name" xorm:"sn"`
						SV int64  `json:"Value" xorm:"sv"`
					} `json:"KPIValues" xorm:"sub_datas"`
				}{
					PmName: schemaKPI.KPIID,
					SubDatas: []struct {
						SN string `json:"Name" xorm:"sn"`
						SV int64  `json:"Value" xorm:"sv"`
					}{},
				}
				targetData.PmDatas = append(targetData.PmDatas, newKPI)
				targetKPI = &targetData.PmDatas[len(targetData.PmDatas)-1]
				log.Debugf("Created new KPI: %s", schemaKPI.KPIID)
			}
			// Walk the value names declared by the schema for this KPI.
			for _, schemaValue := range schemaKPI.KPIValues {
				// If the slot already exists, only overwrite it with a random
				// value when the NE did not provide it (or provided 0).
				exists := false
				for i := range targetKPI.SubDatas {
					if targetKPI.SubDatas[i].SN == schemaValue.Name {
						exists = true
						if neDataMap[schemaObj.ObjectType] == nil ||
							neDataMap[schemaObj.ObjectType][schemaKPI.KPIID] == nil ||
							neDataMap[schemaObj.ObjectType][schemaKPI.KPIID][schemaValue.Name] == 0 {
							targetKPI.SubDatas[i].SV = generateRandomKPIValue()
							log.Debugf("Updated KPI %s.%s with random value: %d",
								schemaKPI.KPIID, schemaValue.Name, targetKPI.SubDatas[i].SV)
						}
						break
					}
				}
				// Slot missing entirely: add it, preferring the NE's value.
				if !exists {
					var value int64
					if neDataMap[schemaObj.ObjectType] != nil &&
						neDataMap[schemaObj.ObjectType][schemaKPI.KPIID] != nil &&
						neDataMap[schemaObj.ObjectType][schemaKPI.KPIID][schemaValue.Name] != 0 {
						value = neDataMap[schemaObj.ObjectType][schemaKPI.KPIID][schemaValue.Name]
						log.Debugf("Using NE provided value for %s.%s: %d",
							schemaKPI.KPIID, schemaValue.Name, value)
					} else {
						value = generateRandomKPIValue()
						log.Debugf("Generated random value for %s.%s: %d",
							schemaKPI.KPIID, schemaValue.Name, value)
					}
					newSubData := struct {
						SN string `json:"Name" xorm:"sn"`
						SV int64  `json:"Value" xorm:"sv"`
					}{
						SN: schemaValue.Name,
						SV: value,
					}
					targetKPI.SubDatas = append(targetKPI.SubDatas, newSubData)
				}
			}
		}
	}
}
// mergeDuplicateObjectTypes collapses entries in measurement.Datas that share
// the same ObjectType into one entry. Within a merged entry, KPIs with the
// same name are unified and their sub-values are deduplicated by name
// (first-seen value wins). ObjectTypes keep their first-seen order.
//
// Fixes over the previous version:
//   - objectTypeMap stored &data, the address of the range variable, so on
//     Go < 1.22 every map entry aliased the same (last) element;
//   - the "original count" log line was computed after Datas had already been
//     replaced, so it always equaled the merged count;
//   - output order was map-iteration (random) instead of deterministic;
//   - duplicate sub-value names within a single incoming KPI were all kept.
func mergeDuplicateObjectTypes(measurement *dborm.NorthboundPm) {
	if len(measurement.Datas) <= 1 {
		return
	}
	originalCount := len(measurement.Datas)
	// merged accumulates deduplicated entries in first-seen order;
	// indexByType maps ObjectType -> index into merged.
	indexByType := make(map[string]int, originalCount)
	merged := measurement.Datas[:0:0] // empty slice detached from the original backing array
	for _, data := range measurement.Datas {
		idx, seen := indexByType[data.ObjectType]
		if !seen {
			// First occurrence of this ObjectType: keep it as-is.
			indexByType[data.ObjectType] = len(merged)
			merged = append(merged, data)
			continue
		}
		// Duplicate ObjectType: fold its KPIs into the existing entry.
		// (Pointer into merged is safe here: this branch never appends to merged.)
		existingData := &merged[idx]
		kpiIndex := make(map[string]int, len(existingData.PmDatas))
		for i := range existingData.PmDatas {
			kpiIndex[existingData.PmDatas[i].PmName] = i
		}
		for _, newPmData := range data.PmDatas {
			ki, found := kpiIndex[newPmData.PmName]
			if !found {
				// New KPI for this ObjectType: append it whole.
				kpiIndex[newPmData.PmName] = len(existingData.PmDatas)
				existingData.PmDatas = append(existingData.PmDatas, newPmData)
				continue
			}
			// Existing KPI: add only sub-values whose names are not present yet.
			existingKpi := &existingData.PmDatas[ki]
			subSeen := make(map[string]bool, len(existingKpi.SubDatas))
			for _, subData := range existingKpi.SubDatas {
				subSeen[subData.SN] = true
			}
			for _, newSubData := range newPmData.SubDatas {
				if !subSeen[newSubData.SN] {
					existingKpi.SubDatas = append(existingKpi.SubDatas, newSubData)
					subSeen[newSubData.SN] = true
				}
			}
		}
	}
	measurement.Datas = merged
	log.Debugf("Merged duplicate ObjectTypes: original count %d, merged count %d",
		originalCount, len(measurement.Datas))
}
// PostMeasurementFromNF handles PM measurement reports pushed by NFs.
// It validates the API version, parses the body, resolves the reporting NE by
// (NeType, RmUID), optionally merges the payload with schema data, persists
// the record into nbi_pm, and enqueues its id onto the NBI PM redis queue.
// Responds 204 on success.
func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasurementFromNF processing... ")
	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	// Bound the body read to guard against oversized payloads.
	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Failed to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	measurement := new(dborm.NorthboundPm)
	// Previously the unmarshal error was discarded, so malformed JSON was
	// processed as a zero-value record; reject it explicitly instead.
	if err := json.Unmarshal(body, measurement); err != nil {
		log.Error("Failed to unmarshal measurement body:", err)
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}
	// Collapse duplicate ObjectTypes before any further processing.
	mergeDuplicateObjectTypes(measurement)
	neInfo, err := dborm.XormGetNeInfoByRmUID(measurement.NeType, measurement.RmUID)
	if err != nil {
		log.Error("dborm.XormGetNeInfo is failed:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return
	}
	if neInfo == nil {
		log.Error("Not found target NE neType:", measurement.NeType, "RmUID:", measurement.RmUID)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	measurement.NeName = neInfo.NeName
	measurement.Dn = neInfo.Dn
	// Merge with schema data when a merge mode is configured.
	if config.GetNbiPmConfig().MergeMode != MergeModeNone {
		log.Debugf("Loading schema for neType: %s (mode: %s)", measurement.NeType, config.GetNbiPmConfig().MergeMode)
		schema, err := loadSchemaData(measurement.NeType)
		if err != nil {
			// Best-effort: a broken schema file must not block ingestion.
			log.Warnf("Failed to load schema for %s: %v", measurement.NeType, err)
		} else if schema != nil {
			log.Debugf("Successfully loaded schema for %s, processing with mode: %s", measurement.NeType, config.GetNbiPmConfig().MergeMode)
			mergeWithSchema(measurement, schema)
			log.Debug("Data processing completed")
		} else {
			log.Debugf("No schema file found for %s, using original data", measurement.NeType)
		}
	}
	session := xEngine.NewSession()
	defer session.Close()
	layout := time.RFC3339
	measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
	measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
	// The previous check (err != nil && affected <= 0) could silently swallow
	// insert errors whenever the driver also reported affected rows; any
	// error here must fail the request.
	if _, err := session.Table("nbi_pm").Insert(measurement); err != nil {
		log.Error("Failed to insert nbi_pm:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return
	}
	// Notify downstream consumers via the redis queue; failure is non-fatal.
	if err := redisqueue.AddNbiPMQueue(strconv.Itoa(measurement.Id)); err != nil {
		log.Warn("Failed to AddNbiPMQueue:", err)
	}
	services.ResponseStatusOK204NoContent(w)
}
// GetMeasurementFromNF proxies a PM measurement GET request to the target NF,
// persists a successful response into nbi_pm (after optional schema merging),
// and relays the NF's response status (plus error body, if any) to the caller.
func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("GetMeasurementFromNF processing... ")
	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	neType := vars["elementTypeValue"]
	if neType == "" {
		log.Error("elementTypeValue is null.")
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	params := r.URL.Query()
	neIds := params["ne_id"]
	if len(neIds) == 0 {
		log.Error("ne_id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}
	log.Debugf("neType: %s neId:%s", neType, neIds)
	// Note: the previous dead `neInfo := new(dborm.NeInfo)` pre-allocation
	// was removed; the lookup result is used directly.
	neInfo, err := dborm.XormGetNeInfo(neType, neIds[0])
	if err != nil {
		log.Error("dborm.XormGetNeInfo is failed:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return
	}
	// Guard against a missing NE record: dereferencing neInfo.Ip below would
	// panic. The POST handler performs the same nil check.
	if neInfo == nil {
		log.Error("Not found target NE neType:", neType, "neId:", neIds[0])
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
	log.Debug("requestURI2NF: GET ", requestURI2NF)
	client := resty.New()
	response, err := client.R().
		EnableTrace().
		SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
		SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
		Get(requestURI2NF)
	if err != nil {
		log.Error("Failed to Get from NF:", err)
		services.ResponseInternalServerError500NFConnectRefused(w)
		return
	}
	respMsg := make(map[string]interface{})
	switch response.StatusCode() {
	case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
		log.Debug("response:", response)
		measurement := new(dborm.NorthboundPm)
		// Malformed NF payload: skip persistence but still relay the status.
		if err := json.Unmarshal(response.Body(), measurement); err != nil {
			log.Error("Failed to unmarshal NF measurement body:", err)
			break
		}
		// Collapse duplicate ObjectTypes before any further processing.
		mergeDuplicateObjectTypes(measurement)
		// Merge with schema data when a merge mode is configured.
		if config.GetNbiPmConfig().MergeMode != MergeModeNone {
			log.Debugf("Loading schema for neType: %s (mode: %s)", measurement.NeType, config.GetNbiPmConfig().MergeMode)
			schema, err := loadSchemaData(measurement.NeType)
			if err != nil {
				log.Warnf("Failed to load schema for %s: %v", measurement.NeType, err)
			} else if schema != nil {
				log.Debugf("Successfully loaded schema for %s, processing with mode: %s", measurement.NeType, config.GetNbiPmConfig().MergeMode)
				mergeWithSchema(measurement, schema)
				log.Debug("Data processing completed")
			} else {
				log.Debugf("No schema file found for %s, using original data", measurement.NeType)
			}
		}
		session := dborm.DbClient.XEngine.NewSession()
		defer session.Close()
		layout := time.RFC3339
		measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
		measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
		// Any insert error is fatal (the previous `err != nil && affected <= 0`
		// check could silently drop errors).
		if _, err := session.Table("nbi_pm").Insert(measurement); err != nil {
			log.Error("Failed to insert nbi_pm:", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
	default:
		log.Debug("response body:", string(response.Body()))
		body := new(map[string]interface{})
		_ = json.Unmarshal(response.Body(), &body)
		respMsg["error"] = body
	}
	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}