1555 lines
52 KiB
Go
1555 lines
52 KiB
Go
package pm
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"math"
|
||
"math/rand"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"be.ems/features/nbi/redisqueue"
|
||
"be.ems/features/pm/kpi_c_report"
|
||
"be.ems/features/pm/kpi_c_title"
|
||
"be.ems/lib/config"
|
||
"be.ems/lib/dborm"
|
||
evaluate "be.ems/lib/eval"
|
||
"be.ems/lib/global"
|
||
"be.ems/lib/log"
|
||
"be.ems/lib/services"
|
||
|
||
"be.ems/src/framework/utils/parse"
|
||
neDataService "be.ems/src/modules/network_data/service"
|
||
neService "be.ems/src/modules/network_element/service"
|
||
wsService "be.ems/src/modules/ws/service"
|
||
"github.com/go-resty/resty/v2"
|
||
_ "github.com/go-sql-driver/mysql"
|
||
"github.com/gorilla/mux"
|
||
"xorm.io/xorm"
|
||
)
|
||
|
||
type Response struct {
|
||
Data interface{} `json:"data"`
|
||
}
|
||
|
||
type KpiReport struct {
|
||
Timestamp string `json:"TimeStamp"`
|
||
Task struct {
|
||
Period struct {
|
||
StartTime string `json:"StartTime"`
|
||
EndTime string `json:"EndTime"`
|
||
} `json:"Period"`
|
||
NE struct {
|
||
NEName string `json:"NEName"`
|
||
RmUID string `json:"rmUID"`
|
||
NeType string `json:"NeType"`
|
||
KPIs []struct {
|
||
KPIID string `json:"KPIID"`
|
||
Value int64 `json:"Value"`
|
||
Err string `json:"Err"`
|
||
} `json:"KPIs"`
|
||
} `json:"NE"`
|
||
} `json:"Task"`
|
||
}
|
||
|
||
type GoldKpi struct {
|
||
// Id int `json:"-" xorm:"pk 'id' autoincr"`
|
||
Date string `json:"date" xorm:"date"`
|
||
Index int `json:"index"`
|
||
Granularity int8 `json:"granularity"`
|
||
StartTime string `json:"startTime"`
|
||
EndTime string `json:"endTime"`
|
||
NEName string `json:"neName" xorm:"ne_name"`
|
||
RmUid string `json:"rmUid" xorm:"rm_uid"`
|
||
NEType string `json:"neType" xorm:"ne_type"`
|
||
KpiId string `json:"kpiId" xorm:"kpi_id"`
|
||
Value int64 `json:"value"`
|
||
Error string `json:"error"`
|
||
Timestamp string `json:"timestamp"`
|
||
}
|
||
|
||
type KpiData struct {
|
||
ID int `json:"id" xorm:"pk 'id' '<-' autoincr"`
|
||
NEType string `json:"neType" xorm:"ne_type"`
|
||
NEName string `json:"neName" xorm:"ne_name"`
|
||
RmUid string `json:"rmUid" xorm:"rm_uid"`
|
||
Date string `json:"date" xorm:"date"`
|
||
StartTime string `json:"startTime" xorm:"start_time"`
|
||
EndTime string `json:"endTime" xorm:"end_time"`
|
||
Index int `json:"index" xorm:"index"`
|
||
Granularity int8 `json:"granularity" xorm:"granularity"`
|
||
KPIValues []KPIVal `json:"kpiValues" xorm:"json 'kpi_values'"`
|
||
//CreatedAt int64 `json:"createdAt" xorm:"created 'created_at'"`
|
||
CreatedAt int64 `json:"createdAt" xorm:"'created_at'"`
|
||
}
|
||
type KPIVal struct {
|
||
KPIID string `json:"kpi_id" xorm:"kpi_id"`
|
||
Value int64 `json:"value" xorm:"value"`
|
||
Err string `json:"err" xorm:"err"`
|
||
}
|
||
|
||
var (
|
||
// performance management
|
||
PerformanceUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
|
||
MeasureTaskUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
|
||
MeasureReportUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
|
||
MeasureReportFmt = config.DefaultUriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
|
||
MeasurementUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
|
||
UriMeasureTask = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
|
||
|
||
// performance management
|
||
CustomPerformanceUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
|
||
CustomMeasureTaskUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
|
||
CustomMeasureReportUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
|
||
CustomMeasureReportFmt = config.UriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
|
||
CustomMeasurementUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
|
||
CustomUriMeasureTask = config.UriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
|
||
)
|
||
|
||
var xEngine *xorm.Engine
|
||
|
||
type DatabaseClient struct {
|
||
dbType string
|
||
dbUrl string
|
||
dbConnMaxLifetime time.Duration
|
||
dbMaxIdleConns int
|
||
dbMaxOpenConns int
|
||
IsShowSQL bool
|
||
|
||
XEngine *xorm.Engine
|
||
}
|
||
|
||
var DbClient DatabaseClient
|
||
|
||
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName, dbParam string) error {
|
||
DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s",
|
||
dbUser, dbPassword, dbHost, dbPort, dbName, dbParam)
|
||
DbClient.dbType = dbType
|
||
DbClient.dbConnMaxLifetime = 0
|
||
DbClient.dbMaxIdleConns = 0
|
||
DbClient.dbMaxOpenConns = 0
|
||
if log.GetLevel() == log.LOG_TRACE {
|
||
DbClient.IsShowSQL = true
|
||
}
|
||
log.Debugf("dbType:%s dbUrl:%s:", dbType, DbClient.dbUrl)
|
||
|
||
var err error
|
||
DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl)
|
||
if err != nil {
|
||
log.Error("Failed to connet database:", err)
|
||
return err
|
||
}
|
||
DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime)
|
||
DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns)
|
||
DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns)
|
||
DbClient.XEngine.DatabaseTZ = time.Local // 必须
|
||
DbClient.XEngine.TZLocation = time.Local // 必须
|
||
if DbClient.IsShowSQL {
|
||
DbClient.XEngine.ShowSQL(true)
|
||
}
|
||
xEngine = DbClient.XEngine
|
||
|
||
// exist, err := xEngine.IsTableExist("kpi_report")
|
||
// if err != nil {
|
||
// log.Error("Failed to IsTableExist:", err)
|
||
// return err
|
||
// }
|
||
// if exist {
|
||
// // 复制表结构到新表
|
||
// sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` AS SELECT * FROM kpi_report WHERE 1=0", "kpi_report_amf")
|
||
// _, err := xEngine.Exec(sql)
|
||
// if err != nil {
|
||
// log.Error("Failed to Exec:", err)
|
||
// return err
|
||
// }
|
||
// }
|
||
|
||
return nil
|
||
}
|
||
|
||
func XormConnectDatabase(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) (*xorm.Engine, error) {
|
||
sqlStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
|
||
dbUser, dbPassword, dbHost, dbPort, dbName)
|
||
log.Debugf("dbType:%s Connect to:%s:******@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
|
||
dbType, dbUser, dbHost, dbPort, dbName)
|
||
var err error
|
||
xEngine, err = xorm.NewEngine(dbType, sqlStr) //1、Create xorm engine
|
||
if err != nil {
|
||
log.Error("Failed to connect database:", err)
|
||
return nil, err
|
||
}
|
||
if log.GetLevel() == log.LOG_TRACE {
|
||
xEngine.ShowSQL(true)
|
||
}
|
||
return xEngine, nil
|
||
}
|
||
|
||
func GetDateFromTimeString(fmtString string, timeString string) string {
|
||
t, _ := time.ParseInLocation(fmtString, timeString, time.Local)
|
||
return t.Format("2006-01-02")
|
||
}
|
||
|
||
func GetDateTimeFromTimeString(fmtString string, timeString string) string {
|
||
t, _ := time.ParseInLocation(fmtString, timeString, time.Local)
|
||
return t.Format(global.DateTime)
|
||
}
|
||
|
||
// process KPI report post message from NFs
|
||
func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
|
||
log.Debug("PostKPIReportFromNF processing... ")
|
||
|
||
vars := mux.Vars(r)
|
||
apiVer := vars["apiVersion"]
|
||
if apiVer != global.ApiVersionV1 {
|
||
log.Error("Uri api version is invalid. apiVersion:", apiVer)
|
||
services.ResponseNotFound404UriNotExist(w, r)
|
||
return
|
||
}
|
||
|
||
// body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||
body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||
if err != nil {
|
||
log.Error("Faile to io.ReadAll: ", err)
|
||
services.ResponseNotFound404UriNotExist(w, r)
|
||
return
|
||
}
|
||
//log.Trace("Request body:", string(body))
|
||
kpiReport := new(KpiReport)
|
||
_ = json.Unmarshal(body, &kpiReport)
|
||
//log.Trace("kpiReport:", kpiReport)
|
||
|
||
layout := time.RFC3339Nano
|
||
//kpiDate := GetDateFromTimeString(layout, kpiReport.Task.Period.StartTime)
|
||
kpiIndex, _ := strconv.Atoi(vars["index"])
|
||
startTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime)
|
||
endTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime)
|
||
// get time granularity from startTime and endTime
|
||
seconds, _ := global.GetSecondDuration(startTime, endTime)
|
||
var granularity int8 = 60
|
||
if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 {
|
||
granularity = int8(seconds)
|
||
}
|
||
|
||
// insert into new kpi_report_xxx table
|
||
kpiData := new(KpiData)
|
||
kpiData.Date = startTime
|
||
kpiData.Index = kpiIndex
|
||
//stime, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.StartTime, time.Local)
|
||
//etime, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.EndTime, time.Local)
|
||
kpiData.StartTime = startTime
|
||
kpiData.EndTime = endTime
|
||
kpiData.Granularity = granularity
|
||
kpiData.NEName = kpiReport.Task.NE.NEName
|
||
kpiData.NEType = kpiReport.Task.NE.NeType
|
||
kpiData.RmUid = kpiReport.Task.NE.RmUID
|
||
kpiVal := new(KPIVal)
|
||
kpiData.CreatedAt = time.Now().UnixMilli()
|
||
|
||
// 黄金指标事件对象
|
||
kpiEvent := map[string]any{
|
||
// kip_id ...
|
||
"neType": kpiReport.Task.NE.NeType,
|
||
"neName": kpiReport.Task.NE.NEName,
|
||
"rmUID": kpiReport.Task.NE.RmUID,
|
||
"startIndex": kpiIndex,
|
||
"timeGroup": kpiData.CreatedAt,
|
||
}
|
||
|
||
// for custom kpi
|
||
kpiValMap := map[string]any{}
|
||
for _, k := range kpiReport.Task.NE.KPIs {
|
||
kpiEvent[k.KPIID] = k.Value // kip_id
|
||
|
||
kpiVal.KPIID = k.KPIID
|
||
kpiVal.Value = int64(k.Value)
|
||
kpiVal.Err = k.Err
|
||
kpiData.KPIValues = append(kpiData.KPIValues, *kpiVal)
|
||
kpiValMap[k.KPIID] = k.Value
|
||
}
|
||
kpiValMap["granularity"] = kpiData.Granularity
|
||
|
||
// insert kpi_report table, no session
|
||
tableName := "kpi_report_" + strings.ToLower(kpiReport.Task.NE.NeType)
|
||
affected, err := xEngine.Table(tableName).Insert(kpiData)
|
||
if err != nil && affected <= 0 {
|
||
log.Errorf("Failed to insert %s:%v", tableName, err)
|
||
services.ResponseInternalServerError500ProcessError(w, err)
|
||
return
|
||
}
|
||
|
||
report := kpi_c_report.KpiCReport{
|
||
NeType: &kpiData.NEType,
|
||
NeName: &kpiData.NEName,
|
||
RmUID: &kpiData.RmUid,
|
||
Date: kpiData.Date,
|
||
StartTime: &kpiData.StartTime,
|
||
EndTime: &kpiData.EndTime,
|
||
Index: int16(kpiData.Index),
|
||
Granularity: &kpiData.Granularity,
|
||
}
|
||
|
||
// 发送到匹配的网元
|
||
neInfo := neService.NewNeInfo.SelectNeInfoByRmuid(kpiData.RmUid)
|
||
// custom kpi report to FE
|
||
kpiCEvent := map[string]any{
|
||
// kip_id ...
|
||
"neType": kpiData.NEType,
|
||
"neId": neInfo.NeId,
|
||
"neName": kpiData.NEName,
|
||
"rmUID": kpiData.RmUid,
|
||
"startIndex": kpiData.Index,
|
||
"timeGroup": kpiData.Date[:10] + " " + kpiData.EndTime,
|
||
"createdAt": kpiData.CreatedAt,
|
||
"granularity": kpiData.Granularity,
|
||
}
|
||
kpiCList := kpi_c_title.GetActiveKPICList(kpiData.NEType)
|
||
for _, k := range kpiCList {
|
||
result, err := evaluate.CalcExpr(*k.Expression, kpiValMap)
|
||
kpiCVal := new(kpi_c_report.KpiCVal)
|
||
kpiCVal.KPIID = *k.KpiID
|
||
if err != nil {
|
||
kpiCVal.Value = 0.0
|
||
kpiCVal.Err = err.Error()
|
||
} else {
|
||
kpiCVal.Value = result
|
||
}
|
||
|
||
report.KpiValues = append(report.KpiValues, *kpiCVal)
|
||
|
||
// set KPIC event kpiid and value
|
||
kpiCEvent[kpiCVal.KPIID] = kpiCVal.Value
|
||
}
|
||
|
||
// KPI自定义指标入库
|
||
kpi_c_report.InsertKpiCReport(kpiData.NEType, report)
|
||
|
||
if neInfo.RmUID == kpiData.RmUid {
|
||
// 推送到ws订阅组
|
||
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent)
|
||
// 推送自定义KPI到ws订阅组
|
||
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent)
|
||
if neInfo.NeType == "UPF" {
|
||
wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+"_"+neInfo.NeId, kpiEvent)
|
||
// 更新UPF总流量
|
||
upValue := parse.Number(kpiEvent["UPF.03"])
|
||
downValue := parse.Number(kpiEvent["UPF.06"])
|
||
neDataService.NewPerfKPI.UPFTodayFlowUpdate(neInfo.RmUID, upValue, downValue)
|
||
}
|
||
}
|
||
|
||
// 推送到redis队列
|
||
err = redisqueue.AddNbiKpiQueue(kpiData.NEType, strconv.Itoa(kpiData.ID))
|
||
if err != nil {
|
||
log.Warn("Failed to AddNbiKpiQueue:", err)
|
||
}
|
||
|
||
services.ResponseStatusOK204NoContent(w)
|
||
}
|
||
|
||
type MeasureTask struct {
|
||
Tasks []Task `json:"Tasks"`
|
||
NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */
|
||
}
|
||
|
||
type Task struct {
|
||
Id int `json:"Id"`
|
||
|
||
StartTime string `json:"StartTime"`
|
||
EndTime string `json:"EndTime"`
|
||
|
||
Schedule struct {
|
||
Type string `json:"Type"` // 计划类型:Weekly/Monthly, 如果type为"", 则任务以StartTime和EndTime为条件进行统计, 否则以Shedule方式进行
|
||
Days []int `json:"Days"` // Weekly: [0,1,...,5,6] 星期日为0, Monthly: [1,2,3,...,30,31]
|
||
Periods []dborm.Period `json:"Periods"`
|
||
/*
|
||
Periods []struct {
|
||
Start string `json:"Start"` // 零点或者零点加测量粒度的整数倍
|
||
End string `json:"End"` //零点加测量粒度的整数倍
|
||
} `json:"Periods"`
|
||
*/
|
||
} `json:"Schedule"`
|
||
|
||
GranulOption string `json:"GranulOption"` // 测量粒度选项:15M/30M/60M/24H
|
||
KPISet []dborm.KpiSetJ `json:"KPISet"`
|
||
/*
|
||
KPISet []struct {
|
||
Code string `json:"Code"` // 统计编码 如:SMFHA01
|
||
KPIs []string `json:"KPIs` // 指标项集合 ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"]
|
||
} `json:"KPISet"`
|
||
*/
|
||
}
|
||
|
||
type MeasureReport struct {
|
||
Id int `json:"Id"`
|
||
TimeStamp string `json:"TimeStamp"`
|
||
NeName string `json:"NeName"`
|
||
RmUID string `json:"rmUID"`
|
||
NeType string `json:"NeType"`
|
||
|
||
Report struct {
|
||
Period struct {
|
||
StartTime string `json:"StartTime"`
|
||
EndTime string `json:"EndTime"`
|
||
} `json:"Period"`
|
||
|
||
Datas []struct {
|
||
Code string `json:"Code"` // 统计编码 如:SMFHA01
|
||
KPIs []struct {
|
||
KPIID string `json:"KPIID"` // 指标项, 如: SMF.AttCreatePduSession._Dnn
|
||
KPIValues []struct {
|
||
Name string `json:"Name"` // 单个的写"Total", 或者指标项有多个测量项,如Dnn的名称写对应的Dnn"cmnet"/"ims"
|
||
Value int64 `json:"Value"`
|
||
} `json:"KPIValues"`
|
||
} `json:"KPIs"`
|
||
} `json:"Datas"`
|
||
} `json:"Report"`
|
||
}
|
||
|
||
type MeasureData struct {
|
||
// Id int `json:"id" xorm:"pk 'id' autoincr"`
|
||
Id int `json:"id" xorm:"-"`
|
||
Date string `json:"date" xorm:"date"`
|
||
TaskId int `json:"taskId"`
|
||
NeType string `json:"neType" xorm:"ne_type"`
|
||
NeName string `json:"neName" xorm:"ne_name"`
|
||
RmUid string `json:"rmUid" xorm:"rm_uid"`
|
||
GranulOption string `json:"granulOption" xorm:"granul_option"`
|
||
StartTime string `json:"startTime"`
|
||
EndTime string `json:"endTime"`
|
||
KpiCode string `json:"kpiCode" xorm:"kpi_code"`
|
||
KpiId string `json:"kpiId" xorm:"kpi_id"`
|
||
KpiExt string `json:"kpiExt" xorm:"kpi_ext"`
|
||
Value int64 `json:"value"`
|
||
Timestamp string `json:"timestamp"`
|
||
}
|
||
|
||
// process measure report from NFs
|
||
func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
|
||
log.Debug("PostMeasureReportFromNF processing... ")
|
||
|
||
// vars := mux.Vars(r)
|
||
// neType := vars["elementTypeValue"]
|
||
vars := mux.Vars(r)
|
||
apiVer := vars["apiVersion"]
|
||
if apiVer != global.ApiVersionV1 {
|
||
log.Error("Uri api version is invalid. apiVersion:", apiVer)
|
||
services.ResponseNotFound404UriNotExist(w, r)
|
||
return
|
||
}
|
||
|
||
// body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||
body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||
if err != nil {
|
||
log.Error("Faile to io.ReadAll: ", err)
|
||
services.ResponseNotFound404UriNotExist(w, r)
|
||
return
|
||
}
|
||
log.Debug("Request body:", string(body))
|
||
measureReport := new(MeasureReport)
|
||
_ = json.Unmarshal(body, &measureReport)
|
||
log.Debug("measureReport:", measureReport)
|
||
|
||
session := xEngine.NewSession()
|
||
defer session.Close()
|
||
measureData := new(MeasureData)
|
||
layout := global.DateTime
|
||
measureData.Date = GetDateFromTimeString(layout, measureReport.Report.Period.StartTime)
|
||
measureData.TaskId = measureReport.Id
|
||
measureData.StartTime = measureReport.Report.Period.StartTime
|
||
measureData.EndTime = measureReport.Report.Period.EndTime
|
||
measureData.NeType = measureReport.NeType
|
||
measureData.NeName = measureReport.NeName
|
||
measureData.RmUid = measureReport.RmUID
|
||
measureData.GranulOption, _ = dborm.XormGetSingleCol("measure_task", "granul_option", fmt.Sprintf("id=%d", measureReport.Id))
|
||
t, _ := strconv.ParseInt(measureReport.TimeStamp, 10, 64)
|
||
timestamp := time.Unix(t, 0)
|
||
log.Debug("timestamp:", timestamp.Format(layout))
|
||
measureData.Timestamp = timestamp.Format(layout)
|
||
log.Debug("Datas:", measureReport.Report.Datas)
|
||
for _, d := range measureReport.Report.Datas {
|
||
measureData.KpiCode = d.Code
|
||
|
||
log.Debug("KPIs:", d.KPIs)
|
||
for _, k := range d.KPIs {
|
||
measureData.KpiId = k.KPIID
|
||
|
||
log.Debug("KPIValues:", k.KPIValues)
|
||
if len(k.KPIValues) != 0 {
|
||
for _, v := range k.KPIValues {
|
||
measureData.KpiExt = v.Name
|
||
measureData.Value = v.Value
|
||
log.Debug("measureData:", measureData)
|
||
|
||
affected, err := session.Insert(measureData)
|
||
if err != nil && affected <= 0 {
|
||
log.Error("Failed to insert measure_data:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
}
|
||
} else {
|
||
measureData.Value = 0
|
||
log.Debug("measureData:", measureData)
|
||
|
||
affected, err := session.Insert(measureData)
|
||
if err != nil && affected <= 0 {
|
||
log.Error("Failed to insert measure_data:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
services.ResponseStatusOK204NoContent(w)
|
||
}
|
||
|
||
func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||
log.Debug("PostMeasureTaskToNF processing... ")
|
||
|
||
// _, err := services.CheckFrontValidRequest(w, r)
|
||
// if err != nil {
|
||
// log.Error("Request error:", err)
|
||
// return
|
||
// }
|
||
|
||
vars := mux.Vars(r)
|
||
neType := vars["elementTypeValue"]
|
||
params := r.URL.Query()
|
||
taskIds := params["id"]
|
||
log.Debug("taskIds:", taskIds)
|
||
|
||
var response *resty.Response
|
||
client := resty.New()
|
||
measureTask := new(MeasureTask)
|
||
measureTask.Tasks = make([]Task, 1)
|
||
for _, taskId := range taskIds {
|
||
id, _ := strconv.Atoi(taskId)
|
||
task, err := dborm.GetMeasureTask(id)
|
||
if err != nil {
|
||
log.Error("Failed to connect database: ", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
log.Debug("Table Task:", task)
|
||
|
||
measureTask.Tasks[0].Id = task.Id
|
||
measureTask.Tasks[0].StartTime = task.StartTime
|
||
measureTask.Tasks[0].EndTime = task.EndTime
|
||
// v := new(dborm.ScheduleJson)
|
||
// _ = json.Unmarshal(task.Schedule, v)
|
||
// measureTask.Task[0].Schedule.Type = v.Type
|
||
// measureTask.Task[0].Schedule.Days = v.Days
|
||
if len(task.Schedule) >= 1 {
|
||
measureTask.Tasks[0].Schedule.Type = task.Schedule[0].Type
|
||
measureTask.Tasks[0].Schedule.Days = task.Schedule[0].Days
|
||
}
|
||
//v := new(dborm.ScheduleJ)
|
||
//_ = json.Unmarshal(task.Schedule, v)
|
||
measureTask.Tasks[0].Schedule.Periods = task.Periods
|
||
measureTask.Tasks[0].GranulOption = task.GranulOption
|
||
|
||
measureTask.Tasks[0].KPISet = task.KpiSet
|
||
ips, err := global.GetIps()
|
||
if err != nil {
|
||
log.Error("Failed to get local IP:", err)
|
||
services.ResponseInternalServerError500ProcessError(w, err)
|
||
return
|
||
}
|
||
log.Debug("ips:", ips)
|
||
|
||
measureTask.NotifyUrl = global.SetNotifyUrl(ips[0], config.GetYamlConfig().Rest[0].Port, fmt.Sprintf(MeasureReportFmt, neType))
|
||
log.Debug("Measure Task to NF:", measureTask)
|
||
|
||
if len(task.NeIds) == 0 {
|
||
var neInfos []dborm.NeInfo
|
||
err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
|
||
if err != nil {
|
||
log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
|
||
services.ResponseInternalServerError500ProcessError(w, err)
|
||
return
|
||
}
|
||
for _, neInfo := range neInfos {
|
||
task.NeIds = append(task.NeIds, neInfo.NeId)
|
||
}
|
||
}
|
||
|
||
for _, neId := range task.NeIds {
|
||
var err error
|
||
neInfo, err := dborm.XormGetNeInfo(neType, neId)
|
||
if err != nil {
|
||
log.Error("Failed to dborm.XormGetNeInfo:", err)
|
||
services.ResponseInternalServerError500ProcessError(w, err)
|
||
return
|
||
}
|
||
if neInfo == nil {
|
||
err := fmt.Errorf("not found target NE neType=%s, neId=%s", neType, neId)
|
||
log.Error(err)
|
||
services.ResponseInternalServerError500ProcessError(w, err)
|
||
return
|
||
}
|
||
|
||
requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
|
||
log.Debug("requestURI2NF: POST ", requestURI2NF)
|
||
|
||
switch task.Status {
|
||
case dborm.MeasureTaskStatusInactive:
|
||
body, _ := json.Marshal(measureTask)
|
||
log.Debug("body: ", string(body))
|
||
|
||
log.Debug("User-Agent: ", config.GetDefaultUserAgent())
|
||
response, err = client.R().
|
||
EnableTrace().
|
||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||
SetBody(body).
|
||
SetContentLength(true).
|
||
Post(requestURI2NF)
|
||
|
||
if err != nil {
|
||
log.Error("Post to NF failed:", err)
|
||
services.ResponseInternalServerError500NFConnectRefused(w)
|
||
return
|
||
}
|
||
log.Debug("response info: ")
|
||
log.Debug("Status Code:", response.StatusCode())
|
||
log.Debug("Status:", response.Status())
|
||
log.Debug("Proto:", response.Proto())
|
||
log.Debug("Time:", response.Time())
|
||
log.Debug("Received At:", response.ReceivedAt())
|
||
log.Debug("Size:", response.Size())
|
||
|
||
case dborm.MeasureTaskStatusSuspend:
|
||
body, _ := json.Marshal(measureTask)
|
||
log.Debug("body: ", string(body))
|
||
response, err = client.R().
|
||
EnableTrace().
|
||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||
SetBody(body).
|
||
SetContentLength(true).
|
||
Put(requestURI2NF)
|
||
|
||
if err != nil {
|
||
log.Error("Put to NF failed:", err)
|
||
services.ResponseInternalServerError500NFConnectRefused(w)
|
||
return
|
||
}
|
||
default:
|
||
err = fmt.Errorf("measure task status must be inactive id=%d", id)
|
||
log.Error("Unable to active measure task:", err)
|
||
services.ResponseInternalServerError500ProcessError(w, err)
|
||
return
|
||
}
|
||
|
||
log.Debug("StatusCode: ", response.StatusCode())
|
||
switch response.StatusCode() {
|
||
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
|
||
taskInfo := new(dborm.MeasureTask)
|
||
taskInfo.Status = dborm.MeasureTaskStatusActive
|
||
taskInfo.CreateTime = time.Now().Format(time.DateTime)
|
||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||
if err != nil {
|
||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
} else if affected <= 0 {
|
||
log.Info("Not record affected in measure_task")
|
||
}
|
||
default:
|
||
log.Error("NF return failure to active measure task")
|
||
if response != nil {
|
||
log.Info("response body:", string(response.Body()))
|
||
services.TransportResponse(w, response.StatusCode(), response.Body())
|
||
return
|
||
} else {
|
||
err = fmt.Errorf("failed to active measure task, NF return error status=%v", response.Status())
|
||
log.Error("Unable to active measure task:", err)
|
||
services.ResponseInternalServerError500ProcessError(w, err)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
services.ResponseStatusOK204NoContent(w)
|
||
}
|
||
|
||
func PutMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||
|
||
services.ResponseStatusOK200Null(w)
|
||
}
|
||
|
||
func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||
log.Debug("DeleteMeasureTaskToNF processing... ")
|
||
|
||
// _, err := services.CheckFrontValidRequest(w, r)
|
||
// if err != nil {
|
||
// log.Error("Request error:", err)
|
||
// return
|
||
// }
|
||
|
||
vars := mux.Vars(r)
|
||
neType := vars["elementTypeValue"]
|
||
params := r.URL.Query()
|
||
taskIds := params["id"]
|
||
log.Debug("taskIds:", taskIds)
|
||
|
||
var response *resty.Response
|
||
respMsg := make(map[string]interface{})
|
||
for _, taskId := range taskIds {
|
||
id, _ := strconv.Atoi(taskId)
|
||
task, err := dborm.GetMeasureTask(id)
|
||
if err != nil {
|
||
log.Error("Failed to connect database: ", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
log.Debug("Measure Task:", task)
|
||
|
||
if len(task.NeIds) == 0 {
|
||
var neInfos []dborm.NeInfo
|
||
err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
|
||
if err != nil {
|
||
log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
|
||
services.ResponseInternalServerError500ProcessError(w, err)
|
||
return
|
||
}
|
||
for _, neInfo := range neInfos {
|
||
task.NeIds = append(task.NeIds, neInfo.NeId)
|
||
}
|
||
}
|
||
log.Debug("neIds:", task.NeIds)
|
||
if len(task.NeIds) == 0 {
|
||
log.Warn("Not found target NE in the measure task")
|
||
taskInfo := new(dborm.MeasureTask)
|
||
taskInfo.Status = dborm.MeasureTaskStatusDeleted
|
||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||
if err != nil {
|
||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
} else if affected <= 0 {
|
||
log.Info("Not record affected in measure_task")
|
||
}
|
||
services.ResponseStatusOK204NoContent(w)
|
||
return
|
||
}
|
||
|
||
for _, neId := range task.NeIds {
|
||
var err error
|
||
neInfo, err := dborm.XormGetNeInfo(neType, neId)
|
||
if err != nil {
|
||
log.Error("dborm.XormGetNeInfo is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
if neInfo != nil {
|
||
requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
|
||
log.Debug("requestURI2NF: DELETE ", requestURI2NF)
|
||
client := resty.New()
|
||
response, err = client.R().
|
||
EnableTrace().
|
||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||
Delete(requestURI2NF)
|
||
if err != nil {
|
||
// to avoid can't delete the task for abnormal NF
|
||
log.Error("Failed to resty delete:", err)
|
||
taskInfo := new(dborm.MeasureTask)
|
||
taskInfo.Status = dborm.MeasureTaskStatusDeleted
|
||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||
if err != nil {
|
||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
} else if affected <= 0 {
|
||
log.Info("Not record affected in measure_task")
|
||
}
|
||
services.ResponseStatusOK204NoContent(w)
|
||
return
|
||
}
|
||
|
||
log.Info("StatusCode: ", response.StatusCode())
|
||
switch response.StatusCode() {
|
||
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
|
||
taskInfo := new(dborm.MeasureTask)
|
||
taskInfo.Status = dborm.MeasureTaskStatusDeleted
|
||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||
if err != nil {
|
||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
} else if affected <= 0 {
|
||
log.Infof("Not record affected in measure_task")
|
||
}
|
||
services.ResponseStatusOK204NoContent(w)
|
||
return
|
||
default:
|
||
log.Info("response body:", string(response.Body()))
|
||
body := new(map[string]interface{})
|
||
_ = json.Unmarshal(response.Body(), &body)
|
||
respMsg["error"] = body
|
||
}
|
||
} else {
|
||
taskInfo := new(dborm.MeasureTask)
|
||
taskInfo.Status = dborm.MeasureTaskStatusDeleted
|
||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||
if err != nil {
|
||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
} else if affected <= 0 {
|
||
log.Info("Not record affected in measure_task")
|
||
}
|
||
services.ResponseStatusOK204NoContent(w)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
services.ResponseWithJson(w, response.StatusCode(), respMsg)
|
||
}
|
||
|
||
func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||
log.Debug("PatchMeasureTaskToNF processing... ")
|
||
|
||
// _, err := services.CheckFrontValidRequest(w, r)
|
||
// if err != nil {
|
||
// log.Error("Request error:", err)
|
||
// return
|
||
// }
|
||
|
||
vars := mux.Vars(r)
|
||
neType := vars["elementTypeValue"]
|
||
params := r.URL.Query()
|
||
taskIds := params["id"]
|
||
log.Debug("taskIds:", taskIds)
|
||
|
||
var response *resty.Response
|
||
respMsg := make(map[string]interface{})
|
||
for _, taskId := range taskIds {
|
||
id, _ := strconv.Atoi(taskId)
|
||
task, err := dborm.GetMeasureTask(id)
|
||
if err != nil {
|
||
log.Error("Failed to connect database: ", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
log.Debug("Measure Task:", task)
|
||
|
||
// for neType
|
||
if len(task.NeIds) == 0 {
|
||
var neInfos []dborm.NeInfo
|
||
err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
|
||
if err != nil {
|
||
log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
|
||
services.ResponseInternalServerError500ProcessError(w, err)
|
||
return
|
||
}
|
||
for _, neInfo := range neInfos {
|
||
task.NeIds = append(task.NeIds, neInfo.NeId)
|
||
}
|
||
}
|
||
|
||
if len(task.NeIds) == 0 {
|
||
taskInfo := new(dborm.MeasureTask)
|
||
taskInfo.Status = dborm.MeasureTaskStatusInactive
|
||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||
if err != nil {
|
||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
} else if affected <= 0 {
|
||
log.Info("Not record affected in measure_task")
|
||
}
|
||
services.ResponseStatusOK204NoContent(w)
|
||
return
|
||
}
|
||
|
||
for _, neId := range task.NeIds {
|
||
var err error
|
||
neInfo, err := dborm.XormGetNeInfo(neType, neId)
|
||
if err != nil {
|
||
log.Error("dborm.XormGetNeInfo is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
if neInfo == nil {
|
||
taskInfo := new(dborm.MeasureTask)
|
||
taskInfo.Status = dborm.MeasureTaskStatusInactive
|
||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||
if err != nil {
|
||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
} else if affected <= 0 {
|
||
log.Info("Not record affected in measure_task")
|
||
}
|
||
services.ResponseStatusOK204NoContent(w)
|
||
//services.ResponseInternalServerError500ProcessError(w, em)
|
||
return
|
||
}
|
||
|
||
requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
|
||
log.Debug("requestURI2NF: PATCH ", requestURI2NF)
|
||
client := resty.New()
|
||
response, err = client.R().
|
||
EnableTrace().
|
||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||
Patch(requestURI2NF)
|
||
if err != nil {
|
||
log.Error("Patch to NF failed:", err)
|
||
services.ResponseInternalServerError500NFConnectRefused(w)
|
||
return
|
||
}
|
||
|
||
log.Debug("StatusCode: ", response.StatusCode())
|
||
switch response.StatusCode() {
|
||
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
|
||
taskInfo := new(dborm.MeasureTask)
|
||
taskInfo.Status = dborm.MeasureTaskStatusInactive
|
||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||
if err != nil {
|
||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
} else if affected <= 0 {
|
||
log.Info("Not record affected in measure_task")
|
||
}
|
||
default:
|
||
log.Debug("response body:", string(response.Body()))
|
||
body := new(map[string]interface{})
|
||
_ = json.Unmarshal(response.Body(), &body)
|
||
respMsg["error"] = body
|
||
}
|
||
}
|
||
}
|
||
|
||
services.ResponseWithJson(w, response.StatusCode(), respMsg)
|
||
}
|
||
|
||
type Measurement struct {
|
||
// Date is the measurement date in YYYY-MM-DD format (internal use only, omitted from JSON).
|
||
Date string `json:"-" xorm:"date"`
|
||
Index int `json:"Index"` // Index of 15-minute intervals (time granularity) in a day, range: 0~95
|
||
// Timestamp represents the measurement time in Unix milliseconds, populated from measurement time.
|
||
NeName string `json:"NeName"` // 网元用户标签(UserLabel),用于显示和唯一标识网元名称
|
||
// RmUID is the unique identifier for the network element (unique per NE, used for database joins).
|
||
RmUID string `json:"RmUID" xorm:"rm_uid"`
|
||
// Dn is the Network Element Distinguished Name (DN), uniquely identifies the network element.
|
||
// Format example: "RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456"
|
||
// Dn为网元唯一标识(DN),格式示例:"RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456"
|
||
Dn string `json:"Dn" xorm:"dn"`
|
||
// TimeZone specifies the time zone for the measurement, e.g., "Asia/Shanghai" or "+08:00".
|
||
TimeZone string `json:"TimeZone"`
|
||
NeType string `json:"NeType"` // 网元类型
|
||
StartTime string `json:"StartTime"`
|
||
// List of KPIs grouped by ObjectType.
|
||
Datas []Data `json:"Datas"`
|
||
}
|
||
|
||
type KPIValue struct {
|
||
Name string `json:"Name"` // "Total" is used for aggregate values; other names (e.g., "cmnet", "ims") are used for specific measurement items such as DNN names.
|
||
Value int64 `json:"Value"`
|
||
}
|
||
|
||
type KPI struct {
|
||
KPIID string `json:"KPIID"`
|
||
KPIValues []KPIValue `json:"KPIValues"`
|
||
}
|
||
type Data struct {
|
||
ObjectType string `json:"ObjectType"` // 网络资源类别名称, Pm指标项列表中为空间粒度 如:SmfFunction
|
||
KPIs []KPI `json:"KPIs"` // 指标项, 如: SMF.AttCreatePduSession._Dnn
|
||
}
|
||
|
||
// 添加配置结构体
|
||
// type PMConfig struct {
|
||
// SchemaPath string `json:"schemaPath"` // schema文件路径
|
||
// RandomMin int `json:"randomMin"` // 随机数最小值
|
||
// RandomMax int `json:"randomMax"` // 随机数最大值
|
||
// MergeMode string `json:"mergeMode"` // 合并模式: "merge"(合并), "none"(不合并), "schema"(仅schema随机值)
|
||
// }
|
||
|
||
// 定义合并模式常量
|
||
const (
|
||
MergeModeNone = "none" // 不合并,仅使用网元发送的数据
|
||
MergeModeMerge = "merge" // 合并模式,优先使用网元数据,缺失部分用随机值补充
|
||
MergeModeSchema = "schema" // 仅使用schema数据,全部用随机值
|
||
)
|
||
|
||
// 全局配置
|
||
// SchemaPath should be configured per environment; by default, it uses "/home/simon/omc.git/be.ems/config/schema".
|
||
// You can override this by setting the environment variable EMS_PM_SCHEMA_PATH.
|
||
// var pmConfig = PMConfig{
|
||
// SchemaPath: func() string {
|
||
// if envPath := os.Getenv("EMS_PM_SCHEMA_PATH"); envPath != "" {
|
||
// return envPath
|
||
// }
|
||
// return "/home/simon/omc.git/be.ems/config/schema"
|
||
// }(),
|
||
// RandomMin: 1,
|
||
// RandomMax: 16,
|
||
// MergeMode: MergeModeSchema,
|
||
// }
|
||
|
||
// var pmConfig = PMConfig{
|
||
// SchemaPath: config.GetNbiPmConfig().SchemaPath,
|
||
// RandomMin: config.GetNbiPmConfig().RandomMin,
|
||
// RandomMax: config.GetNbiPmConfig().RandomMax,
|
||
// MergeMode: config.GetNbiPmConfig().MergeMode,
|
||
// }
|
||
|
||
// schema数据结构
|
||
type SchemaKPIValue struct {
|
||
Name string `json:"Name"`
|
||
Value int `json:"Value"`
|
||
}
|
||
|
||
type SchemaKPI struct {
|
||
KPIID string `json:"KPIID"`
|
||
KPIValues []SchemaKPIValue `json:"KPIValues"`
|
||
}
|
||
|
||
type SchemaObject struct {
|
||
ObjectType string `json:"ObjectType"`
|
||
KPIs []SchemaKPI `json:"KPIs"`
|
||
}
|
||
|
||
type SchemaData []SchemaObject
|
||
|
||
// 读取schema文件
|
||
func loadSchemaData(neType string) (*SchemaData, error) {
|
||
schemaFile := filepath.Join(config.GetNbiPmConfig().SchemaPath, strings.ToLower(neType)+"-nbi-pm-schema.json")
|
||
|
||
if _, err := os.Stat(schemaFile); os.IsNotExist(err) {
|
||
log.Warnf("Schema file not found: %s", schemaFile)
|
||
return nil, nil // 文件不存在时返回 nil,不是错误
|
||
}
|
||
|
||
data, err := os.ReadFile(schemaFile)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to read schema file: %v", err)
|
||
}
|
||
|
||
var schema SchemaData
|
||
if err := json.Unmarshal(data, &schema); err != nil {
|
||
return nil, fmt.Errorf("failed to parse schema file: %v", err)
|
||
}
|
||
|
||
return &schema, nil
|
||
}
|
||
|
||
// 生成随机值
|
||
func generateRandomKPIValue() int64 {
|
||
return int64(rand.Intn(config.GetNbiPmConfig().RandomMax-config.GetNbiPmConfig().RandomMin+1) +
|
||
config.GetNbiPmConfig().RandomMin)
|
||
}
|
||
|
||
// 合并网元数据和schema数据
|
||
func mergeWithSchema(measurement *dborm.NorthboundPm, schema *SchemaData) {
|
||
if schema == nil {
|
||
return
|
||
}
|
||
|
||
switch config.GetNbiPmConfig().MergeMode {
|
||
case MergeModeNone:
|
||
// 不合并,直接返回
|
||
log.Debug("Merge mode is 'none', skipping schema merge")
|
||
return
|
||
|
||
case MergeModeSchema:
|
||
// 仅使用schema数据,清空原有数据,全部用随机值
|
||
log.Debug("Merge mode is 'schema', replacing all data with schema random values")
|
||
measurement.Datas = nil
|
||
generateSchemaOnlyData(measurement, schema)
|
||
return
|
||
|
||
case MergeModeMerge:
|
||
// 合并模式,优先使用网元数据,缺失部分用随机值补充
|
||
log.Debug("Merge mode is 'merge', merging NE data with schema")
|
||
mergeNeDataWithSchema(measurement, schema)
|
||
return
|
||
|
||
default:
|
||
log.Warnf("Unknown merge mode: %s, using default merge mode", config.GetNbiPmConfig().MergeMode)
|
||
mergeNeDataWithSchema(measurement, schema)
|
||
}
|
||
}
|
||
|
||
// 仅使用schema数据生成随机值
|
||
func generateSchemaOnlyData(measurement *dborm.NorthboundPm, schema *SchemaData) {
|
||
for _, schemaObj := range *schema {
|
||
newData := struct {
|
||
ObjectType string `json:"ObjectType" xorm:"object_type"`
|
||
PmDatas []struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
} `json:"KPIs" xorm:"pm_datas"`
|
||
}{
|
||
ObjectType: schemaObj.ObjectType,
|
||
PmDatas: []struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
}{},
|
||
}
|
||
|
||
// 遍历schema中的KPI
|
||
for _, schemaKPI := range schemaObj.KPIs {
|
||
newKPI := struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
}{
|
||
PmName: schemaKPI.KPIID,
|
||
SubDatas: []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
}{},
|
||
}
|
||
|
||
// 为每个KPI值生成随机数
|
||
for _, schemaValue := range schemaKPI.KPIValues {
|
||
randomValue := generateRandomKPIValue()
|
||
newSubData := struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
}{
|
||
SN: schemaValue.Name,
|
||
SV: randomValue,
|
||
}
|
||
newKPI.SubDatas = append(newKPI.SubDatas, newSubData)
|
||
log.Debugf("Generated schema random value for %s.%s: %d",
|
||
schemaKPI.KPIID, schemaValue.Name, randomValue)
|
||
}
|
||
|
||
if len(newKPI.SubDatas) > 0 {
|
||
newData.PmDatas = append(newData.PmDatas, newKPI)
|
||
}
|
||
}
|
||
|
||
if len(newData.PmDatas) > 0 {
|
||
measurement.Datas = append(measurement.Datas, newData)
|
||
log.Debugf("Created ObjectType with schema data: %s (%d KPIs)",
|
||
schemaObj.ObjectType, len(newData.PmDatas))
|
||
}
|
||
}
|
||
|
||
log.Debugf("Schema-only mode: generated %d object types", len(measurement.Datas))
|
||
}
|
||
|
||
// 合并网元数据和schema数据(原有逻辑)
|
||
func mergeNeDataWithSchema(measurement *dborm.NorthboundPm, schema *SchemaData) {
|
||
// 创建网元已有数据的映射,用于快速查找
|
||
neDataMap := make(map[string]map[string]map[string]int64) // ObjectType -> KPIID -> Name -> Value
|
||
|
||
for _, data := range measurement.Datas {
|
||
if neDataMap[data.ObjectType] == nil {
|
||
neDataMap[data.ObjectType] = make(map[string]map[string]int64)
|
||
}
|
||
|
||
for _, pmData := range data.PmDatas {
|
||
if neDataMap[data.ObjectType][pmData.PmName] == nil {
|
||
neDataMap[data.ObjectType][pmData.PmName] = make(map[string]int64)
|
||
}
|
||
|
||
for _, subData := range pmData.SubDatas {
|
||
neDataMap[data.ObjectType][pmData.PmName][subData.SN] = subData.SV
|
||
}
|
||
}
|
||
}
|
||
|
||
log.Debugf("Original measurement data contains %d object types", len(measurement.Datas))
|
||
|
||
// 遍历schema,补充缺失的数据
|
||
for _, schemaObj := range *schema {
|
||
// 查找或创建对应的ObjectType
|
||
var targetData *struct {
|
||
ObjectType string `json:"ObjectType" xorm:"object_type"`
|
||
PmDatas []struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
} `json:"KPIs" xorm:"pm_datas"`
|
||
}
|
||
|
||
for i := range measurement.Datas {
|
||
if measurement.Datas[i].ObjectType == schemaObj.ObjectType {
|
||
targetData = &measurement.Datas[i]
|
||
break
|
||
}
|
||
}
|
||
// 如果没找到对应的ObjectType,创建新的
|
||
if targetData == nil {
|
||
newData := struct {
|
||
ObjectType string `json:"ObjectType" xorm:"object_type"`
|
||
PmDatas []struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
} `json:"KPIs" xorm:"pm_datas"`
|
||
}{
|
||
ObjectType: schemaObj.ObjectType,
|
||
PmDatas: []struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
}{},
|
||
}
|
||
measurement.Datas = append(measurement.Datas, newData)
|
||
targetData = &measurement.Datas[len(measurement.Datas)-1]
|
||
log.Debugf("Created new ObjectType: %s", schemaObj.ObjectType)
|
||
}
|
||
|
||
// 遍历schema中的KPI
|
||
for _, schemaKPI := range schemaObj.KPIs {
|
||
// 查找是否已存在该KPI
|
||
var targetKPI *struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
}
|
||
|
||
for i := range targetData.PmDatas {
|
||
if targetData.PmDatas[i].PmName == schemaKPI.KPIID {
|
||
targetKPI = &targetData.PmDatas[i]
|
||
break
|
||
}
|
||
}
|
||
|
||
// 如果没找到对应的KPI,创建新的
|
||
if targetKPI == nil {
|
||
newKPI := struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
}{
|
||
PmName: schemaKPI.KPIID,
|
||
SubDatas: []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
}{},
|
||
}
|
||
targetData.PmDatas = append(targetData.PmDatas, newKPI)
|
||
targetKPI = &targetData.PmDatas[len(targetData.PmDatas)-1]
|
||
log.Debugf("Created new KPI: %s", schemaKPI.KPIID)
|
||
}
|
||
|
||
// 遍历schema中的KPI值
|
||
for _, schemaValue := range schemaKPI.KPIValues {
|
||
// 检查是否已存在该值
|
||
exists := false
|
||
for i := range targetKPI.SubDatas {
|
||
if targetKPI.SubDatas[i].SN == schemaValue.Name {
|
||
exists = true
|
||
// 如果网元没有提供该值,使用随机值
|
||
if neDataMap[schemaObj.ObjectType] == nil ||
|
||
neDataMap[schemaObj.ObjectType][schemaKPI.KPIID] == nil ||
|
||
neDataMap[schemaObj.ObjectType][schemaKPI.KPIID][schemaValue.Name] == 0 {
|
||
targetKPI.SubDatas[i].SV = generateRandomKPIValue()
|
||
log.Debugf("Updated KPI %s.%s with random value: %d",
|
||
schemaKPI.KPIID, schemaValue.Name, targetKPI.SubDatas[i].SV)
|
||
}
|
||
break
|
||
}
|
||
}
|
||
|
||
// 如果不存在,添加新的值
|
||
if !exists {
|
||
var value int64
|
||
// 优先使用网元发送的值
|
||
if neDataMap[schemaObj.ObjectType] != nil &&
|
||
neDataMap[schemaObj.ObjectType][schemaKPI.KPIID] != nil &&
|
||
neDataMap[schemaObj.ObjectType][schemaKPI.KPIID][schemaValue.Name] != 0 {
|
||
value = neDataMap[schemaObj.ObjectType][schemaKPI.KPIID][schemaValue.Name]
|
||
log.Debugf("Using NE provided value for %s.%s: %d",
|
||
schemaKPI.KPIID, schemaValue.Name, value)
|
||
} else {
|
||
value = generateRandomKPIValue()
|
||
log.Debugf("Generated random value for %s.%s: %d",
|
||
schemaKPI.KPIID, schemaValue.Name, value)
|
||
}
|
||
|
||
newSubData := struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
}{
|
||
SN: schemaValue.Name,
|
||
SV: value,
|
||
}
|
||
targetKPI.SubDatas = append(targetKPI.SubDatas, newSubData)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Add this function before mergeNeDataWithSchema
|
||
func mergeDuplicateObjectTypes(measurement *dborm.NorthboundPm) {
|
||
if len(measurement.Datas) <= 1 {
|
||
return
|
||
}
|
||
|
||
// Create a map to group data by ObjectType
|
||
objectTypeMap := make(map[string]*struct {
|
||
ObjectType string `json:"ObjectType" xorm:"object_type"`
|
||
PmDatas []struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
} `json:"KPIs" xorm:"pm_datas"`
|
||
})
|
||
|
||
// Group data by ObjectType
|
||
for _, data := range measurement.Datas {
|
||
if existingData, exists := objectTypeMap[data.ObjectType]; exists {
|
||
// Merge PmDatas - check for duplicate KPIs
|
||
kpiMap := make(map[string]*struct {
|
||
PmName string `json:"KPIID" xorm:"pm_name"`
|
||
SubDatas []struct {
|
||
SN string `json:"Name" xorm:"sn"`
|
||
SV int64 `json:"Value" xorm:"sv"`
|
||
} `json:"KPIValues" xorm:"sub_datas"`
|
||
})
|
||
|
||
// Index existing KPIs
|
||
for i := range existingData.PmDatas {
|
||
kpiMap[existingData.PmDatas[i].PmName] = &existingData.PmDatas[i]
|
||
}
|
||
|
||
// Merge new KPIs
|
||
for _, newPmData := range data.PmDatas {
|
||
if existingKpi, exists := kpiMap[newPmData.PmName]; exists {
|
||
// Merge SubDatas for existing KPI
|
||
subDataMap := make(map[string]bool)
|
||
for _, subData := range existingKpi.SubDatas {
|
||
subDataMap[subData.SN] = true
|
||
}
|
||
|
||
// Add new SubDatas that don't exist
|
||
for _, newSubData := range newPmData.SubDatas {
|
||
if !subDataMap[newSubData.SN] {
|
||
existingKpi.SubDatas = append(existingKpi.SubDatas, newSubData)
|
||
}
|
||
}
|
||
} else {
|
||
// Add new KPI
|
||
existingData.PmDatas = append(existingData.PmDatas, newPmData)
|
||
}
|
||
}
|
||
} else {
|
||
// Create new entry for this ObjectType
|
||
objectTypeMap[data.ObjectType] = &data
|
||
}
|
||
}
|
||
|
||
// Replace measurement.Datas with merged data
|
||
measurement.Datas = nil
|
||
for _, mergedData := range objectTypeMap {
|
||
measurement.Datas = append(measurement.Datas, *mergedData)
|
||
}
|
||
|
||
log.Debugf("Merged duplicate ObjectTypes: original count %d, merged count %d",
|
||
len(measurement.Datas), len(objectTypeMap))
|
||
}
|
||
|
||
// process measurement post message from NFs
|
||
func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
|
||
log.Debug("PostMeasurementFromNF processing... ")
|
||
|
||
vars := mux.Vars(r)
|
||
apiVer := vars["apiVersion"]
|
||
if apiVer != global.ApiVersionV1 {
|
||
log.Error("Uri api version is invalid. apiVersion:", apiVer)
|
||
services.ResponseNotFound404UriNotExist(w, r)
|
||
return
|
||
}
|
||
|
||
body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||
if err != nil {
|
||
log.Error("Faile to io.ReadAll: ", err)
|
||
services.ResponseNotFound404UriNotExist(w, r)
|
||
return
|
||
}
|
||
//log.Debug("Request body:", string(body))
|
||
measurement := new(dborm.NorthboundPm)
|
||
_ = json.Unmarshal(body, &measurement)
|
||
|
||
// Merge duplicate ObjectTypes before processing
|
||
mergeDuplicateObjectTypes(measurement)
|
||
|
||
neInfo, err := dborm.XormGetNeInfoByRmUID(measurement.NeType, measurement.RmUID)
|
||
if err != nil {
|
||
log.Error("dborm.XormGetNeInfo is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
if neInfo == nil {
|
||
log.Error("Not found target NE neType:", measurement.NeType, "RmUID:", measurement.RmUID)
|
||
services.ResponseNotFound404UriNotExist(w, r)
|
||
return
|
||
}
|
||
measurement.NeName = neInfo.NeName
|
||
measurement.Dn = neInfo.Dn
|
||
|
||
// 加载schema数据并进行合并
|
||
if config.GetNbiPmConfig().MergeMode != MergeModeNone {
|
||
log.Debugf("Loading schema for neType: %s (mode: %s)", measurement.NeType, config.GetNbiPmConfig().MergeMode)
|
||
schema, err := loadSchemaData(measurement.NeType)
|
||
if err != nil {
|
||
log.Warnf("Failed to load schema for %s: %v", measurement.NeType, err)
|
||
} else if schema != nil {
|
||
log.Debugf("Successfully loaded schema for %s, processing with mode: %s", measurement.NeType, config.GetNbiPmConfig().MergeMode)
|
||
mergeWithSchema(measurement, schema)
|
||
log.Debug("Data processing completed")
|
||
} else {
|
||
log.Debugf("No schema file found for %s, using original data", measurement.NeType)
|
||
}
|
||
}
|
||
|
||
session := xEngine.NewSession()
|
||
defer session.Close()
|
||
|
||
layout := time.RFC3339
|
||
measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
|
||
measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
|
||
|
||
affected, err := session.Table("nbi_pm").Insert(measurement)
|
||
if err != nil && affected <= 0 {
|
||
log.Error("Failed to insert nbi_pm:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
|
||
// 推送到redis队列
|
||
err = redisqueue.AddNbiPMQueue(strconv.Itoa(measurement.Id))
|
||
if err != nil {
|
||
log.Warn("Failed to AddNbiPMQueue:", err)
|
||
}
|
||
|
||
services.ResponseStatusOK204NoContent(w)
|
||
}
|
||
|
||
// get measurement message from NFs
|
||
func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
|
||
log.Debug("GetMeasurementFromNF processing... ")
|
||
|
||
vars := mux.Vars(r)
|
||
apiVer := vars["apiVersion"]
|
||
if apiVer != global.ApiVersionV1 {
|
||
log.Error("Uri api version is invalid. apiVersion:", apiVer)
|
||
services.ResponseNotFound404UriNotExist(w, r)
|
||
return
|
||
}
|
||
|
||
neType := vars["elementTypeValue"]
|
||
if neType == "" {
|
||
log.Error("elementTypeValue is null.")
|
||
services.ResponseNotFound404UriNotExist(w, r)
|
||
return
|
||
}
|
||
params := r.URL.Query()
|
||
neIds := params["ne_id"]
|
||
if len(neIds) == 0 {
|
||
log.Error("ne_id NOT FOUND")
|
||
services.ResponseBadRequest400WrongParamValue(w)
|
||
return
|
||
}
|
||
log.Debugf("neType: %s neId:%s", neType, neIds)
|
||
|
||
neInfo := new(dborm.NeInfo)
|
||
neInfo, err := dborm.XormGetNeInfo(neType, neIds[0])
|
||
if err != nil {
|
||
log.Error("dborm.XormGetNeInfo is failed:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
|
||
requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
|
||
log.Debug("requestURI2NF: GET ", requestURI2NF)
|
||
|
||
client := resty.New()
|
||
response, err := client.R().
|
||
EnableTrace().
|
||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||
Get(requestURI2NF)
|
||
if err != nil {
|
||
log.Error("Failed to Get from NF:", err)
|
||
services.ResponseInternalServerError500NFConnectRefused(w)
|
||
return
|
||
}
|
||
|
||
respMsg := make(map[string]interface{})
|
||
switch response.StatusCode() {
|
||
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
|
||
log.Debug("response:", response)
|
||
measurement := new(dborm.NorthboundPm)
|
||
_ = json.Unmarshal(response.Body(), &measurement)
|
||
|
||
// Merge duplicate ObjectTypes before processing
|
||
mergeDuplicateObjectTypes(measurement)
|
||
|
||
// Load schema and merge if needed
|
||
if config.GetNbiPmConfig().MergeMode != MergeModeNone {
|
||
log.Debugf("Loading schema for neType: %s (mode: %s)", measurement.NeType, config.GetNbiPmConfig().MergeMode)
|
||
schema, err := loadSchemaData(measurement.NeType)
|
||
if err != nil {
|
||
log.Warnf("Failed to load schema for %s: %v", measurement.NeType, err)
|
||
} else if schema != nil {
|
||
log.Debugf("Successfully loaded schema for %s, processing with mode: %s", measurement.NeType, config.GetNbiPmConfig().MergeMode)
|
||
mergeWithSchema(measurement, schema)
|
||
log.Debug("Data processing completed")
|
||
} else {
|
||
log.Debugf("No schema file found for %s, using original data", measurement.NeType)
|
||
}
|
||
}
|
||
|
||
session := dborm.DbClient.XEngine.NewSession()
|
||
defer session.Close()
|
||
|
||
layout := time.RFC3339
|
||
measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
|
||
measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
|
||
affected, err := session.Table("nbi_pm").Insert(measurement)
|
||
if err != nil && affected <= 0 {
|
||
log.Error("Failed to insert nbi_pm:", err)
|
||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||
return
|
||
}
|
||
default:
|
||
log.Debug("response body:", string(response.Body()))
|
||
body := new(map[string]interface{})
|
||
_ = json.Unmarshal(response.Body(), &body)
|
||
respMsg["error"] = body
|
||
}
|
||
|
||
services.ResponseWithJson(w, response.StatusCode(), respMsg)
|
||
}
|