package pm import ( "encoding/json" "fmt" "io" "math" "net/http" "strconv" "strings" "time" "nms_cxy/features/pm/kpi_c_report" "nms_cxy/features/pm/kpi_c_title" "nms_cxy/lib/dborm" evaluate "nms_cxy/lib/eval" "nms_cxy/lib/global" "nms_cxy/lib/log" "nms_cxy/lib/services" "nms_cxy/omc/config" neService "nms_cxy/src/modules/network_element/service" wsService "nms_cxy/src/modules/ws/service" "github.com/go-resty/resty/v2" _ "github.com/go-sql-driver/mysql" "github.com/gorilla/mux" "xorm.io/xorm" ) type Response struct { Data interface{} `json:"data"` } type KpiReport struct { Timestamp string `json:"TimeStamp"` Task struct { Period struct { StartTime string `json:"StartTime"` EndTime string `json:"EndTime"` } `json:"Period"` NE struct { NEName string `json:"NEName"` RmUID string `json:"rmUID"` NeType string `json:"NeType"` KPIs []struct { KPIID string `json:"KPIID"` Value int `json:"Value"` Err string `json:"Err"` } `json:"KPIs"` } `json:"NE"` } `json:"Task"` } type GoldKpi struct { // Id int `json:"-" xorm:"pk 'id' autoincr"` Date string `json:"date" xorm:"date"` Index int `json:"index"` Granularity int8 `json:"granularity"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` NEName string `json:"neName" xorm:"ne_name"` RmUid string `json:"rmUid" xorm:"rm_uid"` NEType string `json:"neType" xorm:"ne_type"` KpiId string `json:"kpiId" xorm:"kpi_id"` Value int `json:"value"` Error string `json:"error"` Timestamp string `json:"timestamp"` } type KpiData struct { ID int `json:"id" xorm:"pk 'id' '<-' autoincr"` NEType string `json:"neType" xorm:"ne_type"` NEName string `json:"neName" xorm:"ne_name"` RmUid string `json:"rmUid" xorm:"rm_uid"` Date string `json:"date" xorm:"date"` StartTime string `json:"startTime" xorm:"start_time"` EndTime string `json:"endTime" xorm:"end_time"` Index int `json:"index" xorm:"index"` Granularity int8 `json:"granularity" xorm:"granularity"` KPIValues []KPIVal `json:"kpiValues" xorm:"json 'kpi_values'"` //CreatedAt int64 `json:"createdAt" xorm:"created 'created_at'"` CreatedAt int64 `json:"createdAt" xorm:"'created_at'"` } type KPIVal struct { KPIID string `json:"kpi_id" xorm:"kpi_id"` Value int64 `json:"value" xorm:"value"` Err string `json:"err" xorm:"err"` } var ( // performance management PerformanceUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}" MeasureTaskUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask" MeasureReportUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport" MeasureReportFmt = config.DefaultUriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport" MeasurementUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}" UriMeasureTask = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}" // performance management CustomPerformanceUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}" CustomMeasureTaskUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask" CustomMeasureReportUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport" CustomMeasureReportFmt = config.UriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport" CustomMeasurementUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}" CustomUriMeasureTask = config.UriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}" ) var xEngine *xorm.Engine type DatabaseClient struct { dbType string dbUrl string dbConnMaxLifetime time.Duration dbMaxIdleConns int dbMaxOpenConns int IsShowSQL bool XEngine *xorm.Engine } var DbClient DatabaseClient func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName, dbParam string) error { DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s", dbUser, dbPassword, dbHost, dbPort, dbName, dbParam) DbClient.dbType = dbType DbClient.dbConnMaxLifetime = 0 DbClient.dbMaxIdleConns = 0 DbClient.dbMaxOpenConns = 0 if log.GetLevel() == log.LOG_TRACE { DbClient.IsShowSQL = true } log.Debugf("dbType:%s dbUrl:%s:", dbType, DbClient.dbUrl) var err error DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl) if err != nil { log.Error("Failed to connet database:", err) return err } DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime) DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns) DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns) DbClient.XEngine.DatabaseTZ = time.Local // 必须 DbClient.XEngine.TZLocation = time.Local // 必须 if DbClient.IsShowSQL { DbClient.XEngine.ShowSQL(true) } xEngine = DbClient.XEngine // exist, err := xEngine.IsTableExist("kpi_report") // if err != nil { // log.Error("Failed to IsTableExist:", err) // return err // } // if exist { // // 复制表结构到新表 // sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` AS SELECT * FROM kpi_report WHERE 1=0", "kpi_report_amf") // _, err := xEngine.Exec(sql) // if err != nil { // log.Error("Failed to Exec:", err) // return err // } // } return nil } func XormConnectDatabase(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) (*xorm.Engine, error) { sqlStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local", dbUser, dbPassword, dbHost, dbPort, dbName) log.Debugf("dbType:%s Connect to:%s:******@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local", dbType, dbUser, dbHost, dbPort, dbName) var err error xEngine, err = xorm.NewEngine(dbType, sqlStr) //1、Create xorm engine if err != nil { log.Error("Failed to connect database:", err) return nil, err } if log.GetLevel() == log.LOG_TRACE { xEngine.ShowSQL(true) } return xEngine, nil } func GetDateFromTimeString(fmtString string, timeString string) string { t, _ := time.ParseInLocation(fmtString, timeString, time.Local) return t.Format("2006-01-02") } func GetDateTimeFromTimeString(fmtString string, timeString string) string { t, _ := time.ParseInLocation(fmtString, timeString, time.Local) return t.Format(global.DateTime) } // process KPI report post message from NFs func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { log.Debug("PostKPIReportFromNF processing... ") vars := mux.Vars(r) apiVer := vars["apiVersion"] if apiVer != global.ApiVersionV1 { log.Error("Uri api version is invalid. apiVersion:", apiVer) services.ResponseNotFound404UriNotExist(w, r) return } // body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) if err != nil { log.Error("Faile to io.ReadAll: ", err) services.ResponseNotFound404UriNotExist(w, r) return } //log.Trace("Request body:", string(body)) kpiReport := new(KpiReport) _ = json.Unmarshal(body, &kpiReport) //log.Trace("kpiReport:", kpiReport) layout := time.RFC3339Nano //kpiDate := GetDateFromTimeString(layout, kpiReport.Task.Period.StartTime) kpiIndex, _ := strconv.Atoi(vars["index"]) startTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime) endTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime) // get time granularity from startTime and endTime seconds, _ := global.GetSecondDuration(startTime, endTime) var granularity int8 = 60 if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 { granularity = int8(seconds) } // insert into new kpi_report_xxx table kpiData := new(KpiData) kpiData.Date = startTime kpiData.Index = kpiIndex //stime, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.StartTime, time.Local) //etime, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.EndTime, time.Local) kpiData.StartTime = startTime kpiData.EndTime = endTime kpiData.Granularity = granularity kpiData.NEName = kpiReport.Task.NE.NEName kpiData.NEType = kpiReport.Task.NE.NeType kpiData.RmUid = kpiReport.Task.NE.RmUID kpiVal := new(KPIVal) kpiData.CreatedAt = time.Now().UnixMilli() // 黄金指标事件对象 kpiEvent := map[string]any{ // kip_id ... "neType": kpiReport.Task.NE.NeType, "neName": kpiReport.Task.NE.NEName, "rmUID": kpiReport.Task.NE.RmUID, "startIndex": kpiIndex, "timeGroup": kpiData.CreatedAt, } // for custom kpi kpiValMap := map[string]any{} for _, k := range kpiReport.Task.NE.KPIs { kpiEvent[k.KPIID] = k.Value // kip_id kpiVal.KPIID = k.KPIID kpiVal.Value = int64(k.Value) kpiVal.Err = k.Err kpiData.KPIValues = append(kpiData.KPIValues, *kpiVal) kpiValMap[k.KPIID] = k.Value } kpiValMap["granularity"] = kpiData.Granularity // insert kpi_report table, no session tableName := "kpi_report_" + strings.ToLower(kpiReport.Task.NE.NeType) affected, err := xEngine.Table(tableName).Insert(kpiData) if err != nil && affected <= 0 { log.Errorf("Failed to insert %s:%v", tableName, err) services.ResponseInternalServerError500ProcessError(w, err) return } report := kpi_c_report.KpiCReport{ NeType: &kpiData.NEType, NeName: &kpiData.NEName, RmUID: &kpiData.RmUid, Date: kpiData.Date, StartTime: &kpiData.StartTime, EndTime: &kpiData.EndTime, Index: int16(kpiData.Index), Granularity: &kpiData.Granularity, } // 发送到匹配的网元 neInfo := neService.NewNeInfoImpl.SelectNeInfoByRmuid(kpiData.RmUid) // custom kpi report to FE kpiCEvent := map[string]any{ // kip_id ... "neType": kpiData.NEType, "neId": neInfo.NeId, "neName": kpiData.NEName, "rmUID": kpiData.RmUid, "startIndex": kpiData.Index, "timeGroup": kpiData.Date[:10] + " " + kpiData.StartTime, "createdAt": kpiData.CreatedAt, "granularity": kpiData.Granularity, } kpiCList := kpi_c_title.GetActiveKPICList(kpiData.NEType) for _, k := range kpiCList { result, err := evaluate.CalcExpr(*k.Expression, kpiValMap) kpiCVal := new(kpi_c_report.KpiCVal) kpiCVal.KPIID = *k.KpiID if err != nil { kpiCVal.Value = 0.0 kpiCVal.Err = err.Error() } else { kpiCVal.Value = result } report.KpiValues = append(report.KpiValues, *kpiCVal) // set KPIC event kpiid and value kpiCEvent[kpiCVal.KPIID] = kpiCVal.Value } // KPI自定义指标入库 kpi_c_report.InsertKpiCReport(kpiData.NEType, report) if neInfo.RmUID == kpiData.RmUid { // 推送到ws订阅组 wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent) // 推送自定义KPI到ws订阅组 wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent) if neInfo.NeType == "UPF" { // 推送标识为:12_RMUID, exp: 12_4400HXUPF001 wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI_UPF+kpiReport.Task.NE.RmUID, kpiEvent) } } services.ResponseStatusOK204NoContent(w) } // PostGoldKPIFromNF 已废弃 // post kpi report from NEs, insert insto gold_kpi table, discard... func PostGoldKPIFromNF(w http.ResponseWriter, r *http.Request) { log.Debug("PostKPIReportFromNF processing... ") vars := mux.Vars(r) apiVer := vars["apiVersion"] if apiVer != global.ApiVersionV1 { log.Error("Uri api version is invalid. apiVersion:", apiVer) services.ResponseNotFound404UriNotExist(w, r) return } // body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) if err != nil { log.Error("Faile to io.ReadAll: ", err) services.ResponseNotFound404UriNotExist(w, r) return } log.Trace("Request body:", string(body)) kpiReport := new(KpiReport) _ = json.Unmarshal(body, &kpiReport) log.Trace("kpiReport:", kpiReport) session := xEngine.NewSession() defer session.Close() goldKpi := new(GoldKpi) layout := time.RFC3339Nano goldKpi.Date = GetDateFromTimeString(layout, kpiReport.Task.Period.StartTime) goldKpi.Index, _ = strconv.Atoi(vars["index"]) goldKpi.StartTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime) goldKpi.EndTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime) // get time granularity from startTime and endTime seconds, _ := global.GetSecondDuration(goldKpi.StartTime, goldKpi.EndTime) goldKpi.Granularity = 60 if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 { goldKpi.Granularity = int8(seconds) } goldKpi.NEName = kpiReport.Task.NE.NEName goldKpi.RmUid = kpiReport.Task.NE.RmUID goldKpi.NEType = kpiReport.Task.NE.NeType goldKpi.Timestamp = global.GetFmtTimeString(layout, kpiReport.Timestamp, time.DateTime) // 黄金指标事件对象 kpiEvent := map[string]any{ // kip_id ... "neType": goldKpi.NEType, "neName": goldKpi.NEName, "rmUID": goldKpi.RmUid, "startIndex": goldKpi.Index, "timeGroup": goldKpi.StartTime, } // insert into new kpi_report_xxx table kpiData := new(KpiData) kpiData.Date = goldKpi.Date kpiData.Index = goldKpi.Index //st, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.StartTime, time.Local) //et, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.EndTime, time.Local) kpiData.StartTime = goldKpi.StartTime kpiData.EndTime = goldKpi.EndTime kpiData.Granularity = goldKpi.Granularity kpiData.NEName = goldKpi.NEName kpiData.NEType = goldKpi.NEType kpiData.RmUid = goldKpi.RmUid kpiVal := new(KPIVal) kpiData.CreatedAt = time.Now().UnixMilli() for _, k := range kpiReport.Task.NE.KPIs { kpiEvent[k.KPIID] = k.Value // kip_id goldKpi.KpiId = k.KPIID goldKpi.Value = k.Value goldKpi.Error = k.Err kpiVal.KPIID = k.KPIID kpiVal.Value = int64(k.Value) kpiVal.Err = k.Err kpiData.KPIValues = append(kpiData.KPIValues, *kpiVal) //log.Trace("goldKpi:", goldKpi) // 启动事务 err := session.Begin() if err != nil { log.Error("Failed to Begin gold_kpi:", err) services.ResponseInternalServerError500ProcessError(w, err) return } gkpi := &GoldKpi{} _, err = session.Where("id = ?", 1).ForUpdate().Get(gkpi) if err != nil { // 回滚事务 session.Rollback() log.Error("Failed to ForUpdate gold_kpi:", err) services.ResponseInternalServerError500ProcessError(w, err) return } affected, err := session.Insert(goldKpi) if err != nil && affected <= 0 { session.Rollback() log.Error("Failed to insert gold_kpi:", err) services.ResponseInternalServerError500ProcessError(w, err) return } // 提交事务 err = session.Commit() if err != nil { log.Error("Failed to Commit gold_kpi:", err) services.ResponseInternalServerError500ProcessError(w, err) return } } // insert kpi_report table, no session tableName := "kpi_report_" + strings.ToLower(kpiReport.Task.NE.NeType) affected, err := xEngine.Table(tableName).Insert(kpiData) if err != nil && affected <= 0 { log.Errorf("Failed to insert %s:%v", tableName, err) services.ResponseInternalServerError500ProcessError(w, err) return } // 推送到ws订阅组 wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI, kpiEvent) if goldKpi.NEType == "UPF" { wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI_UPF, kpiEvent) } services.ResponseStatusOK204NoContent(w) } type MeasureTask struct { Tasks []Task `json:"Tasks"` NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */ } type Task struct { Id int `json:"Id"` StartTime string `json:"StartTime"` EndTime string `json:"EndTime"` Schedule struct { Type string `json:"Type"` // 计划类型:Weekly/Monthly, 如果type为"", 则任务以StartTime和EndTime为条件进行统计, 否则以Shedule方式进行 Days []int `json:"Days"` // Weekly: [0,1,...,5,6] 星期日为0, Monthly: [1,2,3,...,30,31] Periods []dborm.Period `json:"Periods"` /* Periods []struct { Start string `json:"Start"` // 零点或者零点加测量粒度的整数倍 End string `json:"End"` //零点加测量粒度的整数倍 } `json:"Periods"` */ } `json:"Schedule"` GranulOption string `json:"GranulOption"` // 测量粒度选项:15M/30M/60M/24H KPISet []dborm.KpiSetJ `json:"KPISet"` /* KPISet []struct { Code string `json:"Code"` // 统计编码 如:SMFHA01 KPIs []string `json:"KPIs` // 指标项集合 ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"] } `json:"KPISet"` */ } type MeasureReport struct { Id int `json:"Id"` TimeStamp string `json:"TimeStamp"` NeName string `json:"NeName"` RmUID string `json:"rmUID"` NeType string `json:"NeType"` Report struct { Period struct { StartTime string `json:"StartTime"` EndTime string `json:"EndTime"` } `json:"Period"` Datas []struct { Code string `json:"Code"` // 统计编码 如:SMFHA01 KPIs []struct { KPIID string `json:"KPIID"` // 指标项, 如: SMF.AttCreatePduSession._Dnn KPIValues []struct { Name string `json:"Name"` // 单个的写"Total", 或者指标项有多个测量项,如Dnn的名称写对应的Dnn"cmnet"/"ims" Value int64 `json:"Value"` } `json:"KPIValues"` } `json:"KPIs"` } `json:"Datas"` } `json:"Report"` } type MeasureData struct { // Id int `json:"id" xorm:"pk 'id' autoincr"` Id int `json:"id" xorm:"-"` Date string `json:"date" xorm:"date"` TaskId int `json:"taskId"` NeType string `json:"neType" xorm:"ne_type"` NeName string `json:"neName" xorm:"ne_name"` RmUid string `json:"rmUid" xorm:"rm_uid"` GranulOption string `json:"granulOption" xorm:"granul_option"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` KpiCode string `json:"kpiCode" xorm:"kpi_code"` KpiId string `json:"kpiId" xorm:"kpi_id"` KpiExt string `json:"kpiExt" xorm:"kpi_ext"` Value int64 `json:"value"` Timestamp string `json:"timestamp"` } // process measure report from NFs func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) { log.Debug("PostMeasureReportFromNF processing... ") // vars := mux.Vars(r) // neType := vars["elementTypeValue"] vars := mux.Vars(r) apiVer := vars["apiVersion"] if apiVer != global.ApiVersionV1 { log.Error("Uri api version is invalid. apiVersion:", apiVer) services.ResponseNotFound404UriNotExist(w, r) return } // body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) if err != nil { log.Error("Faile to io.ReadAll: ", err) services.ResponseNotFound404UriNotExist(w, r) return } log.Debug("Request body:", string(body)) measureReport := new(MeasureReport) _ = json.Unmarshal(body, &measureReport) log.Debug("measureReport:", measureReport) session := xEngine.NewSession() defer session.Close() measureData := new(MeasureData) layout := global.DateTime measureData.Date = GetDateFromTimeString(layout, measureReport.Report.Period.StartTime) measureData.TaskId = measureReport.Id measureData.StartTime = measureReport.Report.Period.StartTime measureData.EndTime = measureReport.Report.Period.EndTime measureData.NeType = measureReport.NeType measureData.NeName = measureReport.NeName measureData.RmUid = measureReport.RmUID measureData.GranulOption, _ = dborm.XormGetSingleCol("measure_task", "granul_option", fmt.Sprintf("id=%d", measureReport.Id)) t, _ := strconv.ParseInt(measureReport.TimeStamp, 10, 64) timestamp := time.Unix(t, 0) log.Debug("timestamp:", timestamp.Format(layout)) measureData.Timestamp = timestamp.Format(layout) log.Debug("Datas:", measureReport.Report.Datas) for _, d := range measureReport.Report.Datas { measureData.KpiCode = d.Code log.Debug("KPIs:", d.KPIs) for _, k := range d.KPIs { measureData.KpiId = k.KPIID log.Debug("KPIValues:", k.KPIValues) if len(k.KPIValues) != 0 { for _, v := range k.KPIValues { measureData.KpiExt = v.Name measureData.Value = v.Value log.Debug("measureData:", measureData) affected, err := session.Insert(measureData) if err != nil && affected <= 0 { log.Error("Failed to insert measure_data:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } } } else { measureData.Value = 0 log.Debug("measureData:", measureData) affected, err := session.Insert(measureData) if err != nil && affected <= 0 { log.Error("Failed to insert measure_data:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } } } } services.ResponseStatusOK204NoContent(w) } func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) { log.Debug("PostMeasureTaskToNF processing... ") // _, err := services.CheckFrontValidRequest(w, r) // if err != nil { // log.Error("Request error:", err) // return // } vars := mux.Vars(r) neType := vars["elementTypeValue"] params := r.URL.Query() taskIds := params["id"] log.Debug("taskIds:", taskIds) var response *resty.Response client := resty.New() measureTask := new(MeasureTask) measureTask.Tasks = make([]Task, 1) for _, taskId := range taskIds { id, _ := strconv.Atoi(taskId) task, err := dborm.GetMeasureTask(id) if err != nil { log.Error("Failed to connect database: ", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } log.Debug("Table Task:", task) measureTask.Tasks[0].Id = task.Id measureTask.Tasks[0].StartTime = task.StartTime measureTask.Tasks[0].EndTime = task.EndTime // v := new(dborm.ScheduleJson) // _ = json.Unmarshal(task.Schedule, v) // measureTask.Task[0].Schedule.Type = v.Type // measureTask.Task[0].Schedule.Days = v.Days if len(task.Schedule) >= 1 { measureTask.Tasks[0].Schedule.Type = task.Schedule[0].Type measureTask.Tasks[0].Schedule.Days = task.Schedule[0].Days } //v := new(dborm.ScheduleJ) //_ = json.Unmarshal(task.Schedule, v) measureTask.Tasks[0].Schedule.Periods = task.Periods measureTask.Tasks[0].GranulOption = task.GranulOption measureTask.Tasks[0].KPISet = task.KpiSet ips, err := global.GetIps() if err != nil { log.Error("Failed to get local IP:", err) services.ResponseInternalServerError500ProcessError(w, err) return } log.Debug("ips:", ips) measureTask.NotifyUrl = global.SetNotifyUrl(ips[0], config.GetYamlConfig().Rest[0].Port, fmt.Sprintf(MeasureReportFmt, neType)) log.Debug("Measure Task to NF:", measureTask) if len(task.NeIds) == 0 { var neInfos []dborm.NeInfo err := dborm.XormGetNeInfoByNeType(neType, &neInfos) if err != nil { log.Error("Failed to dborm.XormGetNeInfoByNeType:", err) services.ResponseInternalServerError500ProcessError(w, err) return } for _, neInfo := range neInfos { task.NeIds = append(task.NeIds, neInfo.NeId) } } for _, neId := range task.NeIds { var err error neInfo, err := dborm.XormGetNeInfo(neType, neId) if err != nil { log.Error("Failed to dborm.XormGetNeInfo:", err) services.ResponseInternalServerError500ProcessError(w, err) return } if neInfo == nil { err := fmt.Errorf("not found target NE neType=%s, neId=%s", neType, neId) log.Error(err) services.ResponseInternalServerError500ProcessError(w, err) return } requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI) log.Debug("requestURI2NF: POST ", requestURI2NF) switch task.Status { case dborm.MeasureTaskStatusInactive: body, _ := json.Marshal(measureTask) log.Debug("body: ", string(body)) log.Debug("User-Agent: ", config.GetDefaultUserAgent()) response, err = client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). SetBody(body). SetContentLength(true). Post(requestURI2NF) if err != nil { log.Error("Post to NF failed:", err) services.ResponseInternalServerError500NFConnectRefused(w) return } log.Debug("response info: ") log.Debug("Status Code:", response.StatusCode()) log.Debug("Status:", response.Status()) log.Debug("Proto:", response.Proto()) log.Debug("Time:", response.Time()) log.Debug("Received At:", response.ReceivedAt()) log.Debug("Size:", response.Size()) case dborm.MeasureTaskStatusSuspend: body, _ := json.Marshal(measureTask) log.Debug("body: ", string(body)) response, err = client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). SetBody(body). SetContentLength(true). Put(requestURI2NF) if err != nil { log.Error("Put to NF failed:", err) services.ResponseInternalServerError500NFConnectRefused(w) return } default: err = fmt.Errorf("measure task status must be inactive id=%d", id) log.Error("Unable to active measure task:", err) services.ResponseInternalServerError500ProcessError(w, err) return } log.Debug("StatusCode: ", response.StatusCode()) switch response.StatusCode() { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: taskInfo := new(dborm.MeasureTask) taskInfo.Status = dborm.MeasureTaskStatusActive taskInfo.CreateTime = time.Now().Format(time.DateTime) affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo) if err != nil { log.Error("dborm.XormUpdateTableById is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } else if affected <= 0 { log.Info("Not record affected in measure_task") } default: log.Error("NF return failure to active measure task") if response != nil { log.Info("response body:", string(response.Body())) services.TransportResponse(w, response.StatusCode(), response.Body()) return } else { err = fmt.Errorf("failed to active measure task, NF return error status=%v", response.Status()) log.Error("Unable to active measure task:", err) services.ResponseInternalServerError500ProcessError(w, err) return } } } } services.ResponseStatusOK204NoContent(w) } func PutMeasureTaskToNF(w http.ResponseWriter, r *http.Request) { services.ResponseStatusOK200Null(w) } func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) { log.Debug("DeleteMeasureTaskToNF processing... ") // _, err := services.CheckFrontValidRequest(w, r) // if err != nil { // log.Error("Request error:", err) // return // } vars := mux.Vars(r) neType := vars["elementTypeValue"] params := r.URL.Query() taskIds := params["id"] log.Debug("taskIds:", taskIds) var response *resty.Response respMsg := make(map[string]interface{}) for _, taskId := range taskIds { id, _ := strconv.Atoi(taskId) task, err := dborm.GetMeasureTask(id) if err != nil { log.Error("Failed to connect database: ", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } log.Debug("Measure Task:", task) if len(task.NeIds) == 0 { var neInfos []dborm.NeInfo err := dborm.XormGetNeInfoByNeType(neType, &neInfos) if err != nil { log.Error("Failed to dborm.XormGetNeInfoByNeType:", err) services.ResponseInternalServerError500ProcessError(w, err) return } for _, neInfo := range neInfos { task.NeIds = append(task.NeIds, neInfo.NeId) } } log.Debug("neIds:", task.NeIds) if len(task.NeIds) == 0 { log.Warn("Not found target NE in the measure task") taskInfo := new(dborm.MeasureTask) taskInfo.Status = dborm.MeasureTaskStatusDeleted affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo) if err != nil { log.Error("dborm.XormUpdateTableById is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } else if affected <= 0 { log.Info("Not record affected in measure_task") } services.ResponseStatusOK204NoContent(w) return } for _, neId := range task.NeIds { var err error neInfo, err := dborm.XormGetNeInfo(neType, neId) if err != nil { log.Error("dborm.XormGetNeInfo is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } if neInfo != nil { requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI) log.Debug("requestURI2NF: DELETE ", requestURI2NF) client := resty.New() response, err = client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). Delete(requestURI2NF) if err != nil { // to avoid can't delete the task for abnormal NF log.Error("Failed to resty delete:", err) taskInfo := new(dborm.MeasureTask) taskInfo.Status = dborm.MeasureTaskStatusDeleted affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo) if err != nil { log.Error("dborm.XormUpdateTableById is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } else if affected <= 0 { log.Info("Not record affected in measure_task") } services.ResponseStatusOK204NoContent(w) return } log.Info("StatusCode: ", response.StatusCode()) switch response.StatusCode() { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: taskInfo := new(dborm.MeasureTask) taskInfo.Status = dborm.MeasureTaskStatusDeleted affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo) if err != nil { log.Error("dborm.XormUpdateTableById is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } else if affected <= 0 { log.Infof("Not record affected in measure_task") } services.ResponseStatusOK204NoContent(w) return default: log.Info("response body:", string(response.Body())) body := new(map[string]interface{}) _ = json.Unmarshal(response.Body(), &body) respMsg["error"] = body } } else { taskInfo := new(dborm.MeasureTask) taskInfo.Status = dborm.MeasureTaskStatusDeleted affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo) if err != nil { log.Error("dborm.XormUpdateTableById is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } else if affected <= 0 { log.Info("Not record affected in measure_task") } services.ResponseStatusOK204NoContent(w) return } } } services.ResponseWithJson(w, response.StatusCode(), respMsg) } func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) { log.Debug("PatchMeasureTaskToNF processing... ") // _, err := services.CheckFrontValidRequest(w, r) // if err != nil { // log.Error("Request error:", err) // return // } vars := mux.Vars(r) neType := vars["elementTypeValue"] params := r.URL.Query() taskIds := params["id"] log.Debug("taskIds:", taskIds) var response *resty.Response respMsg := make(map[string]interface{}) for _, taskId := range taskIds { id, _ := strconv.Atoi(taskId) task, err := dborm.GetMeasureTask(id) if err != nil { log.Error("Failed to connect database: ", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } log.Debug("Measure Task:", task) // for neType if len(task.NeIds) == 0 { var neInfos []dborm.NeInfo err := dborm.XormGetNeInfoByNeType(neType, &neInfos) if err != nil { log.Error("Failed to dborm.XormGetNeInfoByNeType:", err) services.ResponseInternalServerError500ProcessError(w, err) return } for _, neInfo := range neInfos { task.NeIds = append(task.NeIds, neInfo.NeId) } } if len(task.NeIds) == 0 { taskInfo := new(dborm.MeasureTask) taskInfo.Status = dborm.MeasureTaskStatusInactive affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo) if err != nil { log.Error("dborm.XormUpdateTableById is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } else if affected <= 0 { log.Info("Not record affected in measure_task") } services.ResponseStatusOK204NoContent(w) return } for _, neId := range task.NeIds { var err error neInfo, err := dborm.XormGetNeInfo(neType, neId) if err != nil { log.Error("dborm.XormGetNeInfo is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } if neInfo == nil { taskInfo := new(dborm.MeasureTask) taskInfo.Status = dborm.MeasureTaskStatusInactive affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo) if err != nil { log.Error("dborm.XormUpdateTableById is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } else if affected <= 0 { log.Info("Not record affected in measure_task") } services.ResponseStatusOK204NoContent(w) //services.ResponseInternalServerError500ProcessError(w, em) return } requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI) log.Debug("requestURI2NF: PATCH ", requestURI2NF) client := resty.New() response, err = client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). Patch(requestURI2NF) if err != nil { log.Error("Patch to NF failed:", err) services.ResponseInternalServerError500NFConnectRefused(w) return } log.Debug("StatusCode: ", response.StatusCode()) switch response.StatusCode() { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: taskInfo := new(dborm.MeasureTask) taskInfo.Status = dborm.MeasureTaskStatusInactive affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo) if err != nil { log.Error("dborm.XormUpdateTableById is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } else if affected <= 0 { log.Info("Not record affected in measure_task") } default: log.Debug("response body:", string(response.Body())) body := new(map[string]interface{}) _ = json.Unmarshal(response.Body(), &body) respMsg["error"] = body } } } services.ResponseWithJson(w, response.StatusCode(), respMsg) } type Measurement struct { Id int `json:"-" xorm:"pk 'id' autoincr"` Date string `json:"-" xorm:"date"` Index int `json:"Index"` // 1天中测量时间粒度(如15分钟)的切片索引: 0~95 Timestamp string `json:"TimeStamp" xorm:"-"` NeName string `json:"NeName"` // UserLabel RmUID string `json:"RmUID" xorm:"rm_uid"` NeType string `json:"NeType"` // 网元类型 PmVersion string `json:"PmVersion"` // 性能数据版本号 Dn string `json:"Dn"` // (???)网元标识, 如:RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456 Period string `json:"Period"` // 测量时间粒度选项:5/15/30/60 TimeZone string `json:"TimeZone"` StartTime string `json:"StartTime"` Datas []Data `json:"Datas"` } type KPIValue struct { Name string `json:"Name"` // 单个的写"Total", 或者指标项有多个测量项,如Dnn的名称写对应的Dnn"cmnet"/"ims" Value int64 `json:"Value"` } type KPI struct { KPIID string `json:"KPIID"` // 指标项, 如: SMF.AttCreatePduSession._Dnn KPIValues []KPIValue `json:"KPIValues"` } type Data struct { ObjectType string `json:"ObjectType"` // 网络资源类别名称, Pm指标项列表中为空间粒度 如:SmfFunction KPIs []KPI `json:"KPIs"` // 指标项, 如: SMF.AttCreatePduSession._Dnn } // process measurement post message from NFs func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) { log.Debug("PostMeasurementFromNF processing... ") vars := mux.Vars(r) apiVer := vars["apiVersion"] if apiVer != global.ApiVersionV1 { log.Error("Uri api version is invalid. apiVersion:", apiVer) services.ResponseNotFound404UriNotExist(w, r) return } body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen)) if err != nil { log.Error("Faile to io.ReadAll: ", err) services.ResponseNotFound404UriNotExist(w, r) return } log.Debug("Request body:", string(body)) // measurement := new(dborm.NorthboundPm) measurement := new(dborm.NorthboundPm) _ = json.Unmarshal(body, &measurement) log.Debug("measurement:", measurement) session := dborm.DbClient.XEngine.NewSession() defer session.Close() // layout := global.DateTime layout := time.RFC3339 measurement.Date = GetDateFromTimeString(layout, measurement.StartTime) measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime) affected, err := session.Table("nbi_pm").Insert(measurement) if err != nil && affected <= 0 { log.Error("Failed to insert nbi_pm:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } services.ResponseStatusOK204NoContent(w) } // get measurement message from NFs func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) { log.Debug("GetMeasurementFromNF processing... ") // _, err := services.CheckFrontValidRequest(w, r) // if err != nil { // log.Error("Request error:", err) // return // } vars := mux.Vars(r) apiVer := vars["apiVersion"] if apiVer != global.ApiVersionV1 { log.Error("Uri api version is invalid. apiVersion:", apiVer) services.ResponseNotFound404UriNotExist(w, r) return } neType := vars["elementTypeValue"] if neType == "" { log.Error("elementTypeValue is null.") services.ResponseNotFound404UriNotExist(w, r) return } params := r.URL.Query() neIds := params["ne_id"] if len(neIds) == 0 { log.Error("ne_id NOT FOUND") services.ResponseBadRequest400WrongParamValue(w) return } log.Debugf("neType: %s neId:%s", neType, neIds) //var neInfo *dborm.NeInfo neInfo := new(dborm.NeInfo) neInfo, err := dborm.XormGetNeInfo(neType, neIds[0]) if err != nil { log.Error("dborm.XormGetNeInfo is failed:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI) log.Debug("requestURI2NF: GET ", requestURI2NF) client := resty.New() response, err := client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). Get(requestURI2NF) if err != nil { log.Error("Failed to Get from NF:", err) services.ResponseInternalServerError500NFConnectRefused(w) return } respMsg := make(map[string]interface{}) switch response.StatusCode() { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: log.Debug("response:", response) // measurement := new(dborm.NorthboundPm) measurement := new(dborm.NorthboundPm) _ = json.Unmarshal(response.Body(), &measurement) log.Debug("measurement:", measurement) session := dborm.DbClient.XEngine.NewSession() defer session.Close() layout := time.RFC3339 measurement.Date = GetDateFromTimeString(layout, measurement.StartTime) measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime) affected, err := session.Table("nbi_pm").Insert(measurement) if err != nil && affected <= 0 { log.Error("Failed to insert nbi_pm:", err) services.ResponseInternalServerError500DatabaseOperationFailed(w) return } default: log.Debug("response body:", string(response.Body())) body := new(map[string]interface{}) _ = json.Unmarshal(response.Body(), &body) respMsg["error"] = body } services.ResponseWithJson(w, response.StatusCode(), respMsg) }