package pm

import (
	"encoding/json"
	"fmt"
	"io"
	"math"
	"net/http"
	"strconv"
	"strings"
	"time"

	"be.ems/features/pm/kpi_c_report"
	"be.ems/features/pm/kpi_c_title"
	"be.ems/lib/config"
	"be.ems/lib/core/ctx"
	"be.ems/lib/dborm"
	evaluate "be.ems/lib/eval"
	"be.ems/lib/global"
	"be.ems/lib/log"
	"be.ems/lib/services"
	"be.ems/src/framework/database/db"
	"be.ems/src/framework/utils/date"
	"be.ems/src/framework/utils/parse"
	neDataModel "be.ems/src/modules/network_data/model"
	neDataService "be.ems/src/modules/network_data/service"
	neService "be.ems/src/modules/network_element/service"
	wsService "be.ems/src/modules/ws/service"

	"github.com/go-resty/resty/v2"
	"github.com/gorilla/mux"
)

// Response is a generic envelope carrying an arbitrary payload under "data".
type Response struct {
	Data interface{} `json:"data"`
}

// KpiReport is the JSON body of a KPI report posted by an NF.
type KpiReport struct {
	Timestamp string `json:"TimeStamp"`
	Task      struct {
		Period struct {
			StartTime string `json:"StartTime"`
			EndTime   string `json:"EndTime"`
		} `json:"Period"`
		NE struct {
			NEName string `json:"NEName"`
			RmUID  string `json:"rmUID"`
			NeType string `json:"NeType"`
			KPIs   []struct {
				KPIID string `json:"KPIID"`
				Value int64  `json:"Value"`
				Err   string `json:"Err"`
			} `json:"KPIs"`
		} `json:"NE"`
	} `json:"Task"`
}

// GoldKpi is a single KPI value row keyed by NE, KPI id and time slice.
type GoldKpi struct {
	// Id int `json:"-" xorm:"pk 'id' autoincr"`
	Date        string `json:"date" xorm:"date"`
	Index       int    `json:"index"`
	Granularity int8   `json:"granularity"`
	StartTime   string `json:"startTime"`
	EndTime     string `json:"endTime"`
	NEName      string `json:"neName" xorm:"ne_name"`
	RmUid       string `json:"rmUid" xorm:"rm_uid"`
	NEType      string `json:"neType" xorm:"ne_type"`
	KpiId       string `json:"kpiId" xorm:"kpi_id"`
	Value       int64  `json:"value"`
	Error       string `json:"error"`
	Timestamp   string `json:"timestamp"`
}

// KpiData is one row of a kpi_report_<netype> table: all KPI values reported
// by one NE for one period.
type KpiData struct {
	ID          int      `json:"id" xorm:"pk 'id' '<-' autoincr"`
	NEType      string   `json:"neType" xorm:"ne_type"`
	NEName      string   `json:"neName" xorm:"ne_name"`
	RmUid       string   `json:"rmUid" xorm:"rm_uid"`
	Date        string   `json:"date" xorm:"date"`
	StartTime   string   `json:"startTime" xorm:"start_time"`
	EndTime     string   `json:"endTime" xorm:"end_time"`
	Index       int      `json:"index" xorm:"index"`
	Granularity int64    `json:"granularity" xorm:"granularity"`
	KPIValues   []KPIVal `json:"kpiValues" xorm:"json 'kpi_values'"`
	//CreatedAt int64 `json:"createdAt" xorm:"created 'created_at'"`
	CreatedAt int64 `json:"createdAt" xorm:"'created_at'"`
}

// KPIVal is a single KPI id/value pair with an optional error text.
type KPIVal struct {
	KPIID string `json:"kpi_id" xorm:"kpi_id"`
	Value int64  `json:"value" xorm:"value"`
	Err   string `json:"err" xorm:"err"`
}

var (
	// performance management (default URI prefix)
	PerformanceUri   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
	MeasureTaskUri   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
	MeasureReportUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
	MeasureReportFmt = config.DefaultUriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
	MeasurementUri   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
	UriMeasureTask   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"

	// performance management (custom URI prefix)
	CustomPerformanceUri   = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
	CustomMeasureTaskUri   = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
	CustomMeasureReportUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
	CustomMeasureReportFmt = config.UriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
	CustomMeasurementUri   = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
	CustomUriMeasureTask   = config.UriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
)

// GetDateFromTimeString parses timeString with layout fmtString in the local
// time zone and returns its date as "2006-01-02".
//
// The parse error is deliberately ignored to keep the signature; an
// unparsable input therefore yields the zero time's date ("0001-01-01").
func GetDateFromTimeString(fmtString string, timeString string) string {
	t, _ := time.ParseInLocation(fmtString, timeString, time.Local)
	return t.Format("2006-01-02")
}

// GetDateTimeFromTimeString parses timeString with layout fmtString in the
// local time zone and re-formats it with global.DateTime.
//
// As with GetDateFromTimeString, a parse failure is not reported and the zero
// time is formatted instead.
func GetDateTimeFromTimeString(fmtString string, timeString string) string {
	t, _ := time.ParseInLocation(fmtString, timeString, time.Local)
	return t.Format(global.DateTime)
}

