// Package pm implements the performance-management REST handlers: it receives
// KPI reports, measure reports and measurements from network functions (NFs),
// stores them through xorm, and forwards measure-task operations to the NFs.
package pm

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"net/http"
	"strconv"
	"time"

	"ems.agt/lib/dborm"
	"ems.agt/lib/global"
	"ems.agt/lib/log"
	"ems.agt/lib/services"
	"ems.agt/restagent/config"
	"xorm.io/xorm"

	wsService "ems.agt/src/modules/ws/service"

	"github.com/go-resty/resty/v2"
	_ "github.com/go-sql-driver/mysql"
	"github.com/gorilla/mux"
)

// Response is a generic JSON envelope with a single "data" member.
type Response struct {
	Data interface{} `json:"data"`
}

// KpiReport is the JSON body decoded by PostKPIReportFromNF.
type KpiReport struct {
	Timestamp string `json:"TimeStamp"`
	Task      struct {
		Period struct {
			StartTime string `json:"StartTime"`
			EndTime   string `json:"EndTime"`
		} `json:"Period"`
		NE struct {
			NEName string `json:"NEName"`
			RmUID  string `json:"rmUID"`
			NeType string `json:"NeType"`
			KPIs   []struct {
				KPIID string `json:"KPIID"`
				Value int    `json:"Value"`
				Err   string `json:"Err"`
			} `json:"KPIs"`
		} `json:"NE"`
	} `json:"Task"`
}

// GoldKpi is the row inserted by PostKPIReportFromNF, one per reported KPI.
type GoldKpi struct {
	// Id int `json:"-" xorm:"pk 'id' autoincr"`
	Date        string `json:"date" xorm:"date"`
	Index       int    `json:"index"`
	Granularity int8   `json:"granularity"`
	StartTime   string `json:"startTime"`
	EndTime     string `json:"endTime"`
	NEName      string `json:"neName" xorm:"ne_name"`
	RmUid       string `json:"rmUid" xorm:"rm_uid"`
	NEType      string `json:"neType" xorm:"ne_type"`
	KpiId       string `json:"kpiId" xorm:"kpi_id"`
	Value       int    `json:"value"`
	Error       string `json:"error"`
	Timestamp   string `json:"timestamp"`
}

var (
	// performance management (default URI prefix)
	PerformanceUri   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
	MeasureTaskUri   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
	MeasureReportUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
	MeasureReportFmt = config.DefaultUriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
	MeasurementUri   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
	UriMeasureTask   = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"

	// performance management (custom URI prefix)
	CustomPerformanceUri   = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
	CustomMeasureTaskUri   = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
	CustomMeasureReportUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
	CustomMeasureReportFmt = config.UriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
	CustomMeasurementUri   = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
	CustomUriMeasureTask   = config.UriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
)

// xEngine is the package-level engine used by PostKPIReportFromNF and
// PostMeasureReportFromNF; it is set by InitDbClient or XormConnectDatabase.
var xEngine *xorm.Engine

// DatabaseClient bundles the connection settings and the xorm engine created
// by InitDbClient.
type DatabaseClient struct {
	dbType            string
	dbUrl             string
	dbConnMaxLifetime time.Duration
	dbMaxIdleConns    int
	dbMaxOpenConns    int
	IsShowSQL         bool

	XEngine *xorm.Engine
}

// DbClient is the package-level database client populated by InitDbClient.
var DbClient DatabaseClient

// InitDbClient builds the DSN from the given parameters, creates the xorm
// engine, applies the pool settings stored in DbClient, and publishes the
// engine both in DbClient.XEngine and in the package-level xEngine.
// SQL logging is switched on when the log level is LOG_TRACE.
// It returns the error reported by xorm.NewEngine, if any.
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) error {
	DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
		dbUser, dbPassword, dbHost, dbPort, dbName)
	DbClient.dbType = dbType
	DbClient.dbConnMaxLifetime = 0
	DbClient.dbMaxIdleConns = 0
	DbClient.dbMaxOpenConns = 0
	if log.GetLevel() == log.LOG_TRACE {
		DbClient.IsShowSQL = true
	}
	// The password is masked; the rest of the logged string mirrors the DSN.
	log.Debugf("dbType:%s dbUrl:%s:******@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
		dbType, dbUser, dbHost, dbPort, dbName)

	var err error
	DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl)
	if err != nil {
		log.Error("Failed to connet database:", err)
		return err
	}
	DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime)
	DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns)
	DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns)
	if DbClient.IsShowSQL {
		DbClient.XEngine.ShowSQL(true)
	}
	xEngine = DbClient.XEngine

	return nil
}

