Files
be.ems/features/pm/performance.go

1100 lines
37 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package pm
import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"strconv"
"strings"
"time"
"be.ems/features/pm/kpi_c_report"
"be.ems/features/pm/kpi_c_title"
"be.ems/lib/config"
"be.ems/lib/dborm"
evaluate "be.ems/lib/eval"
"be.ems/lib/global"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/utils/parse"
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
"github.com/go-resty/resty/v2"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/mux"
"xorm.io/xorm"
)
// Response wraps an arbitrary payload so that it is serialized under the
// "data" key of a JSON object.
type Response struct {
	Data any `json:"data"`
}
// KpiReport is the JSON document decoded from the request body by
// PostKPIReportFromNF. Field names on the wire are given by the json tags.
type KpiReport struct {
// Timestamp is decoded from the "TimeStamp" key.
Timestamp string `json:"TimeStamp"`
// Task carries the reporting period and the reporting NE with its KPI samples.
Task struct {
// Period is the measured interval; PostKPIReportFromNF parses both
// ends with the time.RFC3339Nano layout.
Period struct {
StartTime string `json:"StartTime"`
EndTime string `json:"EndTime"`
} `json:"Period"`
// NE identifies the reporting network element and lists its KPI values.
NE struct {
NEName string `json:"NEName"`
RmUID string `json:"rmUID"`
NeType string `json:"NeType"`
// KPIs holds one entry per reported KPI id.
KPIs []struct {
KPIID string `json:"KPIID"`
Value int64 `json:"Value"`
Err string `json:"Err"`
} `json:"KPIs"`
} `json:"NE"`
} `json:"Task"`
}
// GoldKpi is a flat record holding a single KPI value together with the NE
// and period it belongs to. It is not referenced by the handlers visible in
// this part of the file.
type GoldKpi struct {
// Id int `json:"-" xorm:"pk 'id' autoincr"`
Date string `json:"date" xorm:"date"`
Index int `json:"index"`
Granularity int8 `json:"granularity"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
NEName string `json:"neName" xorm:"ne_name"`
RmUid string `json:"rmUid" xorm:"rm_uid"`
NEType string `json:"neType" xorm:"ne_type"`
KpiId string `json:"kpiId" xorm:"kpi_id"`
Value int64 `json:"value"`
Error string `json:"error"`
Timestamp string `json:"timestamp"`
}
// KpiData is the row PostKPIReportFromNF inserts into the per-NE-type table
// "kpi_report_<netype>": one row per received report, with every KPI value of
// that report stored in the JSON column kpi_values.
type KpiData struct {
ID int `json:"id" xorm:"pk 'id' '<-' autoincr"`
NEType string `json:"neType" xorm:"ne_type"`
NEName string `json:"neName" xorm:"ne_name"`
RmUid string `json:"rmUid" xorm:"rm_uid"`
// Date is filled with the formatted period start time by PostKPIReportFromNF.
Date string `json:"date" xorm:"date"`
StartTime string `json:"startTime" xorm:"start_time"`
EndTime string `json:"endTime" xorm:"end_time"`
// Index is taken from the {index} path variable of the request.
Index int `json:"index" xorm:"index"`
// Granularity is the period length in seconds when it fits an int8, else 60.
Granularity int8 `json:"granularity" xorm:"granularity"`
KPIValues []KPIVal `json:"kpiValues" xorm:"json 'kpi_values'"`
//CreatedAt int64 `json:"createdAt" xorm:"created 'created_at'"`
// CreatedAt is set explicitly by the handler to time.Now().UnixMilli().
CreatedAt int64 `json:"createdAt" xorm:"'created_at'"`
// TenantID is looked up from sys_tenant by the handler; empty when not found.
TenantID string `json:"tenantId" xorm:"tenant_id"`
}
// KPIVal is one KPI sample (id, value, error text) stored inside
// KpiData.KPIValues.
type KPIVal struct {
KPIID string `json:"kpi_id" xorm:"kpi_id"`
Value int64 `json:"value" xorm:"value"`
Err string `json:"err" xorm:"err"`
}
// URI templates of the performance-management REST endpoints. Each template
// exists twice: once under config.DefaultUriPrefix and once under
// config.UriPrefix ("Custom..." names). The {name} segments are path variables
// that the handlers read through mux.Vars; the "...Fmt" variants are
// fmt.Sprintf formats whose %s takes the element type.
// NOTE(review): route registration is not visible in this file — confirm
// which handler is bound to which template.
var (
// performance management (default URI prefix)
PerformanceUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
MeasureTaskUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
MeasureReportUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
MeasureReportFmt = config.DefaultUriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
MeasurementUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
UriMeasureTask = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
// performance management (custom URI prefix)
CustomPerformanceUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
CustomMeasureTaskUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureTask"
CustomMeasureReportUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measureReport"
CustomMeasureReportFmt = config.UriPrefix + "/performanceManagement/v1/elementType/%s/objectType/measureReport"
CustomMeasurementUri = config.UriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/measurement/{index}"
CustomUriMeasureTask = config.UriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
)
// xEngine is the package-level xorm engine used by the handlers in this file.
// It is assigned by InitDbClient and by XormConnectDatabase.
var xEngine *xorm.Engine
// DatabaseClient bundles the connection settings and the xorm engine created
// from them by InitDbClient.
type DatabaseClient struct {
dbType string
// dbUrl is the DSN handed to xorm.NewEngine; it contains the password.
dbUrl string
dbConnMaxLifetime time.Duration
dbMaxIdleConns int
dbMaxOpenConns int
// IsShowSQL turns on xorm SQL logging; InitDbClient sets it at trace log level.
IsShowSQL bool
XEngine *xorm.Engine
}
// DbClient is the package-level client populated by InitDbClient.
var DbClient DatabaseClient
// InitDbClient builds the DSN "user:password@tcp(host:port)/name?param",
// creates the xorm engine for it and stores the result in both DbClient and
// the package-level xEngine.
//
// Pool limits and connection lifetime are set to 0. SQL logging is enabled
// when the log level is LOG_TRACE. The engine's database and session time
// zones are both set to time.Local.
//
// It returns the error from xorm.NewEngine, if any; in that case xEngine is
// left untouched.
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName, dbParam string) error {
	DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s",
		dbUser, dbPassword, dbHost, dbPort, dbName, dbParam)
	DbClient.dbType = dbType
	DbClient.dbConnMaxLifetime = 0
	DbClient.dbMaxIdleConns = 0
	DbClient.dbMaxOpenConns = 0
	if log.GetLevel() == log.LOG_TRACE {
		DbClient.IsShowSQL = true
	}
	// Never write the password to the log: print the DSN with the
	// credential masked, the same way XormConnectDatabase does.
	log.Debugf("dbType:%s dbUrl:%s:******@tcp(%s:%s)/%s?%s:",
		dbType, dbUser, dbHost, dbPort, dbName, dbParam)

	var err error
	DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl)
	if err != nil {
		log.Error("Failed to connet database:", err)
		return err
	}
	DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime)
	DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns)
	DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns)
	DbClient.XEngine.DatabaseTZ = time.Local // required
	DbClient.XEngine.TZLocation = time.Local // required
	if DbClient.IsShowSQL {
		DbClient.XEngine.ShowSQL(true)
	}
	xEngine = DbClient.XEngine

	// exist, err := xEngine.IsTableExist("kpi_report")
	// if err != nil {
	// 	log.Error("Failed to IsTableExist:", err)
	// 	return err
	// }
	// if exist {
	// 	// copy the table structure into a new table
	// 	sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` AS SELECT * FROM kpi_report WHERE 1=0", "kpi_report_amf")
	// 	_, err := xEngine.Exec(sql)
	// 	if err != nil {
	// 		log.Error("Failed to Exec:", err)
	// 		return err
	// 	}
	// }
	return nil
}
// XormConnectDatabase opens an xorm engine for the given database using a
// fixed parameter set (utf8mb4, parseTime, local time zone) and stores it in
// the package-level xEngine, which is also returned.
//
// The debug log shows the connection string with the password masked. SQL
// logging is switched on when the log level is LOG_TRACE. On failure the
// error from xorm.NewEngine is returned together with a nil engine.
func XormConnectDatabase(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) (*xorm.Engine, error) {
	dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
		dbUser, dbPassword, dbHost, dbPort, dbName)
	log.Debugf("dbType:%s Connect to:%s:******@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
		dbType, dbUser, dbHost, dbPort, dbName)

	engine, connErr := xorm.NewEngine(dbType, dsn)
	// The package-level engine is replaced even when creation failed,
	// exactly as a direct assignment from NewEngine would do.
	xEngine = engine
	if connErr != nil {
		log.Error("Failed to connect database:", connErr)
		return nil, connErr
	}

	if traceEnabled := log.GetLevel() == log.LOG_TRACE; traceEnabled {
		xEngine.ShowSQL(true)
	}
	return xEngine, nil
}
// GetDateFromTimeString parses timeString with the layout fmtString (using
// the local time zone when the string carries no offset) and returns the date
// part formatted as "2006-01-02".
//
// A parse failure is not reported: the zero time is formatted instead, which
// yields "0001-01-01".
func GetDateFromTimeString(fmtString string, timeString string) string {
	const dateLayout = "2006-01-02"

	parsed, _ := time.ParseInLocation(fmtString, timeString, time.Local)
	return parsed.Format(dateLayout)
}
// GetDateTimeFromTimeString parses timeString with the layout fmtString
// (using the local time zone when the string carries no offset) and returns
// it re-formatted with the global.DateTime layout.
//
// A parse failure is not reported: the zero time is formatted instead.
func GetDateTimeFromTimeString(fmtString string, timeString string) string {
	parsed, _ := time.ParseInLocation(fmtString, timeString, time.Local)
	formatted := parsed.Format(global.DateTime)
	return formatted
}
// PostKPIReportFromNF processes the KPI report an NF POSTs to the kpiReport
// endpoint.
//
// Steps:
//  1. Validate the {apiVersion} path variable and decode the JSON body into a
//     KpiReport.
//  2. Insert the raw KPI values as one KpiData row into the table
//     "kpi_report_<lower-cased NeType>".
//  3. Evaluate every active custom-KPI expression of the NE type against the
//     raw values and store the results through kpi_c_report.InsertKpiCReport.
//  4. When the NE is known, push the raw and the custom KPI events to the
//     websocket subscription groups; for UPF also update today's traffic.
//
// Replies: 204 on success; 404 for a wrong API version or an unreadable body;
// 400 when the body is not valid JSON; 500 when the kpi_report insert fails.
func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostKPIReportFromNF processing... ")

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Faile to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	kpiReport := new(KpiReport)
	if err := json.Unmarshal(body, kpiReport); err != nil {
		// A field with an unexpected JSON type leaves the rest of the
		// report decoded, so keep processing it as before. Anything else
		// means the body is not usable JSON and must be rejected instead
		// of being stored as an empty report.
		if _, typeMismatch := err.(*json.UnmarshalTypeError); !typeMismatch {
			log.Error("Failed to decode KPI report:", err)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		log.Error("KPI report decoded with mismatching field type:", err)
	}

	layout := time.RFC3339Nano
	// Best effort: a missing or non-numeric {index} is stored as 0.
	kpiIndex, _ := strconv.Atoi(vars["index"])
	startTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime)
	endTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime)

	// Derive the granularity from the period length; fall back to 60 when
	// the duration is unknown (0) or does not fit the int8 column.
	seconds, _ := global.GetSecondDuration(startTime, endTime)
	var granularity int8 = 60
	if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 {
		granularity = int8(seconds)
	}

	// Row for the kpi_report_<netype> table.
	kpiData := new(KpiData)
	kpiData.Date = startTime
	kpiData.Index = kpiIndex
	kpiData.StartTime = startTime
	kpiData.EndTime = endTime
	kpiData.Granularity = granularity
	kpiData.NEName = kpiReport.Task.NE.NEName
	kpiData.NEType = kpiReport.Task.NE.NeType
	kpiData.RmUid = kpiReport.Task.NE.RmUID
	kpiData.CreatedAt = time.Now().UnixMilli()

	// Golden KPI event object pushed to websocket subscribers; one extra
	// key per KPI id is added in the loop below.
	kpiEvent := map[string]any{
		"neType":     kpiReport.Task.NE.NeType,
		"neName":     kpiReport.Task.NE.NEName,
		"rmUID":      kpiReport.Task.NE.RmUID,
		"startIndex": kpiIndex,
		"timeGroup":  kpiData.CreatedAt,
	}

	// Variables available to the custom KPI expressions.
	kpiValMap := map[string]any{}
	for _, k := range kpiReport.Task.NE.KPIs {
		kpiEvent[k.KPIID] = k.Value
		kpiData.KPIValues = append(kpiData.KPIValues, KPIVal{
			KPIID: k.KPIID,
			Value: k.Value,
			Err:   k.Err,
		})
		kpiValMap[k.KPIID] = k.Value
	}
	kpiValMap["granularity"] = kpiData.Granularity

	// Resolve the tenant, if any. rmUID comes from the request body, so
	// escape it before it is spliced into the WHERE clause.
	escapedRmUID := strings.NewReplacer(`\`, `\\`, `'`, `''`).Replace(kpiData.RmUid)
	where := fmt.Sprintf("status='1' and tenancy_type='UPF' and tenancy_key='%s'", escapedRmUID)
	// A failed lookup simply leaves the tenant id empty.
	kpiData.TenantID, _ = dborm.XormGetSingleColStringByWhere("sys_tenant", "parent_id", where)

	// Insert into the per-NE-type table, without a session.
	// NOTE(review): the table name is derived from the request-supplied
	// NeType; consider validating it against the known NE types.
	tableName := "kpi_report_" + strings.ToLower(kpiReport.Task.NE.NeType)
	if _, err := xEngine.Table(tableName).Insert(kpiData); err != nil {
		log.Errorf("Failed to insert %s:%v", tableName, err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}

	report := kpi_c_report.KpiCReport{
		NeType:      &kpiData.NEType,
		NeName:      &kpiData.NEName,
		RmUID:       &kpiData.RmUid,
		Date:        kpiData.Date,
		StartTime:   &kpiData.StartTime,
		EndTime:     &kpiData.EndTime,
		Index:       int16(kpiData.Index),
		Granularity: &kpiData.Granularity,
		TenantID:    &kpiData.TenantID,
	}

	// Look up the NE the report belongs to.
	neInfo := neService.NewNeInfo.SelectNeInfoByRmuid(kpiData.RmUid)

	// Date may be shorter than 10 bytes when the start time could not be
	// formatted; slicing it blindly would panic.
	timeGroupDate := kpiData.Date
	if len(timeGroupDate) > 10 {
		timeGroupDate = timeGroupDate[:10]
	}

	// Custom KPI event reported to the front end; one extra key per custom
	// KPI id is added in the loop below.
	kpiCEvent := map[string]any{
		"neType":      kpiData.NEType,
		"neId":        neInfo.NeId,
		"neName":      kpiData.NEName,
		"rmUID":       kpiData.RmUid,
		"startIndex":  kpiData.Index,
		"timeGroup":   timeGroupDate + " " + kpiData.EndTime,
		"createdAt":   kpiData.CreatedAt,
		"granularity": kpiData.Granularity,
		"tenantID":    kpiData.TenantID,
	}

	kpiCList := kpi_c_title.GetActiveKPICList(kpiData.NEType)
	for _, k := range kpiCList {
		// A definition without id or expression cannot be evaluated.
		if k.KpiID == nil || k.Expression == nil {
			continue
		}
		kpiCVal := kpi_c_report.KpiCVal{KPIID: *k.KpiID}
		result, err := evaluate.CalcExpr(*k.Expression, kpiValMap)
		if err != nil {
			kpiCVal.Value = 0.0
			kpiCVal.Err = err.Error()
		} else {
			// Percentages are capped at 100.
			if k.Unit != nil && *k.Unit == "%" && result > 100 {
				result = 100
			}
			kpiCVal.Value = result
		}
		report.KpiValues = append(report.KpiValues, kpiCVal)
		kpiCEvent[kpiCVal.KPIID] = kpiCVal.Value
	}

	// Store the custom KPI values.
	kpi_c_report.InsertKpiCReport(kpiData.NEType, report)

	if neInfo.RmUID == kpiData.RmUid {
		// Push the raw KPI event to the ws subscription group.
		wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent)
		// Push the custom KPI event to the ws subscription group.
		wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent)
		if neInfo.NeType == "UPF" {
			wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+"_"+neInfo.NeId, kpiEvent)
			// Group id 12_RMUID, e.g. 12_4400HXUPF001, for multi-tenancy.
			wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+"_"+kpiReport.Task.NE.RmUID, kpiEvent)
			// Update the UPF total traffic.
			upValue := parse.Number(kpiEvent["UPF.03"])
			downValue := parse.Number(kpiEvent["UPF.06"])
			neDataService.NewPerfKPI.UPFTodayFlowUpdate(neInfo.RmUID, upValue, downValue)
		}
	}

	services.ResponseStatusOK204NoContent(w)
}
// MeasureTask is the JSON payload PostMeasureTaskToNF sends to an NF: the
// task definitions plus the URL the NF should post its measure reports to.
type MeasureTask struct {
Tasks []Task `json:"Tasks"`
NotifyUrl string `json:"NotifyUrl"` /* e.g. "http://<ip>:<port>/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport" */
}
// Task describes one measurement task inside a MeasureTask payload.
// PostMeasureTaskToNF fills it from the dborm measure-task record.
type Task struct {
Id int `json:"Id"`
StartTime string `json:"StartTime"`
EndTime string `json:"EndTime"`
Schedule struct {
Type string `json:"Type"` // schedule type Weekly/Monthly; when Type is "", the task collects between StartTime and EndTime, otherwise it follows the schedule
Days []int `json:"Days"` // Weekly: [0,1,...,5,6] with Sunday as 0, Monthly: [1,2,3,...,30,31]
Periods []dborm.Period `json:"Periods"`
/*
Periods []struct {
Start string `json:"Start"` // midnight, or midnight plus an integer multiple of the measurement granularity
End string `json:"End"` // midnight plus an integer multiple of the measurement granularity
} `json:"Periods"`
*/
} `json:"Schedule"`
GranulOption string `json:"GranulOption"` // measurement granularity option: 15M/30M/60M/24H
KPISet []dborm.KpiSetJ `json:"KPISet"`
/*
KPISet []struct {
Code string `json:"Code"` // statistics code, e.g. SMFHA01
KPIs []string `json:"KPIs"` // set of KPI items, e.g. ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"]
} `json:"KPISet"`
*/
}
// MeasureReport is the JSON document decoded from the request body by
// PostMeasureReportFromNF. Id is used there as the measure task id, and
// TimeStamp is parsed as a base-10 Unix time in seconds.
type MeasureReport struct {
Id int `json:"Id"`
TimeStamp string `json:"TimeStamp"`
NeName string `json:"NeName"`
RmUID string `json:"rmUID"`
NeType string `json:"NeType"`
Report struct {
Period struct {
StartTime string `json:"StartTime"`
EndTime string `json:"EndTime"`
} `json:"Period"`
Datas []struct {
Code string `json:"Code"` // statistics code, e.g. SMFHA01
KPIs []struct {
KPIID string `json:"KPIID"` // KPI item, e.g. SMF.AttCreatePduSession._Dnn
KPIValues []struct {
Name string `json:"Name"` // "Total" for a single value; when the KPI has several measured items (e.g. per DNN), the item name such as "cmnet"/"ims"
Value int64 `json:"Value"`
} `json:"KPIValues"`
} `json:"KPIs"`
} `json:"Datas"`
} `json:"Report"`
}
// MeasureData is the row PostMeasureReportFromNF inserts for every KPI value
// of a measure report (one row per code / KPI id / value name).
type MeasureData struct {
// Id int `json:"id" xorm:"pk 'id' autoincr"`
// Id is excluded from the xorm mapping (tag "-").
Id int `json:"id" xorm:"-"`
Date string `json:"date" xorm:"date"`
TaskId int `json:"taskId"`
NeType string `json:"neType" xorm:"ne_type"`
NeName string `json:"neName" xorm:"ne_name"`
RmUid string `json:"rmUid" xorm:"rm_uid"`
// GranulOption is read from measure_task.granul_option by the handler.
GranulOption string `json:"granulOption" xorm:"granul_option"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
KpiCode string `json:"kpiCode" xorm:"kpi_code"`
KpiId string `json:"kpiId" xorm:"kpi_id"`
// KpiExt receives the Name of the KPI value.
KpiExt string `json:"kpiExt" xorm:"kpi_ext"`
Value int64 `json:"value"`
Timestamp string `json:"timestamp"`
}
// PostMeasureReportFromNF processes a measure report POSTed by an NF.
//
// The report is flattened into MeasureData rows: one row per KPI value, or a
// single row with value 0 and an empty KpiExt when a KPI carries no values.
// The granularity option is read from the measure_task row whose id equals
// the report's Id.
//
// Replies: 204 on success; 404 for a wrong API version or an unreadable body;
// 400 when the body is not valid JSON; 500 as soon as one insert fails (rows
// inserted before the failure are not rolled back).
func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasureReportFromNF processing... ")

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Faile to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	log.Debug("Request body:", string(body))

	measureReport := new(MeasureReport)
	if err := json.Unmarshal(body, measureReport); err != nil {
		// A field with an unexpected JSON type still leaves the rest of
		// the report decoded, so keep processing it as before; any other
		// error means there is nothing usable to store.
		if _, typeMismatch := err.(*json.UnmarshalTypeError); !typeMismatch {
			log.Error("Failed to decode measure report:", err)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		log.Error("Measure report decoded with mismatching field type:", err)
	}
	log.Debug("measureReport:", measureReport)

	session := xEngine.NewSession()
	defer session.Close()

	layout := global.DateTime
	measureData := new(MeasureData)
	measureData.Date = GetDateFromTimeString(layout, measureReport.Report.Period.StartTime)
	measureData.TaskId = measureReport.Id
	measureData.StartTime = measureReport.Report.Period.StartTime
	measureData.EndTime = measureReport.Report.Period.EndTime
	measureData.NeType = measureReport.NeType
	measureData.NeName = measureReport.NeName
	measureData.RmUid = measureReport.RmUID
	// Best effort: an unknown task leaves the granularity option empty.
	measureData.GranulOption, _ = dborm.XormGetSingleCol("measure_task", "granul_option", fmt.Sprintf("id=%d", measureReport.Id))
	// Best effort: an unparsable TimeStamp is stored as the Unix epoch.
	t, _ := strconv.ParseInt(measureReport.TimeStamp, 10, 64)
	timestamp := time.Unix(t, 0)
	log.Debug("timestamp:", timestamp.Format(layout))
	measureData.Timestamp = timestamp.Format(layout)

	// insertRow stores the current content of measureData; on failure it
	// writes the 500 reply and returns false.
	insertRow := func() bool {
		log.Debug("measureData:", measureData)
		if _, err := session.Insert(measureData); err != nil {
			log.Error("Failed to insert measure_data:", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return false
		}
		return true
	}

	log.Debug("Datas:", measureReport.Report.Datas)
	for _, d := range measureReport.Report.Datas {
		measureData.KpiCode = d.Code
		log.Debug("KPIs:", d.KPIs)
		for _, k := range d.KPIs {
			measureData.KpiId = k.KPIID
			log.Debug("KPIValues:", k.KPIValues)
			if len(k.KPIValues) == 0 {
				// No values reported: store a zero row, and clear
				// KpiExt so it does not carry the name of a value
				// that belongs to a previous KPI.
				measureData.KpiExt = ""
				measureData.Value = 0
				if !insertRow() {
					return
				}
				continue
			}
			for _, v := range k.KPIValues {
				measureData.KpiExt = v.Name
				measureData.Value = v.Value
				if !insertRow() {
					return
				}
			}
		}
	}

	services.ResponseStatusOK204NoContent(w)
}
// PostMeasureTaskToNF activates the measure tasks named by the "id" query
// parameters on the NFs of type {elementTypeValue}.
//
// For every task id the task is loaded from the database and turned into a
// MeasureTask payload. That payload is sent to every target NE (the task's
// NeIds, or all NEs of the type when the task lists none): POST when the task
// is inactive, PUT when it is suspended. After each 2xx answer the task is
// marked active in measure_task.
//
// Replies: 204 when everything succeeded; 400 for a non-numeric id; 500 for
// database, lookup or connection failures and for a task that is neither
// inactive nor suspended; otherwise the first non-2xx NF answer is relayed.
func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasureTaskToNF processing... ")

	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)

	var response *resty.Response
	client := resty.New()
	for _, taskId := range taskIds {
		id, err := strconv.Atoi(taskId)
		if err != nil {
			log.Error("Invalid measure task id:", taskId)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		task, err := dborm.GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Table Task:", task)

		// Build a fresh payload for every task so that optional fields
		// (schedule type/days) never leak over from the previous task.
		measureTask := new(MeasureTask)
		measureTask.Tasks = make([]Task, 1)
		payload := &measureTask.Tasks[0]
		payload.Id = task.Id
		payload.StartTime = task.StartTime
		payload.EndTime = task.EndTime
		if len(task.Schedule) >= 1 {
			payload.Schedule.Type = task.Schedule[0].Type
			payload.Schedule.Days = task.Schedule[0].Days
		}
		payload.Schedule.Periods = task.Periods
		payload.GranulOption = task.GranulOption
		payload.KPISet = task.KpiSet

		ips, err := global.GetIps()
		if err != nil {
			log.Error("Failed to get local IP:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		if len(ips) == 0 {
			// Without a local address no notify URL can be built.
			err := fmt.Errorf("no local IP address available for the notify url")
			log.Error("Failed to get local IP:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		log.Debug("ips:", ips)

		measureTask.NotifyUrl = global.SetNotifyUrl(ips[0], config.GetYamlConfig().Rest[0].Port, fmt.Sprintf(MeasureReportFmt, neType))
		log.Debug("Measure Task to NF:", measureTask)

		// No explicit targets: address every NE of the requested type.
		if len(task.NeIds) == 0 {
			var neInfos []dborm.NeInfo
			err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			for _, neInfo := range neInfos {
				task.NeIds = append(task.NeIds, neInfo.NeId)
			}
		}

		for _, neId := range task.NeIds {
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfo:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			if neInfo == nil {
				err := fmt.Errorf("not found target NE neType=%s, neId=%s", neType, neId)
				log.Error(err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}

			// The request URI is forwarded unchanged to the NF.
			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: POST ", requestURI2NF)

			switch task.Status {
			case dborm.MeasureTaskStatusInactive:
				body, err := json.Marshal(measureTask)
				if err != nil {
					log.Error("Failed to json.Marshal measure task:", err)
					services.ResponseInternalServerError500ProcessError(w, err)
					return
				}
				log.Debug("body: ", string(body))
				log.Debug("User-Agent: ", config.GetDefaultUserAgent())
				response, err = client.R().
					EnableTrace().
					SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
					SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
					SetBody(body).
					SetContentLength(true).
					Post(requestURI2NF)
				if err != nil {
					log.Error("Post to NF failed:", err)
					services.ResponseInternalServerError500NFConnectRefused(w)
					return
				}
				log.Debug("response info: ")
				log.Debug("Status Code:", response.StatusCode())
				log.Debug("Status:", response.Status())
				log.Debug("Proto:", response.Proto())
				log.Debug("Time:", response.Time())
				log.Debug("Received At:", response.ReceivedAt())
				log.Debug("Size:", response.Size())
			case dborm.MeasureTaskStatusSuspend:
				body, err := json.Marshal(measureTask)
				if err != nil {
					log.Error("Failed to json.Marshal measure task:", err)
					services.ResponseInternalServerError500ProcessError(w, err)
					return
				}
				log.Debug("body: ", string(body))
				response, err = client.R().
					EnableTrace().
					SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
					SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
					SetBody(body).
					SetContentLength(true).
					Put(requestURI2NF)
				if err != nil {
					log.Error("Put to NF failed:", err)
					services.ResponseInternalServerError500NFConnectRefused(w)
					return
				}
			default:
				err = fmt.Errorf("measure task status must be inactive id=%d", id)
				log.Error("Unable to active measure task:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}

			// response is non-nil here: both request branches return
			// early on error.
			log.Debug("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				taskInfo := new(dborm.MeasureTask)
				taskInfo.Status = dborm.MeasureTaskStatusActive
				taskInfo.CreateTime = time.Now().Format(time.DateTime)
				affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
				if err != nil {
					log.Error("dborm.XormUpdateTableById is failed:", err)
					services.ResponseInternalServerError500DatabaseOperationFailed(w)
					return
				}
				if affected <= 0 {
					log.Info("Not record affected in measure_task")
				}
			default:
				// Relay the NF's own failure answer to the caller.
				log.Error("NF return failure to active measure task")
				log.Info("response body:", string(response.Body()))
				services.TransportResponse(w, response.StatusCode(), response.Body())
				return
			}
		}
	}

	services.ResponseStatusOK204NoContent(w)
}
// PutMeasureTaskToNF is a placeholder handler: it does not look at the
// request and only replies through services.ResponseStatusOK200Null.
func PutMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
services.ResponseStatusOK200Null(w)
}
// DeleteMeasureTaskToNF deletes the measure tasks named by the "id" query
// parameters on the NFs of type {elementTypeValue}.
//
// For every task the DELETE request is forwarded to each target NE (the
// task's NeIds, or all NEs of the type when the task lists none). The task is
// marked deleted in measure_task and 204 is returned immediately when:
//   - the task has no target NE at all,
//   - a target NE is unknown,
//   - the NF cannot be reached (so a broken NF never blocks deletion), or
//   - the NF answers with a 2xx status.
//
// A non-2xx NF answer is remembered and the next NE is tried; if no NE leads
// to a deletion, the last NF status and body are returned.
//
// Replies 400 when no id or a non-numeric id is supplied, and 500 on database
// failures.
func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("DeleteMeasureTaskToNF processing... ")

	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)

	// markDeleted flags the task as deleted in measure_task and writes the
	// final reply: 204 on success, 500 when the update fails.
	markDeleted := func(id int) {
		taskInfo := new(dborm.MeasureTask)
		taskInfo.Status = dborm.MeasureTaskStatusDeleted
		affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
		if err != nil {
			log.Error("dborm.XormUpdateTableById is failed:", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		if affected <= 0 {
			log.Info("Not record affected in measure_task")
		}
		services.ResponseStatusOK204NoContent(w)
	}

	var response *resty.Response
	respMsg := make(map[string]interface{})
	for _, taskId := range taskIds {
		id, err := strconv.Atoi(taskId)
		if err != nil {
			log.Error("Invalid measure task id:", taskId)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		task, err := dborm.GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Measure Task:", task)

		// No explicit targets: address every NE of the requested type.
		if len(task.NeIds) == 0 {
			var neInfos []dborm.NeInfo
			err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			for _, neInfo := range neInfos {
				task.NeIds = append(task.NeIds, neInfo.NeId)
			}
		}
		log.Debug("neIds:", task.NeIds)
		if len(task.NeIds) == 0 {
			log.Warn("Not found target NE in the measure task")
			markDeleted(id)
			return
		}

		for _, neId := range task.NeIds {
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("dborm.XormGetNeInfo is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			}
			if neInfo == nil {
				// Unknown NE: nothing to notify, just drop the task.
				markDeleted(id)
				return
			}

			// The request URI is forwarded unchanged to the NF.
			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: DELETE ", requestURI2NF)
			client := resty.New()
			response, err = client.R().
				EnableTrace().
				SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
				SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
				Delete(requestURI2NF)
			if err != nil {
				// Delete the task anyway so that an abnormal NF cannot
				// make it undeletable.
				log.Error("Failed to resty delete:", err)
				markDeleted(id)
				return
			}

			log.Info("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				markDeleted(id)
				return
			default:
				// Remember the NF's error body and try the next NE.
				log.Info("response body:", string(response.Body()))
				body := new(map[string]interface{})
				_ = json.Unmarshal(response.Body(), &body) // an undecodable body is reported as null
				respMsg["error"] = body
			}
		}
	}

	// response is still nil when no task id was supplied; there is no NF
	// status to relay in that case.
	if response == nil {
		log.Error("measure task id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}
	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}
// PatchMeasureTaskToNF deactivates the measure tasks named by the "id" query
// parameters on the NFs of type {elementTypeValue}.
//
// For every task the PATCH request is forwarded to each target NE (the task's
// NeIds, or all NEs of the type when the task lists none). After each 2xx
// answer the task is marked inactive in measure_task. When the task has no
// target NE, or a target NE is unknown, the task is marked inactive and 204
// is returned immediately. A non-2xx NF answer is stored under "error" in the
// reply body.
//
// The final reply carries the status code of the last NF answer. Replies 400
// when no id or a non-numeric id is supplied, and 500 on database or
// connection failures.
func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PatchMeasureTaskToNF processing... ")

	vars := mux.Vars(r)
	neType := vars["elementTypeValue"]
	params := r.URL.Query()
	taskIds := params["id"]
	log.Debug("taskIds:", taskIds)

	// markInactive flags the task as inactive in measure_task. On failure
	// it writes the 500 reply and returns false.
	markInactive := func(id int) bool {
		taskInfo := new(dborm.MeasureTask)
		taskInfo.Status = dborm.MeasureTaskStatusInactive
		affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
		if err != nil {
			log.Error("dborm.XormUpdateTableById is failed:", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return false
		}
		if affected <= 0 {
			log.Info("Not record affected in measure_task")
		}
		return true
	}

	var response *resty.Response
	respMsg := make(map[string]interface{})
	for _, taskId := range taskIds {
		id, err := strconv.Atoi(taskId)
		if err != nil {
			log.Error("Invalid measure task id:", taskId)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		task, err := dborm.GetMeasureTask(id)
		if err != nil {
			log.Error("Failed to connect database: ", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
		log.Debug("Measure Task:", task)

		// No explicit targets: address every NE of the requested type.
		if len(task.NeIds) == 0 {
			var neInfos []dborm.NeInfo
			err := dborm.XormGetNeInfoByNeType(neType, &neInfos)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			for _, neInfo := range neInfos {
				task.NeIds = append(task.NeIds, neInfo.NeId)
			}
		}
		if len(task.NeIds) == 0 {
			if markInactive(id) {
				services.ResponseStatusOK204NoContent(w)
			}
			return
		}

		for _, neId := range task.NeIds {
			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("dborm.XormGetNeInfo is failed:", err)
				services.ResponseInternalServerError500DatabaseOperationFailed(w)
				return
			}
			if neInfo == nil {
				// Unknown NE: nothing to notify, just deactivate.
				if markInactive(id) {
					services.ResponseStatusOK204NoContent(w)
				}
				return
			}

			// The request URI is forwarded unchanged to the NF.
			requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
			log.Debug("requestURI2NF: PATCH ", requestURI2NF)
			client := resty.New()
			response, err = client.R().
				EnableTrace().
				SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
				SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
				Patch(requestURI2NF)
			if err != nil {
				log.Error("Patch to NF failed:", err)
				services.ResponseInternalServerError500NFConnectRefused(w)
				return
			}

			log.Debug("StatusCode: ", response.StatusCode())
			switch response.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				if !markInactive(id) {
					return
				}
			default:
				// Remember the NF's error body and carry on.
				log.Debug("response body:", string(response.Body()))
				body := new(map[string]interface{})
				_ = json.Unmarshal(response.Body(), &body) // an undecodable body is reported as null
				respMsg["error"] = body
			}
		}
	}

	// response is still nil when no task id was supplied; there is no NF
	// status to relay in that case.
	if response == nil {
		log.Error("measure task id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}
	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}
// Measurement describes a measurement document with its JSON and xorm
// mapping. The measurement handlers visible in this file decode into
// dborm.NorthboundPm rather than into this type.
type Measurement struct {
Id int `json:"-" xorm:"pk 'id' autoincr"`
Date string `json:"-" xorm:"date"`
Index int `json:"Index"` // index of the measurement-granularity slice within one day (e.g. 15 minutes): 0~95
Timestamp string `json:"TimeStamp" xorm:"-"`
NeName string `json:"NeName"` // UserLabel
RmUID string `json:"RmUID" xorm:"rm_uid"`
NeType string `json:"NeType"` // NE type
PmVersion string `json:"PmVersion"` // performance data version
Dn string `json:"Dn"` // (???) NE identifier, e.g. RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456
Period string `json:"Period"` // measurement granularity option: 5/15/30/60
TimeZone string `json:"TimeZone"`
StartTime string `json:"StartTime"`
Datas []Data `json:"Datas"`
}
// KPIValue is one named value of a KPI inside a Measurement.
type KPIValue struct {
Name string `json:"Name"` // "Total" for a single value; when the KPI has several measured items (e.g. per DNN), the item name such as "cmnet"/"ims"
Value int64 `json:"Value"`
}
// KPI groups the values reported for one KPI id inside a Measurement.
type KPI struct {
KPIID string `json:"KPIID"` // KPI item, e.g. SMF.AttCreatePduSession._Dnn
KPIValues []KPIValue `json:"KPIValues"`
}
// Data groups the KPIs of one object type inside a Measurement.
type Data struct {
ObjectType string `json:"ObjectType"` // network resource class name; the spatial granularity in the PM KPI list, e.g. SmfFunction
KPIs []KPI `json:"KPIs"` // KPI items, e.g. SMF.AttCreatePduSession._Dnn
}
// PostMeasurementFromNF processes a measurement document POSTed by an NF and
// stores it as one row in the nbi_pm table.
//
// StartTime is expected in time.RFC3339 form; it is rewritten to the
// global.DateTime layout and its date part is stored in Date.
//
// Replies: 204 on success; 404 for a wrong API version or an unreadable body;
// 400 when the body is not valid JSON; 500 when the insert fails.
func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostMeasurementFromNF processing... ")

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	body, err := io.ReadAll(io.LimitReader(r.Body, global.RequestBodyMaxLen))
	if err != nil {
		log.Error("Faile to io.ReadAll: ", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	log.Debug("Request body:", string(body))

	measurement := new(dborm.NorthboundPm)
	if err := json.Unmarshal(body, measurement); err != nil {
		// A field with an unexpected JSON type still leaves the rest of
		// the document decoded, so keep processing it as before; any
		// other error means there is nothing usable to store.
		if _, typeMismatch := err.(*json.UnmarshalTypeError); !typeMismatch {
			log.Error("Failed to decode measurement:", err)
			services.ResponseBadRequest400WrongParamValue(w)
			return
		}
		log.Error("Measurement decoded with mismatching field type:", err)
	}
	log.Debug("measurement:", measurement)

	session := dborm.DbClient.XEngine.NewSession()
	defer session.Close()

	layout := time.RFC3339
	measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
	measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
	if _, err := session.Table("nbi_pm").Insert(measurement); err != nil {
		log.Error("Failed to insert nbi_pm:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return
	}

	services.ResponseStatusOK204NoContent(w)
}
// GetMeasurementFromNF fetches a measurement document from an NF and stores
// it in the nbi_pm table.
//
// The NE is selected by {elementTypeValue} and the first "ne_id" query
// parameter; the incoming request URI is forwarded unchanged to that NE with
// a GET. On a 2xx answer the body is decoded and inserted; a body that cannot
// be decoded is logged and not stored. On any other answer the NF's body is
// returned under "error".
//
// The reply carries the NF's status code. Replies 404 for a wrong API version
// or missing element type, 400 when ne_id is missing, and 500 when the NE
// lookup fails, the NE does not exist, the NF is unreachable or the insert
// fails.
func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("GetMeasurementFromNF processing... ")

	vars := mux.Vars(r)
	apiVer := vars["apiVersion"]
	if apiVer != global.ApiVersionV1 {
		log.Error("Uri api version is invalid. apiVersion:", apiVer)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	neType := vars["elementTypeValue"]
	if neType == "" {
		log.Error("elementTypeValue is null.")
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	params := r.URL.Query()
	neIds := params["ne_id"]
	if len(neIds) == 0 {
		log.Error("ne_id NOT FOUND")
		services.ResponseBadRequest400WrongParamValue(w)
		return
	}
	log.Debugf("neType: %s neId:%s", neType, neIds)

	neInfo, err := dborm.XormGetNeInfo(neType, neIds[0])
	if err != nil {
		log.Error("dborm.XormGetNeInfo is failed:", err)
		services.ResponseInternalServerError500DatabaseOperationFailed(w)
		return
	}
	if neInfo == nil {
		// The lookup can succeed without finding the NE; there is no
		// address to forward the request to in that case.
		err := fmt.Errorf("not found target NE neType=%s, neId=%s", neType, neIds[0])
		log.Error(err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}

	requestURI2NF := fmt.Sprintf("http://%s:%v%s", neInfo.Ip, neInfo.Port, r.RequestURI)
	log.Debug("requestURI2NF: GET ", requestURI2NF)

	client := resty.New()
	response, err := client.R().
		EnableTrace().
		SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
		SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
		Get(requestURI2NF)
	if err != nil {
		log.Error("Failed to Get from NF:", err)
		services.ResponseInternalServerError500NFConnectRefused(w)
		return
	}

	respMsg := make(map[string]interface{})
	switch response.StatusCode() {
	case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
		log.Debug("response:", response)
		measurement := new(dborm.NorthboundPm)
		if err := json.Unmarshal(response.Body(), measurement); err != nil {
			if _, typeMismatch := err.(*json.UnmarshalTypeError); !typeMismatch {
				// Empty or invalid body (e.g. a 204 answer): there is
				// nothing to store, only relay the NF status.
				log.Error("Failed to decode measurement from NF:", err)
				break
			}
			// A field with an unexpected JSON type still leaves the
			// rest decoded; keep storing it as before.
			log.Error("Measurement decoded with mismatching field type:", err)
		}
		log.Debug("measurement:", measurement)

		session := dborm.DbClient.XEngine.NewSession()
		defer session.Close()

		layout := time.RFC3339
		measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
		measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
		if _, err := session.Table("nbi_pm").Insert(measurement); err != nil {
			log.Error("Failed to insert nbi_pm:", err)
			services.ResponseInternalServerError500DatabaseOperationFailed(w)
			return
		}
	default:
		log.Debug("response body:", string(response.Body()))
		body := new(map[string]interface{})
		_ = json.Unmarshal(response.Body(), &body) // an undecodable body is reported as null
		respMsg["error"] = body
	}

	services.ResponseWithJson(w, response.StatusCode(), respMsg)
}