// PostKPIReportFromNF processes a KPI report posted by an NF: it validates the
// API version, binds the JSON body and stores both the raw KPI data and the
// custom (computed) KPI data.
func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostKPIReportFromNF processing... ")

	apiVer := ctx.GetParam(r, "apiVersion")
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	var kpiReport KpiReport
	if err := ctx.ShouldBindJSON(r, &kpiReport); err != nil {
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}

	kpiIndexStr := ctx.GetParam(r, "index")

	// insert kpi_report table, no session
	saveKPIData(kpiReport, parse.Number(kpiIndexStr))
	saveKPIDataC(kpiReport, parse.Number(kpiIndexStr))

	services.ResponseStatusOK204NoContent(w)
}

// saveKPIData stores the KPI data and pushes it to the ws subscription group.
// It returns the value returned by the insert (the push only happens when that
// value is > 0).
func saveKPIData(kpiReport KpiReport, index int64) int64 {
	timestamp := kpiReport.Timestamp
	taskPeriod := kpiReport.Task.Period
	taskNe := kpiReport.Task.NE
	taskNeKPIs := kpiReport.Task.NE.KPIs

	// time handling
	receiverTime := date.ParseStrToDate(timestamp, date.YYYY_MM_DDTHH_MM_SSZ)
	startTime := date.ParseStrToDate(taskPeriod.StartTime, date.YYYY_MM_DDTHH_MM_SSZ)
	endTime := date.ParseStrToDate(taskPeriod.EndTime, date.YYYY_MM_DDTHH_MM_SSZ)
	granularity := parse.Number(endTime.Sub(startTime).Seconds())

	// KPI values serialized as JSON
	KpiValues := make([]map[string]any, 0, len(taskNeKPIs))
	for _, v := range taskNeKPIs {
		KpiValues = append(KpiValues, map[string]any{
			"kpiId": v.KPIID,
			"value": v.Value,
			"err":   v.Err,
		})
	}
	// Marshalling a slice of string/int64 maps cannot fail, so the error is dropped.
	KpiValuesByte, _ := json.Marshal(KpiValues)

	kpiData := neDataModel.KpiReport{
		NeType:      taskNe.NeType,
		NeName:      taskNe.NEName,
		RmUid:       taskNe.RmUID,
		Date:        date.ParseDateToStr(receiverTime, "2006-01-02"),
		StartTime:   date.ParseDateToStr(startTime, "15:04:05"),
		EndTime:     date.ParseDateToStr(endTime, "15:04:05"),
		Index:       index,
		Granularity: granularity,
		KpiValues:   string(KpiValuesByte),
		CreatedAt:   receiverTime.UnixMilli(), // millisecond timestamp; effectively recorded to the second
	}
	insertId := neDataService.NewKpiReport.Insert(kpiData)
	if insertId > 0 {
		// KPI event object
		kpiEvent := map[string]any{
			"neType":     kpiData.NeType,
			"neName":     kpiData.NeName,
			"rmUID":      kpiData.RmUid,
			"startIndex": kpiData.Index,
			"timeGroup":  kpiData.CreatedAt,
			// kip_id ...
		}
		for _, v := range taskNeKPIs {
			kpiEvent[v.KPIID] = v.Value
		}

		// send to the matching NE
		neInfo := neService.NewNeInfo.FindByRmuid(kpiData.RmUid)
		if neInfo.RmUID == kpiData.RmUid {
			// push to the ws subscription group
			wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent)
			if neInfo.NeType == "UPF" {
				// update the UPF total traffic
				upValue := parse.Number(kpiEvent["UPF.03"])
				downValue := parse.Number(kpiEvent["UPF.06"])
				neDataService.NewKpiReport.UPFTodayFlowUpdate(neInfo.RmUID, upValue, downValue)
			}
		}
	}
	return insertId
}

// saveKPIDataC stores the custom KPI data and pushes it to the ws subscription
// group. Each active custom KPI title of the NE type is evaluated against the
// reported KPI values; evaluation errors are recorded per KPI instead of
// aborting the whole report.
func saveKPIDataC(kpiReport KpiReport, index int64) int64 {
	timestamp := kpiReport.Timestamp
	taskPeriod := kpiReport.Task.Period
	taskNe := kpiReport.Task.NE
	taskNeKPIs := kpiReport.Task.NE.KPIs

	// time handling
	receiverTime := date.ParseStrToDate(timestamp, date.YYYY_MM_DDTHH_MM_SSZ)
	startTime := date.ParseStrToDate(taskPeriod.StartTime, date.YYYY_MM_DDTHH_MM_SSZ)
	endTime := date.ParseStrToDate(taskPeriod.EndTime, date.YYYY_MM_DDTHH_MM_SSZ)
	granularity := parse.Number(endTime.Sub(startTime).Seconds())

	// reported KPI values, used as variables of the custom expressions
	KpiValues := make([]map[string]any, 0)
	kpiValMap := map[string]any{}
	for _, v := range taskNeKPIs {
		kpiValMap[v.KPIID] = v.Value
	}

	// custom kpiId data
	cTitles := kpi_c_title.GetActiveKPICList(taskNe.NeType)
	for _, v := range cTitles {
		// KpiID/Expression are pointers; skip incomplete titles instead of
		// panicking on a nil dereference.
		if v.KpiID == nil || v.Expression == nil {
			continue
		}
		item := map[string]any{
			"kpiId": *v.KpiID,
			"value": 0,
			"err":   "",
		}
		// evaluate the expression
		result, err := evaluate.CalcExpr(*v.Expression, kpiValMap)
		if err != nil {
			item["value"] = 0
			item["err"] = err.Error()
		} else {
			// percentages are clamped to [0, 100]
			if v.Unit != nil && *v.Unit == "%" {
				if result > 100 {
					result = 100
				}
				if result < 0 {
					result = 0
				}
			}
			item["value"] = result
		}
		KpiValues = append(KpiValues, item)
	}
	KpiValuesByte, _ := json.Marshal(KpiValues)

	kpiData := neDataModel.KpiCReport{
		NeType:      taskNe.NeType,
		NeName:      taskNe.NEName,
		RmUid:       taskNe.RmUID,
		Date:        date.ParseDateToStr(receiverTime, "2006-01-02"),
		StartTime:   date.ParseDateToStr(startTime, "15:04:05"),
		EndTime:     date.ParseDateToStr(endTime, "15:04:05"),
		Index:       index,
		Granularity: granularity,
		KpiValues:   string(KpiValuesByte),
		CreatedAt:   receiverTime.UnixMilli(), // millisecond timestamp; effectively recorded to the second
	}
	insertId := neDataService.NewKpiCReport.Insert(kpiData)
	if insertId > 0 {
		// KPI event object
		kpiEvent := map[string]any{
			"neType":     kpiData.NeType,
			"neName":     kpiData.NeName,
			"rmUID":      kpiData.RmUid,
			"startIndex": kpiData.Index,
			"timeGroup":  kpiData.CreatedAt,
			// kip_id ...
		}
		for _, v := range KpiValues {
			kpiEvent[fmt.Sprint(v["kpiId"])] = v["value"]
		}

		// send to the matching NE
		neInfo := neService.NewNeInfo.FindByRmuid(kpiData.RmUid)
		if neInfo.RmUID == kpiData.RmUid {
			// push the custom KPI to the ws subscription group
			wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiEvent)
		}
	}
	return insertId
}