// XormConnectDatabase creates an xorm engine for the given connection
// parameters, stores it in the package-level xEngine and returns it.
// SQL logging is switched on when the log level is LOG_TRACE.
func XormConnectDatabase(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) (*xorm.Engine, error) {
	sqlStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
		dbUser, dbPassword, dbHost, dbPort, dbName)
	log.Debugf("dbType:%s Connect to:%s:******@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
		dbType, dbUser, dbHost, dbPort, dbName)

	var err error
	xEngine, err = xorm.NewEngine(dbType, sqlStr) // 1. Create xorm engine
	if err != nil {
		log.Error("Failed to connect database:", err)
		return nil, err
	}
	if log.GetLevel() == log.LOG_TRACE {
		xEngine.ShowSQL(true)
	}
	return xEngine, nil
}

// GetDateFromTimeString parses timeString with layout fmtString in the local
// time zone and returns it formatted as "2006-01-02".
// The parse error is deliberately discarded: an unparsable input yields the
// formatted zero time.
func GetDateFromTimeString(fmtString string, timeString string) string {
	t, _ := time.ParseInLocation(fmtString, timeString, time.Local)
	return t.Format("2006-01-02")
}

// GetDateTimeFromTimeString parses timeString with layout fmtString in the
// local time zone and returns it formatted with global.DateTime.
// The parse error is deliberately discarded: an unparsable input yields the
// formatted zero time.
func GetDateTimeFromTimeString(fmtString string, timeString string) string {
	t, _ := time.ParseInLocation(fmtString, timeString, time.Local)
	return t.Format(global.DateTime)
}

