merge: 合并代码
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"math"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"be.ems/lib/dborm"
|
||||
@@ -63,6 +64,26 @@ type GoldKpi struct {
|
||||
Timestamp string `json:"timestamp"`
|
||||
}
|
||||
|
||||
type KpiData struct {
|
||||
ID int `json:"id" xorm:"pk 'id' '<-' autoincr"`
|
||||
NEType string `json:"neType" xorm:"ne_type"`
|
||||
NEName string `json:"neName" xorm:"ne_name"`
|
||||
RmUid string `json:"rmUid" xorm:"rm_uid"`
|
||||
Date string `json:"date" xorm:"date"`
|
||||
StartTime time.Time `json:"startTime" xorm:"start_time"`
|
||||
EndTime time.Time `json:"endTime" xorm:"end_time"`
|
||||
Index int `json:"index" xorm:"index"`
|
||||
Granularity int8 `json:"granularity" xorm:"granularity"`
|
||||
KPIValues []KPIVal `json:"kpiValues" xorm:"json 'kpi_values'"`
|
||||
//CreatedAt int64 `json:"createdAt" xorm:"created 'created_at'"`
|
||||
CreatedAt int64 `json:"createdAt" xorm:"'created_at'"`
|
||||
}
|
||||
type KPIVal struct {
|
||||
KPIID string `json:"kpi_id" xorm:"kpi_id"`
|
||||
Value int64 `json:"value" xorm:"value"`
|
||||
Err string `json:"err" xorm:"err"`
|
||||
}
|
||||
|
||||
var (
|
||||
// performance management
|
||||
PerformanceUri = config.DefaultUriPrefix + "/performanceManagement/{apiVersion}/elementType/{elementTypeValue}/objectType/kpiReport/{index}"
|
||||
@@ -96,9 +117,9 @@ type DatabaseClient struct {
|
||||
|
||||
var DbClient DatabaseClient
|
||||
|
||||
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) error {
|
||||
DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
|
||||
dbUser, dbPassword, dbHost, dbPort, dbName)
|
||||
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName, dbParam string) error {
|
||||
DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s",
|
||||
dbUser, dbPassword, dbHost, dbPort, dbName, dbParam)
|
||||
DbClient.dbType = dbType
|
||||
DbClient.dbConnMaxLifetime = 0
|
||||
DbClient.dbMaxIdleConns = 0
|
||||
@@ -106,8 +127,7 @@ func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) err
|
||||
if log.GetLevel() == log.LOG_TRACE {
|
||||
DbClient.IsShowSQL = true
|
||||
}
|
||||
log.Debugf("dbType:%s dbUrl:%s:******@tcp(%s:%s)/%s??charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
|
||||
dbType, dbUser, dbHost, dbPort, dbName)
|
||||
log.Debugf("dbType:%s dbUrl:%s:", dbType, DbClient.dbUrl)
|
||||
|
||||
var err error
|
||||
DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl)
|
||||
@@ -118,11 +138,28 @@ func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) err
|
||||
DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime)
|
||||
DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns)
|
||||
DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns)
|
||||
DbClient.XEngine.DatabaseTZ = time.Local // 必须
|
||||
DbClient.XEngine.TZLocation = time.Local // 必须
|
||||
if DbClient.IsShowSQL {
|
||||
DbClient.XEngine.ShowSQL(true)
|
||||
}
|
||||
xEngine = DbClient.XEngine
|
||||
|
||||
// exist, err := xEngine.IsTableExist("kpi_report")
|
||||
// if err != nil {
|
||||
// log.Error("Failed to IsTableExist:", err)
|
||||
// return err
|
||||
// }
|
||||
// if exist {
|
||||
// // 复制表结构到新表
|
||||
// sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` AS SELECT * FROM kpi_report WHERE 1=0", "kpi_report_amf")
|
||||
// _, err := xEngine.Exec(sql)
|
||||
// if err != nil {
|
||||
// log.Error("Failed to Exec:", err)
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -203,12 +240,34 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
"startIndex": goldKpi.Index,
|
||||
"timeGroup": goldKpi.StartTime,
|
||||
}
|
||||
// insert into new kpi_report_xxx table
|
||||
kpiData := new(KpiData)
|
||||
kpiData.Date = goldKpi.Date
|
||||
kpiData.Index = goldKpi.Index
|
||||
st, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.StartTime, time.Local)
|
||||
et, _ := time.ParseInLocation(time.RFC3339Nano, kpiReport.Task.Period.EndTime, time.Local)
|
||||
// kpiData.StartTime = goldKpi.StartTime
|
||||
// kpiData.EndTime = goldKpi.EndTime
|
||||
kpiData.StartTime = st
|
||||
kpiData.EndTime = et
|
||||
kpiData.Granularity = goldKpi.Granularity
|
||||
kpiData.NEName = goldKpi.NEName
|
||||
kpiData.NEType = goldKpi.NEType
|
||||
kpiData.RmUid = goldKpi.RmUid
|
||||
kpiVal := new(KPIVal)
|
||||
kpiData.CreatedAt = time.Now().UnixMilli()
|
||||
for _, k := range kpiReport.Task.NE.KPIs {
|
||||
kpiEvent[k.KPIID] = k.Value // kip_id
|
||||
goldKpi.KpiId = k.KPIID
|
||||
goldKpi.Value = k.Value
|
||||
goldKpi.Error = k.Err
|
||||
log.Trace("goldKpi:", goldKpi)
|
||||
|
||||
kpiVal.KPIID = k.KPIID
|
||||
kpiVal.Value = int64(k.Value)
|
||||
kpiVal.Err = k.Err
|
||||
kpiData.KPIValues = append(kpiData.KPIValues, *kpiVal)
|
||||
|
||||
//log.Trace("goldKpi:", goldKpi)
|
||||
|
||||
// 启动事务
|
||||
err := session.Begin()
|
||||
@@ -242,10 +301,22 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// insert kpi_report table, no session
|
||||
tableName := "kpi_report_" + strings.ToLower(kpiReport.Task.NE.NeType)
|
||||
affected, err := xEngine.Table(tableName).Insert(kpiData)
|
||||
if err != nil && affected <= 0 {
|
||||
log.Errorf("Failed to insert %s:%v", tableName, err)
|
||||
services.ResponseInternalServerError500ProcessError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 推送到ws订阅组
|
||||
wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI, kpiEvent)
|
||||
if goldKpi.NEType == "UPF" {
|
||||
wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI_UPF, kpiEvent)
|
||||
}
|
||||
|
||||
services.ResponseStatusOK200Null(w)
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
}
|
||||
|
||||
type MeasureTask struct {
|
||||
@@ -409,11 +480,11 @@ func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("PostMeasureTaskToNF processing... ")
|
||||
|
||||
_, err := services.CheckFrontValidRequest(w, r)
|
||||
if err != nil {
|
||||
log.Error("Request error:", err)
|
||||
return
|
||||
}
|
||||
// _, err := services.CheckFrontValidRequest(w, r)
|
||||
// if err != nil {
|
||||
// log.Error("Request error:", err)
|
||||
// return
|
||||
// }
|
||||
|
||||
vars := mux.Vars(r)
|
||||
neType := vars["elementTypeValue"]
|
||||
@@ -585,11 +656,11 @@ func PutMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||||
func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("DeleteMeasureTaskToNF processing... ")
|
||||
|
||||
_, err := services.CheckFrontValidRequest(w, r)
|
||||
if err != nil {
|
||||
log.Error("Request error:", err)
|
||||
return
|
||||
}
|
||||
// _, err := services.CheckFrontValidRequest(w, r)
|
||||
// if err != nil {
|
||||
// log.Error("Request error:", err)
|
||||
// return
|
||||
// }
|
||||
|
||||
vars := mux.Vars(r)
|
||||
neType := vars["elementTypeValue"]
|
||||
@@ -716,11 +787,11 @@ func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||||
func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("PatchMeasureTaskToNF processing... ")
|
||||
|
||||
_, err := services.CheckFrontValidRequest(w, r)
|
||||
if err != nil {
|
||||
log.Error("Request error:", err)
|
||||
return
|
||||
}
|
||||
// _, err := services.CheckFrontValidRequest(w, r)
|
||||
// if err != nil {
|
||||
// log.Error("Request error:", err)
|
||||
// return
|
||||
// }
|
||||
|
||||
vars := mux.Vars(r)
|
||||
neType := vars["elementTypeValue"]
|
||||
@@ -912,11 +983,11 @@ func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug("GetMeasurementFromNF processing... ")
|
||||
|
||||
_, err := services.CheckFrontValidRequest(w, r)
|
||||
if err != nil {
|
||||
log.Error("Request error:", err)
|
||||
return
|
||||
}
|
||||
// _, err := services.CheckFrontValidRequest(w, r)
|
||||
// if err != nil {
|
||||
// log.Error("Request error:", err)
|
||||
// return
|
||||
// }
|
||||
|
||||
vars := mux.Vars(r)
|
||||
apiVer := vars["apiVersion"]
|
||||
@@ -944,7 +1015,7 @@ func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
//var neInfo *dborm.NeInfo
|
||||
neInfo := new(dborm.NeInfo)
|
||||
|
||||
neInfo, err = dborm.XormGetNeInfo(neType, neIds[0])
|
||||
neInfo, err := dborm.XormGetNeInfo(neType, neIds[0])
|
||||
if err != nil {
|
||||
log.Error("dborm.XormGetNeInfo is failed:", err)
|
||||
services.ResponseInternalServerError500DatabaseOperationFailed(w)
|
||||
|
||||
Reference in New Issue
Block a user