// PostKPIReportFromNFOld is the old version of the KPI report handler: it
// inserts the report into kpi_report_<netype>, stores a custom KPI report row
// and pushes both events to the ws subscription groups.
func PostKPIReportFromNFOld(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostKPIReportFromNF processing... ")

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Faile to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	//log.Trace("Request body:", string(body))

	// Decode into the struct itself (not a pointer to the pointer): a literal
	// `null` body would otherwise nil the pointer and panic below.
	kpiReport := new(KpiReport)
	if err := json.Unmarshal(body, kpiReport); err != nil {
		log.Error("Failed to unmarshal KPI report:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	//log.Trace("kpiReport:", kpiReport)

	layout := time.RFC3339Nano
	//kpiDate := GetDateFromTimeString(layout, kpiReport.Task.Period.StartTime)
	// A missing or non-numeric index falls back to 0.
	kpiIndex, _ := strconv.Atoi(vars["index"])
	startTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime)
	endTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime)
	// get time granularity from startTime and endTime; defaults to 60 when the
	// duration is zero or outside the int8 range
	seconds, _ := global.GetSecondDuration(startTime, endTime)
	var granularity int64 = 60
	if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 {
		granularity = int64(seconds)
	}

	// insert into new kpi_report_xxx table
	kpiData := new(KpiData)
	kpiData.Date = startTime
	kpiData.Index = kpiIndex
	//stime, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.StartTime, time.Local)
	//etime, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.EndTime, time.Local)
	kpiData.StartTime = startTime
	kpiData.EndTime = endTime
	kpiData.Granularity = granularity
	kpiData.NEName = kpiReport.Task.NE.NEName
	kpiData.NEType = kpiReport.Task.NE.NeType
	kpiData.RmUid = kpiReport.Task.NE.RmUID
	kpiData.CreatedAt = time.Now().UnixMilli()

	// gold KPI event object
	kpiEvent := map[string]any{
		// kip_id ...
		"neType":     kpiReport.Task.NE.NeType,
		"neName":     kpiReport.Task.NE.NEName,
		"rmUID":      kpiReport.Task.NE.RmUID,
		"startIndex": kpiIndex,
		"timeGroup":  kpiData.CreatedAt,
	}
	// for custom kpi
	kpiValMap := map[string]any{}
	for _, k := range kpiReport.Task.NE.KPIs {
		kpiEvent[k.KPIID] = k.Value // kip_id
		kpiData.KPIValues = append(kpiData.KPIValues, KPIVal{KPIID: k.KPIID, Value: k.Value, Err: k.Err})
		kpiValMap[k.KPIID] = k.Value
	}
	kpiValMap["granularity"] = kpiData.Granularity

	// insert kpi_report table, no session
	tableName := "kpi_report_" + strings.ToLower(kpiReport.Task.NE.NeType)
	// affected, err := xEngine.Table(tableName).Insert(kpiData)
	tx := db.DB("").Table(tableName).Create(kpiData)
	if tx.Error != nil {
		// Report the insert error itself; the previously used `err` belongs to
		// io.ReadAll and is always nil at this point.
		log.Errorf("Failed to insert %s:%v", tableName, tx.Error)
		services.ResponseInternalServerError500ProcessError(w, tx.Error)
		return
	}

	report := kpi_c_report.KpiCReport{
		NeType:      &kpiData.NEType,
		NeName:      &kpiData.NEName,
		RmUID:       &kpiData.RmUid,
		Date:        kpiData.Date,
		StartTime:   &kpiData.StartTime,
		EndTime:     &kpiData.EndTime,
		Index:       int64(kpiData.Index),
		Granularity: &kpiData.Granularity,
	}

	// send to the matching NE
	neInfo := neService.NewNeInfo.FindByRmuid(kpiData.RmUid)

	// Date part (first 10 bytes) of the start time; guarded so a short or empty
	// value cannot cause a slice-bounds panic.
	datePart := kpiData.Date
	if len(datePart) > 10 {
		datePart = datePart[:10]
	}

	// custom kpi report to FE
	kpiCEvent := map[string]any{
		// kip_id ...
		"neType":      kpiData.NEType,
		"neId":        neInfo.NeId,
		"neName":      kpiData.NEName,
		"rmUID":       kpiData.RmUid,
		"startIndex":  kpiData.Index,
		"timeGroup":   datePart + " " + kpiData.EndTime,
		"createdAt":   kpiData.CreatedAt,
		"granularity": kpiData.Granularity,
	}
	// kpiCList := kpi_c_title.GetActiveKPICList(kpiData.NEType)
	// for _, k := range kpiCList {
	// 	result, err := evaluate.CalcExpr(*k.Expression, kpiValMap)
	// 	kpiCVal := new(kpi_c_report.KpiCVal)
	// 	kpiCVal.KPIID = *k.KpiID
	// 	if err != nil {
	// 		kpiCVal.Value = 0.0
	// 		kpiCVal.Err = err.Error()
	// 	} else {
	// 		kpiCVal.Value = result
	// 	}
	// 	report.KpiValues = append(report.KpiValues, *kpiCVal)
	// 	// set KPIC event kpiid and value
	// 	kpiCEvent[kpiCVal.KPIID] = kpiCVal.Value
	// }

	// store the custom KPI report
	kpi_c_report.InsertKpiCReport(kpiData.NEType, report)

	if neInfo.RmUID == kpiData.RmUid {
		// push to the ws subscription group
		wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent)
		// push the custom KPI to the ws subscription group
		wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent)
	}

	services.ResponseStatusOK204NoContent(w)
}

