add: 提交
This commit is contained in:
972
features/pm/performance.go
Normal file
972
features/pm/performance.go
Normal file
@@ -0,0 +1,972 @@
|
||||
package pm
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"ems.agt/lib/dborm"
|
||||
"ems.agt/lib/global"
|
||||
"ems.agt/lib/log"
|
||||
"ems.agt/lib/services"
|
||||
"ems.agt/restagent/config"
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/go-resty/resty/v2"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
type Response struct {
|
||||
Data interface{} `json:"data"`
|
||||
}
|
||||
|
||||
type KpiReport struct {
|
||||
Timestamp string `json:"TimeStamp"`
|
||||
Task struct {
|
||||
Period struct {
|
||||
StartTime string `json:"StartTime"`
|
||||
EndTime string `json:"EndTime"`
|
||||
} `json:"Period"`
|
||||
NE struct {
|
||||
NEName string `json:"NEName"`
|
||||
RmUID string `json:"rmUID"`
|
||||
NeType string `json:"NeType"`
|
||||
KPIs []struct {
|
||||
KPIID string `json:"KPIID"`
|
||||
Value int `json:"Value"`
|
||||
Err string `json:"Err"`
|
||||
} `json:"KPIs"`
|
||||
} `json:"NE"`
|
||||
} `json:"Task"`
|
||||
}
|
||||
|
||||
type GoldKpi struct {
|
||||
// Id int `json:"-" xorm:"pk 'id' autoincr"`
|
||||
Date string `json:"date" xorm:"date"`
|
||||
Index int `json:"index"`
|
||||
StartTime string `json:"startTime"`
|
||||
EndTime string `json:"endTime"`
|
||||
NEName string `json:"neName" xorm:"ne_name"`
|
||||
RmUid string `json:"rmUid" xorm:"rm_uid"`
|
||||
NEType string `json:"neType" xorm:"ne_type"`
|
||||
KpiId string `json:"kpiId" xorm:"kpi_id"`
|
||||
Value int `json:"value"`
|
||||
Error string `json:"error"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}
|
||||
|
||||
var (
|
||||
// performance management
|
||||
PostPerformanceUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
|
||||
|
||||
// performance management
|
||||
PostPerformancePattern = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
|
||||
MeasureTaskUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
|
||||
MeasureReportUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
|
||||
MeasureReportFmt = config.UriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
|
||||
MeasurementUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
|
||||
UriMeasureTask = config.UriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
|
||||
)
|
||||
|
||||
var xEngine *xorm.Engine
|
||||
|
||||
type DatabaseClient struct {
|
||||
dbType string
|
||||
dbUrl string
|
||||
dbConnMaxLifetime time.Duration
|
||||
dbMaxIdleConns int
|
||||
dbMaxOpenConns int
|
||||
IsShowSQL bool
|
||||
|
||||
XEngine *xorm.Engine
|
||||
}
|
||||
|
||||
var DbClient DatabaseClient
|
||||
|
||||
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) error {
|
||||
DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&parseTime=true&loc=Local",
|
||||
dbUser, dbPassword, dbHost, dbPort, dbName)
|
||||
DbClient.dbType = dbType
|
||||
DbClient.dbConnMaxLifetime = 0
|
||||
DbClient.dbMaxIdleConns = 0
|
||||
DbClient.dbMaxOpenConns = 0
|
||||
if log.GetLevel() == log.LOG_TRACE {
|
||||
DbClient.IsShowSQL = true
|
||||
}
|
||||
log.Debugf("dbType:%s dbUrl:%s:******@tcp(%s:%s)/%s??charset=utf8&parseTime=true&loc=Local",
|
||||
dbType, dbUser, dbHost, dbPort, dbName)
|
||||
|
||||
var err error
|
||||
DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl)
|
||||
if err != nil {
|
||||
log.Error("Failed to connet database:", err)
|
||||
return err
|
||||
}
|
||||
DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime)
|
||||
DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns)
|
||||
DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns)
|
||||
if DbClient.IsShowSQL {
|
||||
DbClient.XEngine.ShowSQL(true)
|
||||
}
|
||||
xEngine = DbClient.XEngine
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func XormConnectDatabase(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) (*xorm.Engine, error) {
|
||||
sqlStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&parseTime=true&loc=Local",
|
||||
dbUser, dbPassword, dbHost, dbPort, dbName)
|
||||
log.Debugf("dbType:%s Connect to:%s:******@tcp(%s:%s)/%s?charset=utf8&parseTime=true&loc=Local",
|
||||
dbType, dbUser, dbHost, dbPort, dbName)
|
||||
var err error
|
||||
xEngine, err = xorm.NewEngine(dbType, sqlStr) //1、Create xorm engine
|
||||
if err != nil {
|
||||
log.Error("Failed to connect database:", err)
|
||||
return nil, err
|
||||
}
|
||||
if log.GetLevel() == log.LOG_TRACE {
|
||||
xEngine.ShowSQL(true)
|
||||
}
|
||||
return xEngine, nil
|
||||
}
|
||||
|
||||
func GetDateFromTimeString(fmtString string, timeString string) string {
|
||||
t, _ := time.ParseInLocation(fmtString, timeString, time.Local)
|
||||
return t.Format("2006-01-02")
|
||||
}
|
||||
|
||||
func GetDateTimeFromTimeString(fmtString string, timeString string) string {
|
||||
t, _ := time.ParseInLocation(fmtString, timeString, time.Local)
|
||||
return t.Format(global.DateTime)
|
||||
}
|
||||
|
||||
// process alarm post message from NFs
|
||||
func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("PostKPIReportFromNF processing... ")
|
||||
|
||||
vars := mux.Vars(r)
|
||||
apiVer := vars["apiVersion"]
|
||||
if apiVer != global.ApiVersionV1 {
|
||||
log.Error("Uri api version is invalid. apiVersion:", apiVer)
|
||||
services.ResponseNotFound404UriNotExist(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||||
body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||||
if err != nil {
|
||||
log.Error("Faile to io.ReadAll: ", err)
|
||||
services.ResponseNotFound404UriNotExist(w, r)
|
||||
return
|
||||
}
|
||||
log.Debug("Request body:", string(body))
|
||||
kpiReport := new(KpiReport)
|
||||
_ = json.Unmarshal(body, &kpiReport)
|
||||
log.Debug("kpiReport:", kpiReport)
|
||||
|
||||
session := xEngine.NewSession()
|
||||
defer session.Close()
|
||||
goldKpi := new(GoldKpi)
|
||||
layout := time.RFC3339Nano
|
||||
goldKpi.Date = GetDateFromTimeString(layout, kpiReport.Task.Period.StartTime)
|
||||
goldKpi.Index, _ = strconv.Atoi(vars["index"])
|
||||
goldKpi.StartTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime)
|
||||
goldKpi.EndTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime)
|
||||
goldKpi.NEName = kpiReport.Task.NE.NEName
|
||||
goldKpi.RmUid = kpiReport.Task.NE.RmUID
|
||||
goldKpi.NEType = kpiReport.Task.NE.NeType
|
||||
goldKpi.Timestamp = global.GetFmtTimeString(layout, kpiReport.Timestamp, time.DateTime)
|
||||
for _, k := range kpiReport.Task.NE.KPIs {
|
||||
goldKpi.KpiId = k.KPIID
|
||||
goldKpi.Value = k.Value
|
||||
goldKpi.Error = k.Err
|
||||
log.Trace("goldKpi:", goldKpi)
|
||||
|
||||
// 启动事务
|
||||
err := session.Begin()
|
||||
if err != nil {
|
||||
log.Error("Failed to Begin gold_kpi:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
gkpi := &GoldKpi{}
|
||||
_, err = session.Where("id = ?", 1).ForUpdate().Get(gkpi)
|
||||
if err != nil {
|
||||
// 回滚事务
|
||||
session.Rollback()
|
||||
log.Error("Failed to ForUpdate gold_kpi:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
affected, err := session.Insert(goldKpi)
|
||||
if err != nil && affected <= 0 {
|
||||
session.Rollback()
|
||||
log.Error("Failed to insert gold_kpi:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
// 提交事务
|
||||
err = session.Commit()
|
||||
if err != nil {
|
||||
log.Error("Failed to Commit gold_kpi:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
services.ResponseStatusOK200Null(w)
|
||||
}
|
||||
|
||||
type MeasureTask struct {
|
||||
Tasks []Task `json:"Tasks"`
|
||||
NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Id int `json:"Id"`
|
||||
|
||||
StartTime string `json:"StartTime"`
|
||||
EndTime string `json:"EndTime"`
|
||||
|
||||
Schedule struct {
|
||||
Type string `json:"Type"` // 计划类型:Weekly/Monthly, 如果type为"", 则任务以StartTime和EndTime为条件进行统计, 否则以Shedule方式进行
|
||||
Days []int `json:"Days"` // Weekly: [0,1,...,5,6] 星期日为0, Monthly: [1,2,3,...,30,31]
|
||||
Periods []dborm.Period `json:"Periods"`
|
||||
/*
|
||||
Periods []struct {
|
||||
Start string `json:"Start"` // 零点或者零点加测量粒度的整数倍
|
||||
End string `json:"End"` //零点加测量粒度的整数倍
|
||||
} `json:"Periods"`
|
||||
*/
|
||||
} `json:"Schedule"`
|
||||
|
||||
GranulOption string `json:"GranulOption"` // 测量粒度选项:15M/30M/60M/24H
|
||||
KPISet []dborm.KpiSetJ `json:"KPISet"`
|
||||
/*
|
||||
KPISet []struct {
|
||||
Code string `json:"Code"` // 统计编码 如:SMFHA01
|
||||
KPIs []string `json:"KPIs` // 指标项集合 ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"]
|
||||
} `json:"KPISet"`
|
||||
*/
|
||||
}
|
||||
|
||||
type MeasureReport struct {
|
||||
Id int `json:"Id"`
|
||||
TimeStamp string `json:"TimeStamp"`
|
||||
NeName string `json:"NeName"`
|
||||
RmUID string `json:"rmUID"`
|
||||
NeType string `json:"NeType"`
|
||||
|
||||
Report struct {
|
||||
Period struct {
|
||||
StartTime string `json:"StartTime"`
|
||||
EndTime string `json:"EndTime"`
|
||||
} `json:"Period"`
|
||||
|
||||
Datas []struct {
|
||||
Code string `json:"Code"` // 统计编码 如:SMFHA01
|
||||
KPIs []struct {
|
||||
KPIID string `json:"KPIID"` // 指标项, 如: SMF.AttCreatePduSession._Dnn
|
||||
KPIValues []struct {
|
||||
Name string `json:"Name"` // 单个的写"Total", 或者指标项有多个测量项,如Dnn的名称写对应的Dnn"cmnet"/"ims"
|
||||
Value int64 `json:"Value"`
|
||||
} `json:"KPIValues"`
|
||||
} `json:"KPIs"`
|
||||
} `json:"Datas"`
|
||||
} `json:"Report"`
|
||||
}
|
||||
|
||||
type MeasureData struct {
|
||||
// Id int `json:"id" xorm:"pk 'id' autoincr"`
|
||||
Id int `json:"id" xorm:"-"`
|
||||
Date string `json:"date" xorm:"date"`
|
||||
TaskId int `json:"taskId"`
|
||||
NeType string `json:"neType" xorm:"ne_type"`
|
||||
NeName string `json:"neName" xorm:"ne_name"`
|
||||
RmUid string `json:"rmUid" xorm:"rm_uid"`
|
||||
GranulOption string `json:"granulOption" xorm:"granul_option"`
|
||||
StartTime string `json:"startTime"`
|
||||
EndTime string `json:"endTime"`
|
||||
KpiCode string `json:"kpiCode" xorm:"kpi_code"`
|
||||
KpiId string `json:"kpiId" xorm:"kpi_id"`
|
||||
KpiExt string `json:"kpiExt" xorm:"kpi_ext"`
|
||||
Value int64 `json:"value"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}
|
||||
|
||||
// process measure report from NFs
|
||||
func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("PostMeasureReportFromNF processing... ")
|
||||
|
||||
// vars := mux.Vars(r)
|
||||
// neType := vars["elementTypeValue"]
|
||||
vars := mux.Vars(r)
|
||||
apiVer := vars["apiVersion"]
|
||||
if apiVer != global.ApiVersionV1 {
|
||||
log.Error("Uri api version is invalid. apiVersion:", apiVer)
|
||||
services.ResponseNotFound404UriNotExist(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||||
body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||||
if err != nil {
|
||||
log.Error("Faile to io.ReadAll: ", err)
|
||||
services.ResponseNotFound404UriNotExist(w, r)
|
||||
return
|
||||
}
|
||||
log.Debug("Request body:", string(body))
|
||||
measureReport := new(MeasureReport)
|
||||
_ = json.Unmarshal(body, &measureReport)
|
||||
log.Debug("measureReport:", measureReport)
|
||||
|
||||
session := xEngine.NewSession()
|
||||
defer session.Close()
|
||||
measureData := new(MeasureData)
|
||||
layout := global.DateTime
|
||||
measureData.Date = GetDateFromTimeString(layout, measureReport.Report.Period.StartTime)
|
||||
measureData.TaskId = measureReport.Id
|
||||
measureData.StartTime = measureReport.Report.Period.StartTime
|
||||
measureData.EndTime = measureReport.Report.Period.EndTime
|
||||
measureData.NeType = measureReport.NeType
|
||||
measureData.NeName = measureReport.NeName
|
||||
measureData.RmUid = measureReport.RmUID
|
||||
measureData.GranulOption, _ = dborm.XormGetSingleCol("measure_task", "granul_option", fmt.Sprintf("id=%d", measureReport.Id))
|
||||
t, _ := strconv.ParseInt(measureReport.TimeStamp, 10, 64)
|
||||
timestamp := time.Unix(t, 0)
|
||||
log.Debug("timestamp:", timestamp.Format(layout))
|
||||
measureData.Timestamp = timestamp.Format(layout)
|
||||
log.Debug("Datas:", measureReport.Report.Datas)
|
||||
for _, d := range measureReport.Report.Datas {
|
||||
measureData.KpiCode = d.Code
|
||||
|
||||
log.Debug("KPIs:", d.KPIs)
|
||||
for _, k := range d.KPIs {
|
||||
measureData.KpiId = k.KPIID
|
||||
|
||||
log.Debug("KPIValues:", k.KPIValues)
|
||||
if len(k.KPIValues) != 0 {
|
||||
for _, v := range k.KPIValues {
|
||||
measureData.KpiExt = v.Name
|
||||
measureData.Value = v.Value
|
||||
log.Debug("measureData:", measureData)
|
||||
|
||||
affected, err := session.Insert(measureData)
|
||||
if err != nil && affected <= 0 {
|
||||
log.Error("Failed to insert measure_data:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
measureData.Value = 0
|
||||
log.Debug("measureData:", measureData)
|
||||
|
||||
affected, err := session.Insert(measureData)
|
||||
if err != nil && affected <= 0 {
|
||||
log.Error("Failed to insert measure_data:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
}
|
||||
|
||||
func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("PostMeasureTaskToNF processing... ")
|
||||
|
||||
_, err := services.CheckFrontValidRequest(w, r)
|
||||
if err != nil {
|
||||
log.Error("Request error:", err)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
neType := vars["elementTypeValue"]
|
||||
params := r.URL.Query()
|
||||
taskIds := params["id"]
|
||||
log.Debug("taskIds:", taskIds)
|
||||
|
||||
var response *resty.Response
|
||||
client := resty.New()
|
||||
measureTask := new(MeasureTask)
|
||||
measureTask.Tasks = make([]Task, 1)
|
||||
for _, taskId := range taskIds {
|
||||
id, _ := strconv.Atoi(taskId)
|
||||
task, err := dborm.GetMeasureTask(id)
|
||||
if err != nil {
|
||||
log.Error("Failed to connect database: ", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
log.Debug("Table Task:", task)
|
||||
|
||||
measureTask.Tasks[0].Id = task.Id
|
||||
measureTask.Tasks[0].StartTime = task.StartTime
|
||||
measureTask.Tasks[0].EndTime = task.EndTime
|
||||
// v := new(dborm.ScheduleJson)
|
||||
// _ = json.Unmarshal(task.Schedule, v)
|
||||
// measureTask.Task[0].Schedule.Type = v.Type
|
||||
// measureTask.Task[0].Schedule.Days = v.Days
|
||||
if len(task.Schedule) >= 1 {
|
||||
measureTask.Tasks[0].Schedule.Type = task.Schedule[0].Type
|
||||
measureTask.Tasks[0].Schedule.Days = task.Schedule[0].Days
|
||||
}
|
||||
//v := new(dborm.ScheduleJ)
|
||||
//_ = json.Unmarshal(task.Schedule, v)
|
||||
measureTask.Tasks[0].Schedule.Periods = task.Periods
|
||||
measureTask.Tasks[0].GranulOption = task.GranulOption
|
||||
|
||||
measureTask.Tasks[0].KPISet = task.KpiSet
|
||||
ips, err := global.GetIps()
|
||||
if err != nil {
|
||||
log.Error("Failed to get local IP:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
log.Debug("ips:", ips)
|
||||
|
||||
measureTask.NotifyUrl = global.SetNotifyUrl(ips[0], config.GetYamlConfig().Rest[0].Port, fmt.Sprintf(MeasureReportFmt, neType))
|
||||
log.Debug("Measure Task to NF:", measureTask)
|
||||
|
||||
if len(task.NeIds) == 0 {
|
||||
var neInfos []dborm.NeInfo
|
||||
err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
|
||||
if err != nil {
|
||||
log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
for _, neInfo := range neInfos {
|
||||
task.NeIds = append(task.NeIds, neInfo.NeId)
|
||||
}
|
||||
}
|
||||
|
||||
for _, neId := range task.NeIds {
|
||||
var err error
|
||||
neInfo, err := dborm.XormGetNeInfo(neType, neId)
|
||||
if err != nil {
|
||||
log.Error("Failed to dborm.XormGetNeInfo:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
if neInfo == nil {
|
||||
err := errors.New(fmt.Sprintf("not found target NE neType=%s, neId=%s", neType, neId))
|
||||
log.Error(err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
|
||||
log.Debug("requestURI2NF: POST ", requestURI2NF)
|
||||
|
||||
switch task.Status {
|
||||
case dborm.MeasureTaskStatusInactive:
|
||||
body, _ := json.Marshal(measureTask)
|
||||
log.Debug("body: ", string(body))
|
||||
|
||||
log.Debug("User-Agent: ", config.GetDefaultUserAgent())
|
||||
response, err = client.R().
|
||||
EnableTrace().
|
||||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||||
SetBody(body).
|
||||
SetContentLength(true).
|
||||
Post(requestURI2NF)
|
||||
|
||||
if err != nil {
|
||||
log.Error("Post to NF failed:", err)
|
||||
services.ResponseInternalServerError500NFConnectRefused(w)
|
||||
return
|
||||
}
|
||||
log.Debug("response info: ")
|
||||
log.Debug("Status Code:", response.StatusCode())
|
||||
log.Debug("Status:", response.Status())
|
||||
log.Debug("Proto:", response.Proto())
|
||||
log.Debug("Time:", response.Time())
|
||||
log.Debug("Received At:", response.ReceivedAt())
|
||||
log.Debug("Size:", response.Size())
|
||||
|
||||
case dborm.MeasureTaskStatusSuspend:
|
||||
body, _ := json.Marshal(measureTask)
|
||||
log.Debug("body: ", string(body))
|
||||
response, err = client.R().
|
||||
EnableTrace().
|
||||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||||
SetBody(body).
|
||||
SetContentLength(true).
|
||||
Put(requestURI2NF)
|
||||
|
||||
if err != nil {
|
||||
log.Error("Put to NF failed:", err)
|
||||
services.ResponseInternalServerError500NFConnectRefused(w)
|
||||
return
|
||||
}
|
||||
default:
|
||||
err = errors.New(fmt.Sprintf("measure task status must be inactive id=%d", id))
|
||||
log.Error("Unable to active measure task:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("StatusCode: ", response.StatusCode())
|
||||
switch response.StatusCode() {
|
||||
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
|
||||
taskInfo := new(dborm.MeasureTask)
|
||||
taskInfo.Status = dborm.MeasureTaskStatusActive
|
||||
taskInfo.CreateTime = time.Now().Format(time.DateTime)
|
||||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
} else if affected <= 0 {
|
||||
log.Info("Not record affected in measure_task")
|
||||
}
|
||||
default:
|
||||
log.Error("NF return failure to active measure task")
|
||||
if response != nil {
|
||||
log.Info("response body:", string(response.Body()))
|
||||
services.TransportResponse(w, response.StatusCode(), response.Body())
|
||||
return
|
||||
} else {
|
||||
err = errors.New(fmt.Sprintf("failed to active measure task, NF return error status=%v", response.Status()))
|
||||
log.Error("Unable to active measure task:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
}
|
||||
|
||||
func PutMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
services.ResponseStatusOK200Null(w)
|
||||
}
|
||||
|
||||
func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("DeleteMeasureTaskToNF processing... ")
|
||||
|
||||
_, err := services.CheckFrontValidRequest(w, r)
|
||||
if err != nil {
|
||||
log.Error("Request error:", err)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
neType := vars["elementTypeValue"]
|
||||
params := r.URL.Query()
|
||||
taskIds := params["id"]
|
||||
log.Debug("taskIds:", taskIds)
|
||||
|
||||
var response *resty.Response
|
||||
respMsg := make(map[string]interface{})
|
||||
for _, taskId := range taskIds {
|
||||
id, _ := strconv.Atoi(taskId)
|
||||
task, err := dborm.GetMeasureTask(id)
|
||||
if err != nil {
|
||||
log.Error("Failed to connect database: ", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
log.Debug("Measure Task:", task)
|
||||
|
||||
if len(task.NeIds) == 0 {
|
||||
var neInfos []dborm.NeInfo
|
||||
err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
|
||||
if err != nil {
|
||||
log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
for _, neInfo := range neInfos {
|
||||
task.NeIds = append(task.NeIds, neInfo.NeId)
|
||||
}
|
||||
}
|
||||
log.Debug("neIds:", task.NeIds)
|
||||
if len(task.NeIds) == 0 {
|
||||
log.Warn("Not found target NE in the measure task")
|
||||
taskInfo := new(dborm.MeasureTask)
|
||||
taskInfo.Status = dborm.MeasureTaskStatusDeleted
|
||||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
} else if affected <= 0 {
|
||||
log.Info("Not record affected in measure_task")
|
||||
}
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
return
|
||||
}
|
||||
|
||||
for _, neId := range task.NeIds {
|
||||
var err error
|
||||
neInfo, err := dborm.XormGetNeInfo(neType, neId)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormGetNeInfo is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
if neInfo != nil {
|
||||
requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
|
||||
log.Debug("requestURI2NF: DELETE ", requestURI2NF)
|
||||
client := resty.New()
|
||||
response, err = client.R().
|
||||
EnableTrace().
|
||||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||||
Delete(requestURI2NF)
|
||||
if err != nil {
|
||||
// to avoid can't delete the task for abnormal NF
|
||||
log.Error("Failed to resty delete:", err)
|
||||
taskInfo := new(dborm.MeasureTask)
|
||||
taskInfo.Status = dborm.MeasureTaskStatusDeleted
|
||||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
} else if affected <= 0 {
|
||||
log.Info("Not record affected in measure_task")
|
||||
}
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("StatusCode: ", response.StatusCode())
|
||||
switch response.StatusCode() {
|
||||
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
|
||||
taskInfo := new(dborm.MeasureTask)
|
||||
taskInfo.Status = dborm.MeasureTaskStatusDeleted
|
||||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
} else if affected <= 0 {
|
||||
log.Infof("Not record affected in measure_task")
|
||||
}
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
return
|
||||
default:
|
||||
log.Info("response body:", string(response.Body()))
|
||||
body := new(map[string]interface{})
|
||||
_ = json.Unmarshal(response.Body(), &body)
|
||||
respMsg["error"] = body
|
||||
}
|
||||
} else {
|
||||
taskInfo := new(dborm.MeasureTask)
|
||||
taskInfo.Status = dborm.MeasureTaskStatusDeleted
|
||||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
} else if affected <= 0 {
|
||||
log.Info("Not record affected in measure_task")
|
||||
}
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
services.ResponseWithJson(w, response.StatusCode(), respMsg)
|
||||
}
|
||||
|
||||
func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("PatchMeasureTaskToNF processing... ")
|
||||
|
||||
_, err := services.CheckFrontValidRequest(w, r)
|
||||
if err != nil {
|
||||
log.Error("Request error:", err)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
neType := vars["elementTypeValue"]
|
||||
params := r.URL.Query()
|
||||
taskIds := params["id"]
|
||||
log.Debug("taskIds:", taskIds)
|
||||
|
||||
var response *resty.Response
|
||||
respMsg := make(map[string]interface{})
|
||||
for _, taskId := range taskIds {
|
||||
id, _ := strconv.Atoi(taskId)
|
||||
task, err := dborm.GetMeasureTask(id)
|
||||
if err != nil {
|
||||
log.Error("Failed to connect database: ", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
log.Debug("Measure Task:", task)
|
||||
|
||||
// for neType
|
||||
if len(task.NeIds) == 0 {
|
||||
var neInfos []dborm.NeInfo
|
||||
err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
|
||||
if err != nil {
|
||||
log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
for _, neInfo := range neInfos {
|
||||
task.NeIds = append(task.NeIds, neInfo.NeId)
|
||||
}
|
||||
}
|
||||
|
||||
if len(task.NeIds) == 0 {
|
||||
taskInfo := new(dborm.MeasureTask)
|
||||
taskInfo.Status = dborm.MeasureTaskStatusInactive
|
||||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
} else if affected <= 0 {
|
||||
log.Info("Not record affected in measure_task")
|
||||
}
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
return
|
||||
}
|
||||
|
||||
for _, neId := range task.NeIds {
|
||||
var err error
|
||||
neInfo, err := dborm.XormGetNeInfo(neType, neId)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormGetNeInfo is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
if neInfo == nil {
|
||||
em := errors.New("Not found NE info in database")
|
||||
log.Error(em)
|
||||
taskInfo := new(dborm.MeasureTask)
|
||||
taskInfo.Status = dborm.MeasureTaskStatusInactive
|
||||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
} else if affected <= 0 {
|
||||
log.Info("Not record affected in measure_task")
|
||||
}
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
//services.ResponseInternalServerError500ProcessError(w, em)
|
||||
return
|
||||
}
|
||||
|
||||
requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
|
||||
log.Debug("requestURI2NF: PATCH ", requestURI2NF)
|
||||
client := resty.New()
|
||||
response, err = client.R().
|
||||
EnableTrace().
|
||||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||||
Patch(requestURI2NF)
|
||||
if err != nil {
|
||||
log.Error("Patch to NF failed:", err)
|
||||
services.ResponseInternalServerError500NFConnectRefused(w)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("StatusCode: ", response.StatusCode())
|
||||
switch response.StatusCode() {
|
||||
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
|
||||
taskInfo := new(dborm.MeasureTask)
|
||||
taskInfo.Status = dborm.MeasureTaskStatusInactive
|
||||
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
|
||||
if err != nil {
|
||||
log.Error("dborm.XormUpdateTableById is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
} else if affected <= 0 {
|
||||
log.Info("Not record affected in measure_task")
|
||||
}
|
||||
default:
|
||||
log.Debug("response body:", string(response.Body()))
|
||||
body := new(map[string]interface{})
|
||||
_ = json.Unmarshal(response.Body(), &body)
|
||||
respMsg["error"] = body
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
services.ResponseWithJson(w, response.StatusCode(), respMsg)
|
||||
return
|
||||
}
|
||||
|
||||
type Measurement struct {
|
||||
Id int `json:"-" xorm:"pk 'id' autoincr"`
|
||||
Date string `json:"-" xorm:"date"`
|
||||
Index int `json:"Index"` // 1天中测量时间粒度(如15分钟)的切片索引: 0~95
|
||||
Timestamp string `json:"TimeStamp" xorm:"-"`
|
||||
NeName string `json:"NeName"` // UserLabel
|
||||
RmUID string `json:"RmUID" xorm:"rm_uid"`
|
||||
NeType string `json:"NeType"` // 网元类型
|
||||
PmVersion string `json:"PmVersion"` // 性能数据版本号
|
||||
Dn string `json:"Dn"` // (???)网元标识, 如:RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456
|
||||
Period string `json:"Period"` // 测量时间粒度选项:5/15/30/60
|
||||
TimeZone string `json:"TimeZone"`
|
||||
StartTime string `json:"StartTime"`
|
||||
|
||||
Datas []Data `json:"Datas"`
|
||||
}
|
||||
|
||||
type KPIValue struct {
|
||||
Name string `json:"Name"` // 单个的写"Total", 或者指标项有多个测量项,如Dnn的名称写对应的Dnn"cmnet"/"ims"
|
||||
Value int64 `json:"Value"`
|
||||
}
|
||||
|
||||
type KPI struct {
|
||||
KPIID string `json:"KPIID"` // 指标项, 如: SMF.AttCreatePduSession._Dnn
|
||||
KPIValues []KPIValue `json:"KPIValues"`
|
||||
}
|
||||
|
||||
type Data struct {
|
||||
ObjectType string `json:"ObjectType"` // 网络资源类别名称, Pm指标项列表中为空间粒度 如:SmfFunction
|
||||
KPIs []KPI `json:"KPIs"` // 指标项, 如: SMF.AttCreatePduSession._Dnn
|
||||
}
|
||||
|
||||
// process measurement post message from NFs
|
||||
func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("PostMeasurementFromNF processing... ")
|
||||
|
||||
vars := mux.Vars(r)
|
||||
apiVer := vars["apiVersion"]
|
||||
if apiVer != global.ApiVersionV1 {
|
||||
log.Error("Uri api version is invalid. apiVersion:", apiVer)
|
||||
services.ResponseNotFound404UriNotExist(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
|
||||
if err != nil {
|
||||
log.Error("Faile to io.ReadAll: ", err)
|
||||
services.ResponseNotFound404UriNotExist(w, r)
|
||||
return
|
||||
}
|
||||
log.Debug("Request body:", string(body))
|
||||
// measurement := new(dborm.NorthboundPm)
|
||||
measurement := new(dborm.NorthboundPm)
|
||||
_ = json.Unmarshal(body, &measurement)
|
||||
log.Debug("measurement:", measurement)
|
||||
|
||||
session := dborm.DbClient.XEngine.NewSession()
|
||||
defer session.Close()
|
||||
|
||||
// layout := global.DateTime
|
||||
layout := time.RFC3339
|
||||
measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
|
||||
measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
|
||||
affected, err := session.Table("northbound_pm").Insert(measurement)
|
||||
if err != nil && affected <= 0 {
|
||||
log.Error("Failed to insert northbound_pm:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
}
|
||||
|
||||
// get measurement message from NFs
|
||||
func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("GetMeasurementFromNF processing... ")
|
||||
|
||||
_, err := services.CheckFrontValidRequest(w, r)
|
||||
if err != nil {
|
||||
log.Error("Request error:", err)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
apiVer := vars["apiVersion"]
|
||||
if apiVer != global.ApiVersionV1 {
|
||||
log.Error("Uri api version is invalid. apiVersion:", apiVer)
|
||||
services.ResponseNotFound404UriNotExist(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
neType := vars["elementTypeValue"]
|
||||
if neType == "" {
|
||||
log.Error("elementTypeValue is null.")
|
||||
services.ResponseNotFound404UriNotExist(w, r)
|
||||
return
|
||||
}
|
||||
params := r.URL.Query()
|
||||
neIds := params["ne_id"]
|
||||
if len(neIds) == 0 {
|
||||
log.Error("ne_id NOT FOUND")
|
||||
services.ResponseBadRequest400WrongParamValue(w)
|
||||
return
|
||||
}
|
||||
log.Debugf("neType: %s neId:%s", neType, neIds)
|
||||
|
||||
//var neInfo *dborm.NeInfo
|
||||
neInfo := new(dborm.NeInfo)
|
||||
|
||||
neInfo, err = dborm.XormGetNeInfo(neType, neIds[0])
|
||||
if err != nil {
|
||||
log.Error("dborm.XormGetNeInfo is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
|
||||
requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
|
||||
log.Debug("requestURI2NF: GET ", requestURI2NF)
|
||||
|
||||
client := resty.New()
|
||||
response, err := client.R().
|
||||
EnableTrace().
|
||||
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
|
||||
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
|
||||
Get(requestURI2NF)
|
||||
if err != nil {
|
||||
log.Error("Failed to Get from NF:", err)
|
||||
services.ResponseInternalServerError500NFConnectRefused(w)
|
||||
return
|
||||
}
|
||||
|
||||
respMsg := make(map[string]interface{})
|
||||
switch response.StatusCode() {
|
||||
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
|
||||
log.Debug("response:", response)
|
||||
// measurement := new(dborm.NorthboundPm)
|
||||
measurement := new(dborm.NorthboundPm)
|
||||
_ = json.Unmarshal(response.Body(), &measurement)
|
||||
log.Debug("measurement:", measurement)
|
||||
|
||||
session := dborm.DbClient.XEngine.NewSession()
|
||||
defer session.Close()
|
||||
|
||||
layout := time.RFC3339
|
||||
measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
|
||||
measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
|
||||
affected, err := session.Table("northbound_pm").Insert(measurement)
|
||||
if err != nil && affected <= 0 {
|
||||
log.Error("Failed to insert northbound_pm:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
return
|
||||
}
|
||||
default:
|
||||
log.Debug("response body:", string(response.Body()))
|
||||
body := new(map[string]interface{})
|
||||
_ = json.Unmarshal(response.Body(), &body)
|
||||
respMsg["error"] = body
|
||||
}
|
||||
|
||||
services.ResponseWithJson(w, response.StatusCode(), respMsg)
|
||||
}
|
||||
Reference in New Issue
Block a user