// PostKPIReportFromNF handles a KPI report posted by an NF. It decodes the
// body into a KpiReport, inserts one GoldKpi row per reported KPI (each in its
// own transaction), pushes the collected values to the GROUP_KPI websocket
// group and answers 200.
func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostKPIReportFromNF processing... ")

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Faile to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	log.Trace("Request body:", string(body))

	kpiReport := new(KpiReport)
	// Decoding stays best-effort (whatever was decoded is still processed),
	// but the failure is no longer silent.
	if err := json.Unmarshal(body, kpiReport); err != nil {
		log.Error("Failed to json.Unmarshal kpiReport:", err)
	}
	log.Trace("kpiReport:", kpiReport)

	session := xEngine.NewSession()
	defer session.Close()

	goldKpi := new(GoldKpi)
	layout := time.RFC3339Nano
	goldKpi.Date = GetDateFromTimeString(layout, kpiReport.Task.Period.StartTime)
	goldKpi.Index, _ = strconv.Atoi(vars["index"])
	goldKpi.StartTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime)
	goldKpi.EndTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime)
	// get time granularity from startTime and endTime; fall back to 60 when the
	// duration is zero or does not fit into int8
	seconds, _ := global.GetSecondDuration(goldKpi.StartTime, goldKpi.EndTime)
	goldKpi.Granularity = 60
	if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 {
		goldKpi.Granularity = int8(seconds)
	}
	goldKpi.NEName = kpiReport.Task.NE.NEName
	goldKpi.RmUid = kpiReport.Task.NE.RmUID
	goldKpi.NEType = kpiReport.Task.NE.NeType
	goldKpi.Timestamp = global.GetFmtTimeString(layout, kpiReport.Timestamp, time.DateTime)

	// Gold KPI event object pushed to websocket subscribers.
	kpiEvent := map[string]any{
		// kip_id ...
		"neType":     goldKpi.NEType,
		"neName":     goldKpi.NEName,
		"startIndex": goldKpi.Index,
		"timeGroup":  goldKpi.StartTime,
	}

	for _, k := range kpiReport.Task.NE.KPIs {
		kpiEvent[k.KPIID] = k.Value // kip_id

		goldKpi.KpiId = k.KPIID
		goldKpi.Value = k.Value
		goldKpi.Error = k.Err
		log.Trace("goldKpi:", goldKpi)

		// Begin the transaction.
		err := session.Begin()
		if err != nil {
			log.Error("Failed to Begin gold_kpi:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}

		// NOTE(review): looks like a row lock taken to serialize inserts —
		// confirm intent.
		gkpi := &GoldKpi{}
		_, err = session.Where("id = ?", 1).ForUpdate().Get(gkpi)
		if err != nil {
			// Roll back the transaction.
			session.Rollback()
			log.Error("Failed to ForUpdate gold_kpi:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}

		// Any insert error aborts the request, whatever row count came with it.
		if _, err = session.Insert(goldKpi); err != nil {
			session.Rollback()
			log.Error("Failed to insert gold_kpi:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}

		// Commit the transaction.
		err = session.Commit()
		if err != nil {
			log.Error("Failed to Commit gold_kpi:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
	}

	// Push to the ws subscription group.
	wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI, kpiEvent)

	services.ResponseStatusOK200Null(w)
}

// MeasureTask is the JSON body sent to an NF by PostMeasureTaskToNF.
type MeasureTask struct {
	Tasks     []Task `json:"Tasks"`
	NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */
}

// Task describes one measurement task inside a MeasureTask.
type Task struct {
	Id int `json:"Id"`

	StartTime string `json:"StartTime"`
	EndTime   string `json:"EndTime"`

	Schedule struct {
		Type    string         `json:"Type"` // Schedule type: Weekly/Monthly. If Type is "", the task collects statistics based on StartTime and EndTime; otherwise it follows the schedule.
		Days    []int          `json:"Days"` // Weekly: [0,1,...,5,6] with Sunday as 0; Monthly: [1,2,3,...,30,31]
		Periods []dborm.Period `json:"Periods"`
		/*
			Periods []struct {
				Start string `json:"Start"` // midnight, or midnight plus an integer multiple of the measurement granularity
				End   string `json:"End"`   // midnight plus an integer multiple of the measurement granularity
			} `json:"Periods"`
		*/
	} `json:"Schedule"`

	GranulOption string           `json:"GranulOption"` // Measurement granularity option: 15M/30M/60M/24H
	KPISet       []dborm.KpiSetJ `json:"KPISet"`
	/*
		KPISet []struct {
			Code string   `json:"Code"` // statistics code, e.g. SMFHA01
			KPIs []string `json:"KPIs`  // set of KPI items ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"]
		} `json:"KPISet"`
	*/
}

// MeasureReport is the JSON body decoded by PostMeasureReportFromNF.
type MeasureReport struct {
	Id        int    `json:"Id"`
	TimeStamp string `json:"TimeStamp"`
	NeName    string `json:"NeName"`
	RmUID     string `json:"rmUID"`
	NeType    string `json:"NeType"`

	Report struct {
		Period struct {
			StartTime string `json:"StartTime"`
			EndTime   string `json:"EndTime"`
		} `json:"Period"`

		Datas []struct {
			Code string `json:"Code"` // statistics code, e.g. SMFHA01
			KPIs []struct {
				KPIID     string `json:"KPIID"` // KPI item, e.g. SMF.AttCreatePduSession._Dnn
				KPIValues []struct {
					Name  string `json:"Name"` // "Total" for a single value; when the KPI has several measured items, the item name, e.g. the Dnn "cmnet"/"ims"
					Value int64  `json:"Value"`
				} `json:"KPIValues"`
			} `json:"KPIs"`
		} `json:"Datas"`
	} `json:"Report"`
}

// MeasureData is the row inserted by PostMeasureReportFromNF.
type MeasureData struct {
	// Id int `json:"id" xorm:"pk 'id' autoincr"`
	Id           int    `json:"id" xorm:"-"`
	Date         string `json:"date" xorm:"date"`
	TaskId       int    `json:"taskId"`
	NeType       string `json:"neType" xorm:"ne_type"`
	NeName       string `json:"neName" xorm:"ne_name"`
	RmUid        string `json:"rmUid" xorm:"rm_uid"`
	GranulOption string `json:"granulOption" xorm:"granul_option"`
	StartTime    string `json:"startTime"`
	EndTime      string `json:"endTime"`
	KpiCode      string `json:"kpiCode" xorm:"kpi_code"`
	KpiId        string `json:"kpiId" xorm:"kpi_id"`
	KpiExt       string `json:"kpiExt" xorm:"kpi_ext"`
	Value        int64  `json:"value"`
	Timestamp    string `json:"timestamp"`
}

// PostMeasureReportFromNF handles a measure report posted by an NF. It decodes
// the body into a MeasureReport and inserts one MeasureData row per KPI value
// (or a single zero-valued row for a KPI that carries no values), then answers
// 204.
func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasureReportFromNF processing... ")

	// vars := mux.Vars(r)
	// neType := vars["elementTypeValue"]
	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Faile to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	log.Debug("Request body:", string(body))

	measureReport := new(MeasureReport)
	// Decoding stays best-effort, but the failure is no longer silent.
	if err := json.Unmarshal(body, measureReport); err != nil {
		log.Error("Failed to json.Unmarshal measureReport:", err)
	}
	log.Debug("measureReport:", measureReport)

	session := xEngine.NewSession()
	defer session.Close()

	measureData := new(MeasureData)
	layout := global.DateTime
	measureData.Date = GetDateFromTimeString(layout, measureReport.Report.Period.StartTime)
	measureData.TaskId = measureReport.Id
	measureData.StartTime = measureReport.Report.Period.StartTime
	measureData.EndTime = measureReport.Report.Period.EndTime
	measureData.NeType = measureReport.NeType
	measureData.NeName = measureReport.NeName
	measureData.RmUid = measureReport.RmUID
	measureData.GranulOption, _ = dborm.XormGetSingleCol("measure_task", "granul_option", fmt.Sprintf("id=%d", measureReport.Id))
	// TimeStamp is parsed as a base-10 Unix time in seconds; a parse failure
	// leaves t at 0.
	t, _ := strconv.ParseInt(measureReport.TimeStamp, 10, 64)
	timestamp := time.Unix(t, 0)
	log.Debug("timestamp:", timestamp.Format(layout))
	measureData.Timestamp = timestamp.Format(layout)
	log.Debug("Datas:", measureReport.Report.Datas)
	for _, d := range measureReport.Report.Datas {
		measureData.KpiCode = d.Code
		log.Debug("KPIs:", d.KPIs)
		for _, k := range d.KPIs {
			measureData.KpiId = k.KPIID
			log.Debug("KPIValues:", k.KPIValues)
			if len(k.KPIValues) != 0 {
				for _, v := range k.KPIValues {
					measureData.KpiExt = v.Name
					measureData.Value = v.Value
					log.Debug("measureData:", measureData)

					if _, err := session.Insert(measureData); err != nil {
						log.Error("Failed to insert measure_data:", err)
						services.ResponseInternalServerError500DatabaseOperationFailed(w)
						return
					}
				}
			} else {
				// A KPI without values is still recorded, with Value 0.
				measureData.Value = 0
				log.Debug("measureData:", measureData)

				if _, err := session.Insert(measureData); err != nil {
					log.Error("Failed to insert measure_data:", err)
					services.ResponseInternalServerError500DatabaseOperationFailed(w)
					return
				}
			}
		}
	}

	services.ResponseStatusOK204NoContent(w)
}

// PostMeasureTaskToNF activates the measure tasks named by the "id" query
// parameters. For every task it loads the task from the database, builds the
// MeasureTask payload and sends it to each target NE (POST for an inactive
// task, PUT for a suspended one). On a successful NF answer the task is marked
// active in the database; an NF failure answer is relayed to the caller.
// It answers 204 when all tasks were processed.
func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasureTaskToNF processing... ")

	_, err := services.CheckFrontValidRequest(w, r)
	if err != nil {
		log.Error("Request error:", err)
		return
	}

	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)

	var response *resty.Response
	client := resty.New()
	measureTask := new(MeasureTask)
	measureTask.Tasks = make([]Task, 1)
	for _, taskId := range taskIds {
		id, err := strconv.Atoi(taskId)
		if err != nil {
			// A malformed id must not silently turn into task 0.
			log.Error("Invalid measure task id:", err)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		task, err := dborm.GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Table Task:", task)

		// Start from a clean slot so that a task without a schedule does not
		// inherit Schedule.Type/Days from the previously processed task.
		measureTask.Tasks[0] = Task{}
		measureTask.Tasks[0].Id = task.Id
		measureTask.Tasks[0].StartTime = task.StartTime
		measureTask.Tasks[0].EndTime = task.EndTime
		// v := new(dborm.ScheduleJson)
		// _ = json.Unmarshal(task.Schedule, v)
		// measureTask.Task[0].Schedule.Type = v.Type
		// measureTask.Task[0].Schedule.Days = v.Days
		if len(task.Schedule) >= 1 {
			measureTask.Tasks[0].Schedule.Type = task.Schedule[0].Type
			measureTask.Tasks[0].Schedule.Days = task.Schedule[0].Days
		}
		//v := new(dborm.ScheduleJ)
		//_ = json.Unmarshal(task.Schedule, v)
		measureTask.Tasks[0].Schedule.Periods = task.Periods
		measureTask.Tasks[0].GranulOption = task.GranulOption
		measureTask.Tasks[0].KPISet = task.KpiSet

		ips, err := global.GetIps()
		if err != nil {
			log.Error("Failed to get local IP:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		log.Debug("ips:", ips)
		if len(ips) == 0 {
			// ips[0] below would panic on an empty result.
			err := errors.New("no local IP address found")
			log.Error("Failed to get local IP:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}

		measureTask.NotifyUrl = global.SetNotifyUrl(ips[0], config.GetYamlConfig().Rest[0].Port, fmt.Sprintf(MeasureReportFmt, neType))
		log.Debug("Measure Task to NF:", measureTask)

		// A task without explicit NE ids targets every NE of the given type.
		if len(task.NeIds) == 0 {
			var neInfos []dborm.NeInfo
			err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			for _, neInfo := range neInfos {
				task.NeIds = append(task.NeIds, neInfo.NeId)
			}
		}

		for _, neId := range task.NeIds {
			var err error
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfo:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			if neInfo == nil {
				err := fmt.Errorf("not found target NE neType=%s, neId=%s", neType, neId)
				log.Error(err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}

			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: POST ", requestURI2NF)

			switch task.Status {
			case dborm.MeasureTaskStatusInactive:
				body, _ := json.Marshal(measureTask)
				log.Debug("body: ", string(body))

				log.Debug("User-Agent: ", config.GetDefaultUserAgent())
				response, err = client.R().
					EnableTrace().
					SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
					SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
					SetBody(body).
					SetContentLength(true).
					Post(requestURI2NF)

				if err != nil {
					log.Error("Post to NF failed:", err)
					services.ResponseInternalServerError500NFConnectRefused(w)
					return
				}
				log.Debug("response info: ")
				log.Debug("Status Code:", response.StatusCode())
				log.Debug("Status:", response.Status())
				log.Debug("Proto:", response.Proto())
				log.Debug("Time:", response.Time())
				log.Debug("Received At:", response.ReceivedAt())
				log.Debug("Size:", response.Size())

			case dborm.MeasureTaskStatusSuspend:
				body, _ := json.Marshal(measureTask)
				log.Debug("body: ", string(body))
				response, err = client.R().
					EnableTrace().
					SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
					SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
					SetBody(body).
					SetContentLength(true).
					Put(requestURI2NF)

				if err != nil {
					log.Error("Put to NF failed:", err)
					services.ResponseInternalServerError500NFConnectRefused(w)
					return
				}
			default:
				err = fmt.Errorf("measure task status must be inactive id=%d", id)
				log.Error("Unable to active measure task:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}

			// response is set here: both request branches return on error and
			// the default branch returns unconditionally.
			log.Debug("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				taskInfo := new(dborm.MeasureTask)
				taskInfo.Status = dborm.MeasureTaskStatusActive
				taskInfo.CreateTime = time.Now().Format(time.DateTime)
				affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
				if err != nil {
					log.Error("dborm.XormUpdateTableById is failed:", err)
					services.ResponseInternalServerError500DatabaseOperationFailed(w)
					return
				} else if affected <= 0 {
					log.Info("Not record affected in measure_task")
				}
			default:
				// Relay the NF failure answer to the caller unchanged.
				log.Error("NF return failure to active measure task")
				log.Info("response body:", string(response.Body()))
				services.TransportResponse(w, response.StatusCode(), response.Body())
				return
			}
		}
	}

	services.ResponseStatusOK204NoContent(w)
}

// PutMeasureTaskToNF answers 200 without doing any work.
func PutMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	services.ResponseStatusOK200Null(w)
}

// DeleteMeasureTaskToNF deletes the measure tasks named by the "id" query
// parameters: it forwards a DELETE to the target NEs and marks the task as
// deleted in the database. The task is also marked deleted when no target NE
// exists or the NF cannot be reached, so that a task on an abnormal NF can
// still be removed. A request without any "id" parameter is answered with 400.
func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("DeleteMeasureTaskToNF processing... ")

	_, err := services.CheckFrontValidRequest(w, r)
	if err != nil {
		log.Error("Request error:", err)
		return
	}

	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]

	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)
	if len(taskIds) == 0 {
		// Without ids no NF request is made, so there would be no response to
		// read the status code from at the end of the handler.
		log.Error("id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}

	var response *resty.Response
	respMsg := make(map[string]interface{})
	for _, taskId := range taskIds {
		id, err := strconv.Atoi(taskId)
		if err != nil {
			// A malformed id must not silently turn into task 0.
			log.Error("Invalid measure task id:", err)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		task, err := dborm.GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Measure Task:", task)

		// A task without explicit NE ids targets every NE of the given type.
		if len(task.NeIds) == 0 {
			var neInfos []dborm.NeInfo
			err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			for _, neInfo := range neInfos {
				task.NeIds = append(task.NeIds, neInfo.NeId)
			}
		}
		log.Debug("neIds:", task.NeIds)
		if len(task.NeIds) == 0 {
			log.Warn("Not found target NE in the measure task")
			taskInfo := new(dborm.MeasureTask)
			taskInfo.Status = dborm.MeasureTaskStatusDeleted
			affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
			if err != nil {
				log.Error("dborm.XormUpdateTableById is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			} else if affected <= 0 {
				log.Info("Not record affected in measure_task")
			}
			services.ResponseStatusOK204NoContent(w)
			return
		}

		for _, neId := range task.NeIds {
			var err error
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("dborm.XormGetNeInfo is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			}
			if neInfo != nil {
				requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
				log.Debug("requestURI2NF: DELETE ", requestURI2NF)
				client := resty.New()
				response, err = client.R().
					EnableTrace().
					SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
					SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
					Delete(requestURI2NF)
				if err != nil {
					// to avoid can't delete the task for abnormal NF
					log.Error("Failed to resty delete:", err)
					taskInfo := new(dborm.MeasureTask)
					taskInfo.Status = dborm.MeasureTaskStatusDeleted
					affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
					if err != nil {
						log.Error("dborm.XormUpdateTableById is failed:", err)
						services.ResponseInternalServerError500DatabaseOperationFailed(w)
						return
					} else if affected <= 0 {
						log.Info("Not record affected in measure_task")
					}
					services.ResponseStatusOK204NoContent(w)
					return
				}

				log.Info("StatusCode: ", response.StatusCode())
				switch response.StatusCode() {
				case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
					taskInfo := new(dborm.MeasureTask)
					taskInfo.Status = dborm.MeasureTaskStatusDeleted
					affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
					if err != nil {
						log.Error("dborm.XormUpdateTableById is failed:", err)
						services.ResponseInternalServerError500DatabaseOperationFailed(w)
						return
					} else if affected <= 0 {
						log.Infof("Not record affected in measure_task")
					}
					// NOTE(review): the handler answers after the first
					// successful NE; remaining NEs and tasks are not visited —
					// confirm this is intended.
					services.ResponseStatusOK204NoContent(w)
					return
				default:
					log.Info("response body:", string(response.Body()))
					body := new(map[string]interface{})
					_ = json.Unmarshal(response.Body(), &body)
					respMsg["error"] = body
				}
			} else {
				// NE no longer known: just mark the task deleted.
				taskInfo := new(dborm.MeasureTask)
				taskInfo.Status = dborm.MeasureTaskStatusDeleted
				affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
				if err != nil {
					log.Error("dborm.XormUpdateTableById is failed:", err)
					services.ResponseInternalServerError500DatabaseOperationFailed(w)
					return
				} else if affected <= 0 {
					log.Info("Not record affected in measure_task")
				}
				services.ResponseStatusOK204NoContent(w)
				return
			}
		}
	}

	// Reached only when every contacted NF answered with a failure status;
	// the last NF status code is relayed together with its decoded body.
	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}

// PatchMeasureTaskToNF deactivates the measure tasks named by the "id" query
// parameters: it forwards a PATCH to every target NE and marks the task as
// inactive in the database when the NF answers successfully. The task is also
// marked inactive when no target NE exists. The status code of the last NF
// answer is relayed to the caller. A request without any "id" parameter is
// answered with 400.
func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PatchMeasureTaskToNF processing... ")

	_, err := services.CheckFrontValidRequest(w, r)
	if err != nil {
		log.Error("Request error:", err)
		return
	}

	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]

	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)
	if len(taskIds) == 0 {
		// Without ids no NF request is made, so there would be no response to
		// read the status code from at the end of the handler.
		log.Error("id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}

	var response *resty.Response
	respMsg := make(map[string]interface{})
	for _, taskId := range taskIds {
		id, err := strconv.Atoi(taskId)
		if err != nil {
			// A malformed id must not silently turn into task 0.
			log.Error("Invalid measure task id:", err)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		task, err := dborm.GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Measure Task:", task)

		// for neType: a task without explicit NE ids targets every NE of the
		// given type
		if len(task.NeIds) == 0 {
			var neInfos []dborm.NeInfo
			err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			for _, neInfo := range neInfos {
				task.NeIds = append(task.NeIds, neInfo.NeId)
			}
		}

		if len(task.NeIds) == 0 {
			taskInfo := new(dborm.MeasureTask)
			taskInfo.Status = dborm.MeasureTaskStatusInactive
			affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
			if err != nil {
				log.Error("dborm.XormUpdateTableById is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			} else if affected <= 0 {
				log.Info("Not record affected in measure_task")
			}
			services.ResponseStatusOK204NoContent(w)
			return
		}

		for _, neId := range task.NeIds {
			var err error
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("dborm.XormGetNeInfo is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			}
			if neInfo == nil {
				em := errors.New("Not found NE info in database")
				log.Error(em)
				taskInfo := new(dborm.MeasureTask)
				taskInfo.Status = dborm.MeasureTaskStatusInactive
				affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
				if err != nil {
					log.Error("dborm.XormUpdateTableById is failed:", err)
					services.ResponseInternalServerError500DatabaseOperationFailed(w)
					return
				} else if affected <= 0 {
					log.Info("Not record affected in measure_task")
				}
				services.ResponseStatusOK204NoContent(w)
				//services.ResponseInternalServerError500ProcessError(w, em)
				return
			}

			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: PATCH ", requestURI2NF)
			client := resty.New()
			response, err = client.R().
				EnableTrace().
				SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
				SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
				Patch(requestURI2NF)
			if err != nil {
				log.Error("Patch to NF failed:", err)
				services.ResponseInternalServerError500NFConnectRefused(w)
				return
			}

			log.Debug("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				taskInfo := new(dborm.MeasureTask)
				taskInfo.Status = dborm.MeasureTaskStatusInactive
				affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
				if err != nil {
					log.Error("dborm.XormUpdateTableById is failed:", err)
					services.ResponseInternalServerError500DatabaseOperationFailed(w)
					return
				} else if affected <= 0 {
					log.Info("Not record affected in measure_task")
				}
			default:
				log.Debug("response body:", string(response.Body()))
				body := new(map[string]interface{})
				_ = json.Unmarshal(response.Body(), &body)
				respMsg["error"] = body
			}
		}
	}

	// response is non-nil here: at least one id was given and every path
	// through the NE loop that does not return performs a request.
	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}

// Measurement describes a measurement record and its JSON/xorm mapping.
type Measurement struct {
	Id        int    `json:"-" xorm:"pk 'id' autoincr"`
	Date      string `json:"-" xorm:"date"`
	Index     int    `json:"Index"` // Slice index of the measurement granularity (e.g. 15 minutes) within one day: 0~95
	Timestamp string `json:"TimeStamp" xorm:"-"`
	NeName    string `json:"NeName"` // UserLabel
	RmUID     string `json:"RmUID" xorm:"rm_uid"`
	NeType    string `json:"NeType"`    // NE type
	PmVersion string `json:"PmVersion"` // Performance data version
	Dn        string `json:"Dn"`        // (???) NE identifier, e.g. RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456
	Period    string `json:"Period"`    // Measurement granularity option: 5/15/30/60
	TimeZone  string `json:"TimeZone"`
	StartTime string `json:"StartTime"`

	Datas []Data `json:"Datas"`
}

// KPIValue is a single named value of a KPI.
type KPIValue struct {
	Name  string `json:"Name"` // "Total" for a single value; when the KPI has several measured items, the item name, e.g. the Dnn "cmnet"/"ims"
	Value int64  `json:"Value"`
}

// KPI groups the values reported for one KPI item.
type KPI struct {
	KPIID     string     `json:"KPIID"` // KPI item, e.g. SMF.AttCreatePduSession._Dnn
	KPIValues []KPIValue `json:"KPIValues"`
}

// Data groups the KPIs reported for one object type.
type Data struct {
	ObjectType string `json:"ObjectType"` // Network resource category name; the spatial granularity in the PM KPI list, e.g. SmfFunction
	KPIs       []KPI  `json:"KPIs"`       // KPI items, e.g. SMF.AttCreatePduSession._Dnn
}

// PostMeasurementFromNF handles a measurement posted by an NF. It decodes the
// body into a dborm.NorthboundPm, normalizes its Date/StartTime fields and
// inserts it into the northbound_pm table, then answers 204.
func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasurementFromNF processing... ")

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Faile to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	log.Debug("Request body:", string(body))

	measurement := new(dborm.NorthboundPm)
	// Decoding stays best-effort, but the failure is no longer silent.
	if err := json.Unmarshal(body, measurement); err != nil {
		log.Error("Failed to json.Unmarshal measurement:", err)
	}
	log.Debug("measurement:", measurement)

	session := dborm.DbClient.XEngine.NewSession()
	defer session.Close()

	// layout := global.DateTime
	layout := time.RFC3339
	measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
	measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
	if _, err := session.Table("northbound_pm").Insert(measurement); err != nil {
		log.Error("Failed to insert northbound_pm:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return
	}

	services.ResponseStatusOK204NoContent(w)
}

// GetMeasurementFromNF fetches a measurement from the NE named by the first
// "ne_id" query parameter by forwarding the GET request to it. A successful
// answer is decoded and inserted into the northbound_pm table; a failure
// answer is decoded into the "error" member of the reply. In both cases the
// NF status code is relayed to the caller.
func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("GetMeasurementFromNF processing... ")

	_, err := services.CheckFrontValidRequest(w, r)
	if err != nil {
		log.Error("Request error:", err)
		return
	}

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	neType := vars["elementTypeValue"]
	if neType == "" {
		log.Error("elementTypeValue is null.")
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	params := r.URL.Query()
	neIds := params["ne_id"]
	if len(neIds) == 0 {
		log.Error("ne_id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}
	log.Debugf("neType: %s neId:%s", neType, neIds)

	neInfo, err := dborm.XormGetNeInfo(neType, neIds[0])
	if err != nil {
		log.Error("dborm.XormGetNeInfo is failed:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return
	}
	if neInfo == nil {
		// The sibling handlers treat a nil result without error as "NE not
		// found"; dereferencing it below would panic.
		err := fmt.Errorf("not found target NE neType=%s, neId=%s", neType, neIds[0])
		log.Error(err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}

	requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
	log.Debug("requestURI2NF: GET ", requestURI2NF)

	client := resty.New()
	response, err := client.R().
		EnableTrace().
		SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
		SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
		Get(requestURI2NF)
	if err != nil {
		log.Error("Failed to Get from NF:", err)
		services.ResponseInternalServerError500NFConnectRefused(w)
		return
	}

	respMsg := make(map[string]interface{})
	switch response.StatusCode() {
	case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
		log.Debug("response:", response)
		measurement := new(dborm.NorthboundPm)
		// Decoding stays best-effort, but the failure is no longer silent.
		if err := json.Unmarshal(response.Body(), measurement); err != nil {
			log.Error("Failed to json.Unmarshal measurement:", err)
		}
		log.Debug("measurement:", measurement)

		session := dborm.DbClient.XEngine.NewSession()
		defer session.Close()

		layout := time.RFC3339
		measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
		measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
		if _, err := session.Table("northbound_pm").Insert(measurement); err != nil {
			log.Error("Failed to insert northbound_pm:", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
	default:
		log.Debug("response body:", string(response.Body()))
		body := new(map[string]interface{})
		_ = json.Unmarshal(response.Body(), &body)
		respMsg["error"] = body
	}

	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}