// type MeasureTask struct {
// 	Tasks []Task `json:"Tasks"`
// 	NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */
// }

// ScheduleJ is the stored (JSON) form of a measure task schedule.
type ScheduleJ struct {
	Type string `json:"Type"`
	Days []int  `json:"Days"`
}

// Period is a start/end pair inside a schedule.
type Period struct {
	Start string `json:"Start"`
	End   string `json:"End"`
}

// Task is one measure task as sent to an NF.
type Task struct {
	Id        int    `json:"Id"`
	StartTime string `json:"StartTime"`
	EndTime   string `json:"EndTime"`
	Schedule  struct {
		Type    string   `json:"Type"` // schedule type: Weekly/Monthly; if Type is "", the task is measured by StartTime and EndTime, otherwise by the schedule
		Days    []int    `json:"Days"` // Weekly: [0,1,...,5,6] with Sunday as 0; Monthly: [1,2,3,...,30,31]
		Periods []Period `json:"Periods"`
		/* Periods []struct {
			Start string `json:"Start"` // midnight, or midnight plus an integer multiple of the granularity
			End string `json:"End"` // midnight plus an integer multiple of the granularity
		} `json:"Periods"` */
	} `json:"Schedule"`
	GranulOption string    `json:"GranulOption"` // measurement granularity option: 15M/30M/60M/24H
	KPISet       []KpiSetJ `json:"KPISet"`
	/* KPISet []struct {
		Code string `json:"Code"` // statistics code, e.g. SMFHA01
		KPIs []string `json:"KPIs` // set of KPI items ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"]
	} `json:"KPISet"` */
}

// KpiSetJ groups KPI items under one statistics code.
type KpiSetJ struct {
	Code string   `json:"Code"` // statistics code, e.g. SMFHA01
	KPIs []string `json:"KPIs"` // set of KPI items ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"]
}

// MeasureTask is both the measure_task table row and, through Tasks and
// NotifyUrl, the request body sent to an NF.
type MeasureTask struct {
	Id           int         `json:"id" xorm:"pk 'id' autoincr"`
	NeType       string      `json:"neType" xorm:"ne_type"`
	NeIds        []string    `json:"neIds" xorm:"ne_ids"`
	KpiSet       []KpiSetJ   `json:"KPISet" xorm:"kpi_set"`
	StartTime    string      `json:"startTime" xorm:"start_time"`
	EndTime      string      `json:"endTime" xorm:"end_time"`
	Periods      []Period    `json:"Periods" xorm:"periods"`
	Schedule     []ScheduleJ `json:"Schedule" xorm:"schedule"`
	GranulOption string      `json:"granulOption" xorm:"granul_option"`
	Status       string      `json:"status" xorm:"status"`
	AccountID    string      `json:"accountId" xorm:"account_id"`
	Comment      string      `json:"comment" xorm:"comment"`
	CreateTime   string      `json:"createTime" xorm:"create_time"`
	UpdateTime   string      `json:"updateTime" xorm:"update_time"`
	DeleteTime   string      `json:"deleteTime" xorm:"delete_time"`

	Tasks     []Task `json:"Tasks"`
	NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */
}

// MeasureReport is the JSON body of a measure report posted by an NF.
type MeasureReport struct {
	Id        int    `json:"Id"`
	TimeStamp string `json:"TimeStamp"`
	NeName    string `json:"NeName"`
	RmUID     string `json:"rmUID"`
	NeType    string `json:"NeType"`
	Report    struct {
		Period struct {
			StartTime string `json:"StartTime"`
			EndTime   string `json:"EndTime"`
		} `json:"Period"`

		Datas []struct {
			Code string `json:"Code"` // statistics code, e.g. SMFHA01
			KPIs []struct {
				KPIID     string `json:"KPIID"` // KPI item, e.g. SMF.AttCreatePduSession._Dnn
				KPIValues []struct {
					Name  string `json:"Name"` // "Total" for a single value; when a KPI item has several measured entries, the entry name, e.g. the Dnn "cmnet"/"ims"
					Value int64  `json:"Value"`
				} `json:"KPIValues"`
			} `json:"KPIs"`
		} `json:"Datas"`
	} `json:"Report"`
}

// GetMeasureTask loads the measure_task row with the given id. A query error
// is logged and returned; the returned task is otherwise whatever Find filled
// in.
func GetMeasureTask(taskId int) (*MeasureTask, error) {
	log.Debug("GetMeasureTask processing... ")

	measureTask := new(MeasureTask)
	tx := db.DB("").Table("measure_task").Where("id=?", taskId).Find(measureTask)
	if tx.Error != nil {
		log.Error("Failed to get table measure_task from database:", tx.Error)
		return nil, tx.Error
	}

	log.Debug("Measure Task:", measureTask)
	return measureTask, nil
}

// XormGetActiveMeasureTask loads all measure_task rows whose status is
// 'Active' into measureTasks and returns the same pointer.
func XormGetActiveMeasureTask(measureTasks *[]MeasureTask) (*[]MeasureTask, error) {
	log.Debug("XormGetActiveMeasureTask processing... ")

	tx := db.DB("").Table("measure_task").Where("status='Active'").Find(measureTasks)
	if tx.Error != nil {
		log.Error("Failed to get table measure_task:", tx.Error)
		return nil, tx.Error
	}

	log.Debug("measureTasks:", measureTasks)
	return measureTasks, nil
}

// MeasureData is one measured KPI value row built from a measure report.
type MeasureData struct {
	// Id int `json:"id" xorm:"pk 'id' autoincr"`
	Id           int    `json:"id" xorm:"-"`
	Date         string `json:"date" xorm:"date"`
	TaskId       int    `json:"taskId"`
	NeType       string `json:"neType" xorm:"ne_type"`
	NeName       string `json:"neName" xorm:"ne_name"`
	RmUid        string `json:"rmUid" xorm:"rm_uid"`
	GranulOption string `json:"granulOption" xorm:"granul_option"`
	StartTime    string `json:"startTime"`
	EndTime      string `json:"endTime"`
	KpiCode      string `json:"kpiCode" xorm:"kpi_code"`
	KpiId        string `json:"kpiId" xorm:"kpi_id"`
	KpiExt       string `json:"kpiExt" xorm:"kpi_ext"`
	Value        int64  `json:"value"`
	Timestamp    string `json:"timestamp"`
}

// insertMeasureData inserts one measure data row. On failure it logs, writes
// the 500 response and returns false.
func insertMeasureData(w http.ResponseWriter, measureData *MeasureData) bool {
	// The same struct is reused for every row of a report; clear Id so that a
	// key back-filled by a previous insert is never sent again.
	// NOTE(review): assumes the ORM writes the generated key back into Id - confirm.
	measureData.Id = 0
	log.Debug("measureData:", measureData)
	if err := db.DB("").Create(measureData).Error; err != nil {
		log.Error("Failed to insert measure_data:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return false
	}
	return true
}

// PostMeasureReportFromNF processes a measure report posted by an NF and
// inserts one row per reported KPI value (or one zero-valued row for a KPI
// without values).
func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasureReportFromNF processing... ")

	// vars := mux.Vars(r)
	// neType := vars["elementTypeValue"]
	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Faile to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	log.Debug("Request body:", string(body))

	// Decode into the struct itself so that a literal `null` body cannot nil
	// the pointer; reject bodies that are not valid reports.
	measureReport := new(MeasureReport)
	if err := json.Unmarshal(body, measureReport); err != nil {
		log.Error("Failed to unmarshal measure report:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	log.Debug("measureReport:", measureReport)

	measureData := new(MeasureData)
	layout := global.DateTime
	measureData.Date = GetDateFromTimeString(layout, measureReport.Report.Period.StartTime)
	measureData.TaskId = measureReport.Id
	measureData.StartTime = measureReport.Report.Period.StartTime
	measureData.EndTime = measureReport.Report.Period.EndTime
	measureData.NeType = measureReport.NeType
	measureData.NeName = measureReport.NeName
	measureData.RmUid = measureReport.RmUID
	// Best effort: the granularity option stays empty when the lookup fails.
	measureData.GranulOption, _ = dborm.XormGetSingleCol("measure_task", "granul_option", fmt.Sprintf("id=%d", measureReport.Id))
	// Best effort: a non-numeric TimeStamp is treated as Unix time 0.
	t, _ := strconv.ParseInt(measureReport.TimeStamp, 10, 64)
	timestamp := time.Unix(t, 0)
	log.Debug("timestamp:", timestamp.Format(layout))
	measureData.Timestamp = timestamp.Format(layout)
	log.Debug("Datas:", measureReport.Report.Datas)
	for _, d := range measureReport.Report.Datas {
		measureData.KpiCode = d.Code
		log.Debug("KPIs:", d.KPIs)
		for _, k := range d.KPIs {
			measureData.KpiId = k.KPIID
			log.Debug("KPIValues:", k.KPIValues)
			if len(k.KPIValues) == 0 {
				// NOTE(review): KpiExt keeps the name of the previously
				// inserted value here, as before - confirm this is intended.
				measureData.Value = 0
				if !insertMeasureData(w, measureData) {
					return
				}
				continue
			}
			for _, v := range k.KPIValues {
				measureData.KpiExt = v.Name
				measureData.Value = v.Value
				if !insertMeasureData(w, measureData) {
					return
				}
			}
		}
	}

	services.ResponseStatusOK204NoContent(w)
}

// fillMeasureTaskNeIds fills task.NeIds with every NE id of neType when the
// task itself names no NE.
func fillMeasureTaskNeIds(task *MeasureTask, neType string) error {
	if len(task.NeIds) != 0 {
		return nil
	}
	var neInfos []dborm.NeInfo
	if err := dborm.XormGetNeInfoByNeType(neType, &neInfos); err != nil {
		return err
	}
	for _, neInfo := range neInfos {
		task.NeIds = append(task.NeIds, neInfo.NeId)
	}
	return nil
}

// updateMeasureTaskStatus stores status on measure task id. On a database
// failure it logs, writes the 500 response and returns false.
func updateMeasureTaskStatus(w http.ResponseWriter, id int, status string) bool {
	taskInfo := new(MeasureTask)
	taskInfo.Status = status
	affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
	if err != nil {
		log.Error("dborm.XormUpdateTableById is failed:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return false
	}
	if affected <= 0 {
		log.Info("Not record affected in measure_task")
	}
	return true
}

// PostMeasureTaskToNF activates the measure tasks named by the "id" query
// parameters: each task is sent to every target NE (POST for an inactive task,
// PUT for a suspended one) and marked active once the NF accepts it.
func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasureTaskToNF processing... ")

	// _, err := services.CheckFrontValidRequest(w, r)
	// if err != nil {
	// 	log.Error("Request error:", err)
	// 	return
	// }

	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)

	var response *resty.Response
	client := resty.New()
	measureTask := new(MeasureTask)
	measureTask.Tasks = make([]Task, 1)
	for _, taskId := range taskIds {
		id, err := strconv.Atoi(taskId)
		if err != nil {
			log.Error("Invalid measure task id:", taskId)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		task, err := GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Table Task:", task)

		// Start from a clean slot so a task without a schedule does not
		// inherit Type/Days from the previously processed task.
		measureTask.Tasks[0] = Task{}
		measureTask.Tasks[0].Id = task.Id
		measureTask.Tasks[0].StartTime = task.StartTime
		measureTask.Tasks[0].EndTime = task.EndTime
		// v := new(dborm.ScheduleJson)
		// _ = json.Unmarshal(task.Schedule, v)
		// measureTask.Task[0].Schedule.Type = v.Type
		// measureTask.Task[0].Schedule.Days = v.Days
		if len(task.Schedule) >= 1 {
			measureTask.Tasks[0].Schedule.Type = task.Schedule[0].Type
			measureTask.Tasks[0].Schedule.Days = task.Schedule[0].Days
		}
		//v := new(dborm.ScheduleJ)
		//_ = json.Unmarshal(task.Schedule, v)
		measureTask.Tasks[0].Schedule.Periods = task.Periods
		measureTask.Tasks[0].GranulOption = task.GranulOption
		measureTask.Tasks[0].KPISet = task.KpiSet

		ips, err := global.GetIps()
		if err != nil {
			log.Error("Failed to get local IP:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		if len(ips) == 0 {
			err := fmt.Errorf("no local IP available for the notify url")
			log.Error("Failed to get local IP:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		log.Debug("ips:", ips)

		measureTask.NotifyUrl = global.SetNotifyUrl(ips[0], 33030, fmt.Sprintf(MeasureReportFmt, neType))
		log.Debug("Measure Task to NF:", measureTask)

		if err := fillMeasureTaskNeIds(task, neType); err != nil {
			log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}

		for _, neId := range task.NeIds {
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfo:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			if neInfo == nil {
				err := fmt.Errorf("not found target NE neType=%s, neId=%s", neType, neId)
				log.Error(err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}

			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: POST ", requestURI2NF)

			switch task.Status {
			case dborm.MeasureTaskStatusInactive:
				body, _ := json.Marshal(measureTask)
				log.Debug("body: ", string(body))
				log.Debug("User-Agent: ", config.GetDefaultUserAgent())
				response, err = client.R().
					EnableTrace().
					SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
					SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
					SetBody(body).
					SetContentLength(true).
					Post(requestURI2NF)
				if err != nil {
					log.Error("Post to NF failed:", err)
					services.ResponseInternalServerError500NFConnectRefused(w)
					return
				}
				log.Debug("response info: ")
				log.Debug("Status Code:", response.StatusCode())
				log.Debug("Status:", response.Status())
				log.Debug("Proto:", response.Proto())
				log.Debug("Time:", response.Time())
				log.Debug("Received At:", response.ReceivedAt())
				log.Debug("Size:", response.Size())

			case dborm.MeasureTaskStatusSuspend:
				body, _ := json.Marshal(measureTask)
				log.Debug("body: ", string(body))
				response, err = client.R().
					EnableTrace().
					SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
					SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
					SetBody(body).
					SetContentLength(true).
					Put(requestURI2NF)
				if err != nil {
					log.Error("Put to NF failed:", err)
					services.ResponseInternalServerError500NFConnectRefused(w)
					return
				}
			default:
				err = fmt.Errorf("measure task status must be inactive id=%d", id)
				log.Error("Unable to active measure task:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}

			log.Debug("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				taskInfo := new(MeasureTask)
				taskInfo.Status = dborm.MeasureTaskStatusActive
				taskInfo.CreateTime = time.Now().Format(time.DateTime)
				affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
				if err != nil {
					log.Error("dborm.XormUpdateTableById is failed:", err)
					services.ResponseInternalServerError500DatabaseOperationFailed(w)
					return
				} else if affected <= 0 {
					log.Info("Not record affected in measure_task")
				}
			default:
				// response is always set here (both request paths above
				// return on error), so the NF answer is relayed as is.
				log.Error("NF return failure to active measure task")
				log.Info("response body:", string(response.Body()))
				services.TransportResponse(w, response.StatusCode(), response.Body())
				return
			}
		}
	}

	services.ResponseStatusOK204NoContent(w)
}

// PutMeasureTaskToNF is a placeholder that always answers 200 with no data.
func PutMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	services.ResponseStatusOK200Null(w)
}

// DeleteMeasureTaskToNF forwards the DELETE request to the target NEs of each
// task named by the "id" query parameters and marks the task deleted. A task
// is also marked deleted when no NE is found or the NF is unreachable, so that
// a broken NF cannot block the deletion.
func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("DeleteMeasureTaskToNF processing... ")

	// _, err := services.CheckFrontValidRequest(w, r)
	// if err != nil {
	// 	log.Error("Request error:", err)
	// 	return
	// }

	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)

	var response *resty.Response
	client := resty.New()
	respMsg := make(map[string]interface{})
	for _, taskId := range taskIds {
		id, err := strconv.Atoi(taskId)
		if err != nil {
			log.Error("Invalid measure task id:", taskId)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		task, err := GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Measure Task:", task)

		if err := fillMeasureTaskNeIds(task, neType); err != nil {
			log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		log.Debug("neIds:", task.NeIds)
		if len(task.NeIds) == 0 {
			log.Warn("Not found target NE in the measure task")
			if updateMeasureTaskStatus(w, id, dborm.MeasureTaskStatusDeleted) {
				services.ResponseStatusOK204NoContent(w)
			}
			return
		}

		for _, neId := range task.NeIds {
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("dborm.XormGetNeInfo is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			}
			if neInfo == nil {
				if updateMeasureTaskStatus(w, id, dborm.MeasureTaskStatusDeleted) {
					services.ResponseStatusOK204NoContent(w)
				}
				return
			}

			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: DELETE ", requestURI2NF)
			response, err = client.R().
				EnableTrace().
				SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
				SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
				Delete(requestURI2NF)
			if err != nil {
				// to avoid can't delete the task for abnormal NF
				log.Error("Failed to resty delete:", err)
				if updateMeasureTaskStatus(w, id, dborm.MeasureTaskStatusDeleted) {
					services.ResponseStatusOK204NoContent(w)
				}
				return
			}

			log.Info("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				if updateMeasureTaskStatus(w, id, dborm.MeasureTaskStatusDeleted) {
					services.ResponseStatusOK204NoContent(w)
				}
				return
			default:
				log.Info("response body:", string(response.Body()))
				// Best effort: an undecodable NF body is reported as null.
				var body map[string]interface{}
				_ = json.Unmarshal(response.Body(), &body)
				respMsg["error"] = body
			}
		}
	}

	if response == nil {
		// No task id was supplied, so nothing was forwarded to any NF.
		log.Error("id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}
	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}

// PatchMeasureTaskToNF forwards the PATCH request to the target NEs of each
// task named by the "id" query parameters and marks the task inactive once the
// NF accepts it (or when no target NE exists).
func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PatchMeasureTaskToNF processing... ")

	// _, err := services.CheckFrontValidRequest(w, r)
	// if err != nil {
	// 	log.Error("Request error:", err)
	// 	return
	// }

	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)

	var response *resty.Response
	client := resty.New()
	respMsg := make(map[string]interface{})
	for _, taskId := range taskIds {
		id, err := strconv.Atoi(taskId)
		if err != nil {
			log.Error("Invalid measure task id:", taskId)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		task, err := GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Measure Task:", task)

		// for neType
		if err := fillMeasureTaskNeIds(task, neType); err != nil {
			log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}

		if len(task.NeIds) == 0 {
			if updateMeasureTaskStatus(w, id, dborm.MeasureTaskStatusInactive) {
				services.ResponseStatusOK204NoContent(w)
			}
			return
		}

		for _, neId := range task.NeIds {
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("dborm.XormGetNeInfo is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			}
			if neInfo == nil {
				if updateMeasureTaskStatus(w, id, dborm.MeasureTaskStatusInactive) {
					services.ResponseStatusOK204NoContent(w)
					//services.ResponseInternalServerError500ProcessError(w, em)
				}
				return
			}

			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: PATCH ", requestURI2NF)
			response, err = client.R().
				EnableTrace().
				SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
				SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
				Patch(requestURI2NF)
			if err != nil {
				log.Error("Patch to NF failed:", err)
				services.ResponseInternalServerError500NFConnectRefused(w)
				return
			}

			log.Debug("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				if !updateMeasureTaskStatus(w, id, dborm.MeasureTaskStatusInactive) {
					return
				}
			default:
				log.Debug("response body:", string(response.Body()))
				// Best effort: an undecodable NF body is reported as null.
				var body map[string]interface{}
				_ = json.Unmarshal(response.Body(), &body)
				respMsg["error"] = body
			}
		}
	}

	if response == nil {
		// No task id was supplied, so nothing was forwarded to any NF.
		log.Error("id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}
	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}

// Measurement is a measurement report of one NE for one time slice.
type Measurement struct {
	Id        int    `json:"-" xorm:"pk 'id' autoincr"`
	Date      string `json:"-" xorm:"date"`
	Index     int    `json:"Index"` // slice index of the measurement granularity (e.g. 15 minutes) within one day: 0~95
	Timestamp string `json:"TimeStamp" xorm:"-"`
	NeName    string `json:"NeName"` // UserLabel
	RmUID     string `json:"RmUID" xorm:"rm_uid"`
	NeType    string `json:"NeType"`    // NE type
	PmVersion string `json:"PmVersion"` // performance data version
	Dn        string `json:"Dn"`        // (???) NE identifier, e.g. RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456
	Period    string `json:"Period"`    // measurement granularity option: 5/15/30/60
	TimeZone  string `json:"TimeZone"`
	StartTime string `json:"StartTime"`

	Datas []Data `json:"Datas"`
}

// KPIValue is one named value of a KPI item.
type KPIValue struct {
	Name  string `json:"Name"` // "Total" for a single value; when a KPI item has several measured entries, the entry name, e.g. the Dnn "cmnet"/"ims"
	Value int64  `json:"Value"`
}

// KPI is a KPI item together with its values.
type KPI struct {
	KPIID     string     `json:"KPIID"` // KPI item, e.g. SMF.AttCreatePduSession._Dnn
	KPIValues []KPIValue `json:"KPIValues"`
}

// Data groups the KPI items of one object type.
type Data struct {
	ObjectType string `json:"ObjectType"` // network resource class name; the spatial granularity in the PM KPI list, e.g. SmfFunction
	KPIs       []KPI  `json:"KPIs"`       // KPI items, e.g. SMF.AttCreatePduSession._Dnn
}

// PostMeasurementFromNF processes a measurement posted by an NF. The body is
// decoded and its times normalised, but persisting it is currently disabled,
// so the handler always answers 204 once the body has been read.
func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasurementFromNF processing... ")

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Faile to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	log.Debug("Request body:", string(body))

	// Decode into the struct itself so that a literal `null` body cannot nil
	// the pointer. The decoded value is not stored yet, so a decode failure is
	// only logged.
	measurement := new(dborm.NorthboundPm)
	if err := json.Unmarshal(body, measurement); err != nil {
		log.Error("Failed to unmarshal measurement:", err)
	}
	log.Debug("measurement:", measurement)

	// layout := global.DateTime
	layout := time.RFC3339
	measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
	measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
	// tx := db.DB("").Table("nbi_pm").Create(measurement)
	// if tx.Error != nil && tx.RowsAffected <= 0 {
	// 	log.Error("Failed to insert nbi_pm:", err)
	// 	services.ResponseInternalServerError500DatabaseOperationFailed(w)
	// 	return
	// }

	services.ResponseStatusOK204NoContent(w)
}

// get measurement message from NFs
func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("GetMeasurementFromNF processing... ")

	// _, err := services.CheckFrontValidRequest(w, r)
	// if err != nil {
	// 	log.Error("Request error:", err)
	// 	return
	// }

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	neType := vars["elementTypeValue"]
	if neType == "" {
		log.Error("elementTypeValue is null.")
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	params := r.URL.Query()
	neIds := params["ne_id"]
	if len(neIds) == 0 {
		log.Error("ne_id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}
	log.Debugf("neType: %s neId:%s", neType, neIds)
	//var neInfo *dborm.NeInfo
	neInfo := new(dborm.NeInfo)
	neInfo, err := dborm.XormGetNeInfo(neType, neIds[0])
	if err != nil {
		log.Error("dborm.XormGetNeInfo is failed:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return
	}

	requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
	log.Debug("requestURI2NF: GET ", requestURI2NF)

	client := resty.New()
	response, err := client.R().
		EnableTrace().
		SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
		SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
Get(requestURI2NF) if err != nil { log.Error("Failed to Get from NF:", err) services.ResponseInternalServerError500NFConnectRefused(w) return } respMsg := make(map[string]interface{}) switch response.StatusCode() { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: log.Debug("response:", response) // measurement := new(dborm.NorthboundPm) measurement := new(dborm.NorthboundPm) _ = json.Unmarshal(response.Body(), &measurement) log.Debug("measurement:", measurement) layout := time.RFC3339 measurement.Date = GetDateFromTimeString(layout, measurement.StartTime) measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime) // tx := db.DB("").Table("nbi_pm").Create(measurement) // if tx.Error != nil && tx.RowsAffected <= 0 { // log.Error("Failed to insert nbi_pm:", err) // services.ResponseInternalServerError500DatabaseOperationFailed(w) // return // } default: log.Debug("response body:", string(response.Body())) body := new(map[string]interface{}) _ = json.Unmarshal(response.Body(), &body) respMsg["error"] = body } services.ResponseWithJson(w, response.StatusCode(), respMsg) }