refactor: 重构更新多个文件中的相关调用

This commit is contained in:
TsMask
2025-02-20 10:11:40 +08:00
parent f3c33b31ac
commit 5b9bcd6660
34 changed files with 1243 additions and 1894 deletions

View File

@@ -1,15 +1,17 @@
package cdr
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"be.ems/lib/config"
"be.ems/lib/core/ctx"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/database/db"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
)
@@ -22,55 +24,78 @@ var (
CustomUriCDRFile = config.UriPrefix + "/cdrManagement/v1/elementType/{elementTypeValue}/objectType/cdrFile"
)
// CDREvent CDR数据表格结构体
type CDREvent struct {
NeType string `json:"neType" xorm:"ne_type"`
NeName string `json:"neName" xorm:"ne_name"`
RmUID string `json:"rmUID" xorm:"rm_uid"`
Timestamp int `json:"timestamp" xorm:"timestamp"`
CDR map[string]any `json:"CDR" xorm:"cdr_json"`
}
// PostCDREventFrom 接收CDR数据请求
func PostCDREventFrom(w http.ResponseWriter, r *http.Request) {
log.Info("PostCDREventFrom processing... ")
neType := ctx.GetParam(r, "elementTypeValue")
var cdrEvent CDREvent
if err := ctx.ShouldBindJSON(r, &cdrEvent); err != nil {
var body struct {
NeType string `json:"neType" `
NeName string `json:"neName" `
RmUID string `json:"rmUID" `
Timestamp int `json:"timestamp" `
CDR map[string]any `json:"CDR" `
}
if err := ctx.ShouldBindJSON(r, &body); err != nil {
services.ResponseInternalServerError500ProcessError(w, err)
return
}
neTypeLower := strings.ToLower(cdrEvent.NeType)
neTypeLower := strings.ToLower(body.NeType)
if neType == "" || neType != neTypeLower {
services.ResponseInternalServerError500ProcessError(w, fmt.Errorf("inconsistent network element types"))
return
}
// 是否存在网元
neInfo := neService.NewNeInfo.FindByRmuid(body.RmUID)
if neInfo.NeType != body.NeType || neInfo.RmUID != body.RmUID {
services.ResponseInternalServerError500ProcessError(w, fmt.Errorf("network element does not exist"))
return
}
cdrByte, err := json.Marshal(body.CDR)
if err != nil {
services.ResponseInternalServerError500ProcessError(w, err)
return
}
// 执行插入表
type CDREvent struct {
ID int64 `json:"-" gorm:"column:id;primaryKey;autoIncrement"`
NeType string `json:"neType" gorm:"column:ne_type"`
NeName string `json:"neName" gorm:"column:ne_name"`
RmUid string `json:"rmUid" gorm:"column:rm_uid"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` // 接收到的timestamp秒级存储毫秒时间戳
CdrJson string `json:"cdrJSON" gorm:"column:cdr_json"` // data JSON String
CreatedAt int64 `json:"-" gorm:"column:created_at"` // 记录创建存储毫秒
}
data := CDREvent{
NeType: body.NeType,
NeName: body.NeName,
RmUid: body.RmUID,
Timestamp: int64(body.Timestamp) * 1000,
CdrJson: string(cdrByte),
CreatedAt: time.Now().UnixMilli(),
}
tableName := fmt.Sprintf("cdr_event_%s", neTypeLower)
affected, err := dborm.XormInsertTableOne(tableName, cdrEvent)
if err != nil && affected <= 0 {
if err := db.DB("").Table(tableName).Create(&data).Error; err != nil {
log.Error("Failed to insert "+tableName, err)
services.ResponseInternalServerError500ProcessError(w, err)
return
}
// 送到匹配的网元
neInfo := neService.NewNeInfo.SelectNeInfoByRmuid(cdrEvent.RmUID)
if neInfo.RmUID == cdrEvent.RmUID {
// 推送到ws订阅组
switch neInfo.NeType {
case "IMS":
if v, ok := cdrEvent.CDR["recordType"]; ok && (v == "MOC" || v == "MTSM") {
wsService.NewWSSend.ByGroupID(wsService.GROUP_IMS_CDR+neInfo.NeId, cdrEvent)
}
case "SMF":
wsService.NewWSSend.ByGroupID(wsService.GROUP_SMF_CDR+neInfo.NeId, cdrEvent)
case "SMSC":
wsService.NewWSSend.ByGroupID(wsService.GROUP_SMSC_CDR+neInfo.NeId, cdrEvent)
case "SGWC":
wsService.NewWSSend.ByGroupID(wsService.GROUP_SGWC_CDR+neInfo.NeId, cdrEvent)
// 送到ws订阅组
switch neInfo.NeType {
case "IMS":
if v, ok := body.CDR["recordType"]; ok && (v == "MOC" || v == "MTSM") {
wsService.NewWSSend.ByGroupID(wsService.GROUP_IMS_CDR+neInfo.NeId, data)
}
case "SMF":
wsService.NewWSSend.ByGroupID(wsService.GROUP_SMF_CDR+neInfo.NeId, data)
case "SMSC":
wsService.NewWSSend.ByGroupID(wsService.GROUP_SMSC_CDR+neInfo.NeId, data)
case "SGWC":
wsService.NewWSSend.ByGroupID(wsService.GROUP_SGWC_CDR+neInfo.NeId, data)
}
services.ResponseStatusOK204NoContent(w)

View File

@@ -15,7 +15,8 @@ import (
"be.ems/lib/global"
"be.ems/lib/log"
"be.ems/lib/services"
tokenConst "be.ems/src/framework/constants/token"
"be.ems/src/framework/constants"
"be.ems/src/framework/database/db"
neService "be.ems/src/modules/network_element/service"
"github.com/go-resty/resty/v2"
@@ -293,7 +294,7 @@ func PutNeInfo(w http.ResponseWriter, r *http.Request) {
body, _ = json.Marshal(omcNeConfig)
response, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
SetBody(body).
@@ -748,20 +749,20 @@ func DownloadNeBackupFile(w http.ResponseWriter, r *http.Request) {
}
sql := fmt.Sprintf("select * from ne_backup where ne_type='%s' and file_name='%s'", neTypeUpper, fileName)
neBackup, err := dborm.XormGetDataBySQL(sql)
neBackup, err := db.RawDB("", sql, nil)
if err != nil {
log.Error("Faile to XormGetDataBySQL:", err)
services.ResponseInternalServerError500ProcessError(w, err)
return
} else if len(*neBackup) == 0 {
} else if len(neBackup) == 0 {
err := global.ErrCMNotFoundTargetBackupFile
log.Error(err)
services.ResponseInternalServerError500ProcessError(w, err)
return
}
path := (*neBackup)[0]["path"]
md5Sum := (*neBackup)[0]["md5_sum"]
path := fmt.Sprint(neBackup[0]["path"])
md5Sum := fmt.Sprint(neBackup[0]["md5_sum"])
services.ResponseFileWithNameAndMD5(w, http.StatusOK, fileName, path, md5Sum)
}
@@ -793,12 +794,12 @@ func DeleteNeBackupFile(w http.ResponseWriter, r *http.Request) {
}
sql := fmt.Sprintf("select * from ne_backup where ne_type='%s' and file_name='%s'", neTypeUpper, fileName)
neBackup, err := dborm.XormGetDataBySQL(sql)
neBackup, err := db.RawDB("", sql, nil)
if err != nil {
log.Error("Faile to XormGetDataBySQL:", err)
services.ResponseInternalServerError500ProcessError(w, err)
return
} else if len(*neBackup) == 0 {
} else if len(neBackup) == 0 {
err := global.ErrCMNotFoundTargetBackupFile
log.Error(err)
services.ResponseInternalServerError500ProcessError(w, err)
@@ -813,7 +814,7 @@ func DeleteNeBackupFile(w http.ResponseWriter, r *http.Request) {
return
}
path := (*neBackup)[0]["path"]
path := neBackup[0]["path"]
filePath := fmt.Sprintf("%s/%s", path, fileName)
err = os.Remove(filePath)
if err != nil {

View File

@@ -39,7 +39,7 @@ func GetParamConfigFromNF(w http.ResponseWriter, r *http.Request) {
return
}
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neType, neId)
neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neType, neId)
var response services.DataResponse
if neInfo.NeId == neId && neInfo.NeId != "" {
@@ -76,7 +76,7 @@ func PostParamConfigToNF(w http.ResponseWriter, r *http.Request) {
return
}
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neType, neId)
neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neType, neId)
if neInfo.NeId != neId || neInfo.NeId == "" {
log.Error("neId is empty")
@@ -128,7 +128,7 @@ func PutParamConfigToNF(w http.ResponseWriter, r *http.Request) {
}
neId := ctx.GetQuery(r, "ne_id")
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neType, neId)
neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neType, neId)
if neInfo.NeId != neId || neInfo.NeId == "" {
log.Error("neId is empty")
@@ -181,7 +181,7 @@ func DeleteParamConfigToNF(w http.ResponseWriter, r *http.Request) {
}
neId := ctx.GetQuery(r, "ne_id")
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neType, neId)
neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neType, neId)
if neInfo.NeId != neId || neInfo.NeId == "" {
log.Error("neId is empty")

View File

@@ -7,7 +7,6 @@ import (
"net/http"
"regexp"
"strings"
"time"
"be.ems/lib/config"
"be.ems/lib/core/ctx"
@@ -15,9 +14,9 @@ import (
"be.ems/lib/global"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/database/db"
"github.com/gorilla/mux"
"xorm.io/xorm"
)
type XormResponse struct {
@@ -62,56 +61,6 @@ var (
)
var XEngine *xorm.Engine
type DatabaseClient struct {
dbType string
dbUrl string
dbConnMaxLifetime time.Duration
dbMaxIdleConns int
dbMaxOpenConns int
IsShowSQL bool
XEngine *xorm.Engine
}
var DbClient DatabaseClient
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName, dbParam string) error {
// DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
// dbUser, dbPassword, dbHost, dbPort, dbName)
DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s",
dbUser, dbPassword, dbHost, dbPort, dbName, dbParam)
DbClient.dbType = dbType
DbClient.dbConnMaxLifetime = 0
DbClient.dbMaxIdleConns = 0
DbClient.dbMaxOpenConns = 0
if log.GetLevel() == log.LOG_TRACE {
DbClient.IsShowSQL = true
}
log.Debugf("dbType:%s dbUrl:%s:", dbType, DbClient.dbUrl)
var err error
DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl)
if err != nil {
log.Error("Failed to connet database:", err)
return err
}
DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime)
DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns)
DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns)
DbClient.XEngine.DatabaseTZ = time.Local // 必须
DbClient.XEngine.TZLocation = time.Local // 必须
if DbClient.IsShowSQL {
//DbClient.XEngine.SetLogger(&log.Elogger)
DbClient.XEngine.ShowSQL(true)
}
XEngine = DbClient.XEngine
return nil
}
func GetUriSQLArray(r *http.Request) []string {
var sa []string
vars := r.URL.Query()
@@ -181,17 +130,13 @@ func GetTableName(sql string) string {
func IsQuerySQL(s string) bool {
ts := strings.Trim(strings.ToLower(s), " ")
if strings.Index(ts, "select") != 0 {
return false
}
return true
return strings.Index(ts, "select") == 0
}
// xorm Get data from database
func ExtDatabaseExecSQL(w http.ResponseWriter, r *http.Request) {
log.Debug("ExtDatabaseExecSQL processing... ")
var sql []string
// var err error
// _, err = services.CheckExtValidRequest(w, r)
@@ -202,7 +147,7 @@ func ExtDatabaseExecSQL(w http.ResponseWriter, r *http.Request) {
//vars := mux.Vars(r)
//tblName := vars["objectTypeValue"]
sql = GetUriSQLArray(r)
var sql = GetUriSQLArray(r)
// select as must, todo ...
ls := services.ExtGetUriPageLimitString(r)
@@ -229,7 +174,7 @@ func ExtDatabaseExecSQL(w http.ResponseWriter, r *http.Request) {
querySQL = querySQL + " " + ls
}
log.Debug("querySQL:", querySQL)
rows, err := DbClient.XEngine.Exec(querySQL)
rows, err := db.RawDB("", querySQL, nil)
if err != nil {
log.Error("SQL failed:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
@@ -313,7 +258,7 @@ func ExtDatabaseGetData(w http.ResponseWriter, r *http.Request) {
querySQL = querySQL + " " + ls
}
log.Debug("querySQL:", querySQL)
rows, err := DbClient.XEngine.QueryInterface(querySQL)
rows, err := db.RawDB("", querySQL, nil)
if err != nil {
log.Error("SQL failed:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
@@ -374,20 +319,18 @@ func ExtDatabaseInsertData(w http.ResponseWriter, r *http.Request) {
tn, sql := dborm.ConstructInsertSQL(tbname, insertData)
log.Tracef("tn: %s sql :%s", tn, sql)
xSession := DbClient.XEngine.NewSession()
defer xSession.Close()
var affected int64
for _, s := range sql {
res, err := xSession.Exec(s)
n, err := db.ExecDB("", s, nil)
if err != nil {
log.Error("Insert failed:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
n, _ := res.RowsAffected()
affected = affected + n
}
xSession.Commit()
// affected, err := InsertDataWithJson(insertData)
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
@@ -440,20 +383,18 @@ func ExtDatabaseUpdateData(w http.ResponseWriter, r *http.Request) {
tn, sql := dborm.ConstructUpdateSQL(tbname, updateData, wc)
log.Tracef("tn: %s sql :%s", tn, sql)
xSession := DbClient.XEngine.NewSession()
defer xSession.Close()
var affected int64
for _, s := range sql {
res, err := xSession.Exec(s)
n, err := db.ExecDB("", s, nil)
if err != nil {
log.Error("Update failed:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
n, _ := res.RowsAffected()
affected = affected + n
}
xSession.Commit()
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
mapRow[tn] = row
@@ -494,16 +435,14 @@ func ExtDatabaseDeleteData(w http.ResponseWriter, r *http.Request) {
log.Debug("Table name:", tbname, "wc:", wc)
sql := dborm.ConstructDeleteSQL(tbname, wc)
xSession := DbClient.XEngine.NewSession()
defer xSession.Close()
res, err := xSession.Exec(sql)
affected, err := db.ExecDB("", sql, nil)
if err != nil {
log.Error("Update failed, err:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
affected, _ := res.RowsAffected()
xSession.Commit()
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
mapRow["data"] = row
@@ -515,7 +454,6 @@ func DatabaseGetData(w http.ResponseWriter, r *http.Request) {
log.Debug("DatabaseGetData processing... ")
var sql []string
var err error
// _, err = services.CheckFrontValidRequest(w, r)
// if err != nil {
@@ -546,7 +484,6 @@ func DatabaseGetData(w http.ResponseWriter, r *http.Request) {
for i, s := range sql {
log.Tracef("SQL[%d]: %s", i, sql[i])
rows := make([]map[string]interface{}, 0)
mapRows := make(map[string]interface{})
if s != "" {
@@ -561,7 +498,7 @@ func DatabaseGetData(w http.ResponseWriter, r *http.Request) {
querySQL = querySQL + " " + ls
}
log.Debug("querySQL:", querySQL)
rows, err = DbClient.XEngine.QueryInterface(querySQL)
rows, err := db.RawDB("", querySQL, nil)
if err != nil {
log.Error("QueryInterface failed:", err)
services.ResponseInternalServerError500ProcessError(w, err)
@@ -611,20 +548,18 @@ func DatabaseInsertData(w http.ResponseWriter, r *http.Request) {
tn, sql := dborm.ConstructInsertSQL(tableName, insertData)
log.Tracef("tn: %s sql :%s", tn, sql)
xSession := DbClient.XEngine.NewSession()
defer xSession.Close()
var affected int64
for _, s := range sql {
res, err := xSession.Exec(s)
n, err := db.ExecDB("", s, nil)
if err != nil {
log.Error("Insert failed:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
n, _ := res.RowsAffected()
affected = affected + n
}
xSession.Commit()
// affected, err := InsertDataWithJson(insertData)
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
@@ -653,20 +588,18 @@ func DatabaseUpdateData(w http.ResponseWriter, r *http.Request) {
tn, sql := dborm.ConstructUpdateSQL(tblName, updateData, wc)
log.Tracef("tn: %s sql :%s", tn, sql)
xSession := DbClient.XEngine.NewSession()
defer xSession.Close()
var affected int64
for _, s := range sql {
res, err := xSession.Exec(s)
n, err := db.ExecDB("", s, nil)
if err != nil {
log.Error("Update failed:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
n, _ := res.RowsAffected()
affected = affected + n
}
xSession.Commit()
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
mapRow[tn] = row
@@ -683,16 +616,14 @@ func DatabaseDeleteData(w http.ResponseWriter, r *http.Request) {
log.Debug("Table name:", tblName, "wc:", wc)
sql := dborm.ConstructDeleteSQL(tblName, wc)
xSession := DbClient.XEngine.NewSession()
defer xSession.Close()
res, err := xSession.Exec(sql)
affected, err := db.ExecDB("", sql, nil)
if err != nil {
log.Error("Update failed, err:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
affected, _ := res.RowsAffected()
xSession.Commit()
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
mapRow["data"] = row
@@ -701,15 +632,25 @@ func DatabaseDeleteData(w http.ResponseWriter, r *http.Request) {
// 连接用户实例
func DbConnection(w http.ResponseWriter, r *http.Request) {
if dborm.DbClient.XEngine == nil {
// 获取底层 SQL 数据库连接
sqlDB, err := db.DB("").DB()
if err != nil {
services.ResponseErrorWithJson(w, 400, "无连接")
return
}
// 测试数据库连接
err = sqlDB.Ping()
if err != nil {
services.ResponseErrorWithJson(w, 400, "无连接")
return
}
// 查询实例
result, err := dborm.DbClient.XEngine.QueryString("SHOW PROCESSLIST;")
result, err := db.RawDB("", "SHOW PROCESSLIST;", nil)
if err != nil {
services.ResponseErrorWithJson(w, 500, err.Error())
}
filterData := []map[string]string{}
filterData := []map[string]any{}
for _, r := range result {
if r["User"] != "system user" {
filterData = append(filterData, r)
@@ -724,15 +665,24 @@ func DbConnection(w http.ResponseWriter, r *http.Request) {
// 关闭数据库连接
func DbStop(w http.ResponseWriter, r *http.Request) {
if dborm.DbClient.XEngine == nil {
// 获取底层 SQL 数据库连接
sqlDB, err := db.DB("").DB()
if err != nil {
services.ResponseErrorWithJson(w, 400, "无连接")
return
}
// 测试数据库连接
err = sqlDB.Ping()
if err != nil {
services.ResponseErrorWithJson(w, 400, "无连接")
return
}
// json 請求參數獲取
var bodyArgs struct {
ID string `json:"ID" validate:"required"`
}
err := ctx.ShouldBindJSON(r, &bodyArgs)
err = ctx.ShouldBindJSON(r, &bodyArgs)
if err != nil {
log.Error("io.ReadAll is failed:", err)
services.ResponseErrorWithJson(w, 400, err.Error())
@@ -740,7 +690,7 @@ func DbStop(w http.ResponseWriter, r *http.Request) {
}
// 关闭
rse, err := dborm.DbClient.XEngine.Exec("KILL ?;", bodyArgs.ID)
rse, err := db.ExecDB("", "KILL ?;", []any{bodyArgs.ID})
if err != nil {
services.ResponseErrorWithJson(w, 500, err.Error())
return
@@ -752,12 +702,9 @@ func DbStop(w http.ResponseWriter, r *http.Request) {
func TaskDatabaseGetData(w http.ResponseWriter, r *http.Request) {
log.Debug("DatabaseGetData processing... ")
var sql []string
var err error
vars := mux.Vars(r)
tblName := vars["objectTypeValue"]
sql = GetUriSQLArray(r)
var sql = GetUriSQLArray(r)
// select as must, todo ...
if sql == nil {
@@ -778,12 +725,11 @@ func TaskDatabaseGetData(w http.ResponseWriter, r *http.Request) {
for i, s := range sql {
log.Tracef("SQL[%d]: %s", i, sql[i])
rows := make([]map[string]interface{}, 0)
mapRows := make(map[string]interface{})
if s != "" {
// err = XEngine.SQL(s).Find(&rows)
if IsQuerySQL(s) == false {
if !IsQuerySQL(s) {
services.ResponseNotAcceptable406QuerySQLError(w)
return
}
@@ -793,7 +739,7 @@ func TaskDatabaseGetData(w http.ResponseWriter, r *http.Request) {
querySQL = querySQL + " " + ls
}
log.Debug("querySQL:", querySQL)
rows, err = DbClient.XEngine.QueryInterface(querySQL)
rows, err := db.RawDB("", querySQL, nil)
if err != nil {
log.Error("SQL failed:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
@@ -830,20 +776,18 @@ func TaskDatabaseInsertData(w http.ResponseWriter, r *http.Request) {
tn, sql := dborm.ConstructInsertSQL(tableName, insertData)
log.Tracef("tn: %s sql :%s", tn, sql)
xSession := DbClient.XEngine.NewSession()
defer xSession.Close()
var affected int64
for _, s := range sql {
res, err := xSession.Exec(s)
n, err := db.ExecDB("", s, nil)
if err != nil {
log.Error("Insert failed:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
n, _ := res.RowsAffected()
affected = affected + n
}
xSession.Commit()
// affected, err := InsertDataWithJson(insertData)
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
@@ -872,20 +816,18 @@ func TaskDatabaseUpdateData(w http.ResponseWriter, r *http.Request) {
tn, sql := dborm.ConstructUpdateSQL(tblName, updateData, wc)
log.Tracef("tn: %s sql :%s", tn, sql)
xSession := DbClient.XEngine.NewSession()
defer xSession.Close()
var affected int64
for _, s := range sql {
res, err := xSession.Exec(s)
n, err := db.ExecDB("", s, nil)
if err != nil {
log.Error("Update failed:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
n, _ := res.RowsAffected()
affected = affected + n
}
xSession.Commit()
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
mapRow[tn] = row
@@ -902,16 +844,14 @@ func TaskDatabaseDeleteData(w http.ResponseWriter, r *http.Request) {
log.Debug("Table name:", tblName, "wc:", wc)
sql := dborm.ConstructDeleteSQL(tblName, wc)
xSession := DbClient.XEngine.NewSession()
defer xSession.Close()
res, err := xSession.Exec(sql)
affected, err := db.ExecDB("", sql, nil)
if err != nil {
log.Error("Update failed, err:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
affected, _ := res.RowsAffected()
xSession.Commit()
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
mapRow["data"] = row

View File

@@ -1,6 +1,7 @@
package event
import (
"encoding/json"
"fmt"
"net/http"
"strings"
@@ -8,12 +9,15 @@ import (
"be.ems/lib/config"
"be.ems/lib/core/ctx"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/database/db"
"be.ems/src/framework/utils/date"
"be.ems/src/framework/utils/parse"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
"github.com/gin-gonic/gin"
)
@@ -25,21 +29,11 @@ var (
CustomUriUEEvent = config.UriPrefix + "/logManagement/v1/elementType/{elementTypeValue}/objectType/ueEvent"
)
type UEEvent struct {
NeType string `json:"neType" xorm:"ne_type"`
NeName string `json:"neName" xorm:"ne_name"`
RmUID string `json:"rmUID" xorm:"rm_uid"`
Timestamp int64 `json:"timestamp" xorm:"timestamp"`
EventType string `json:"eventType" xorm:"event_type"`
EventJson map[string]any `json:"eventJSON" xorm:"event_json"`
}
// 旧AMF上报处理
func PostUEEventFromAMF(c *gin.Context) {
log.Info("PostUEEventFromAMF processing... ")
eventType, ok := c.Params.Get("eventType")
if !ok || eventType == "" {
eventType := c.Param("eventType")
if eventType == "" {
log.Error("eventType is empty")
services.ResponseNotFound404UriNotExist(c.Writer, c.Request)
return
@@ -51,21 +45,45 @@ func PostUEEventFromAMF(c *gin.Context) {
return
}
ueEvent := UEEvent{
NeType: "AMF",
Timestamp: time.Now().Unix(),
EventType: eventType,
// 执行插入表
type UEEvent struct {
ID string `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
NeType string `json:"neType" gorm:"column:ne_type"`
NeName string `json:"neName" gorm:"column:ne_name"`
RmUID string `json:"rmUID" gorm:"column:rm_uid"` // 可能没有
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` // 接收到的timestamp秒级存储毫秒时间戳
EventType string `json:"eventType" gorm:"column:event_type"` // 事件类型 auth-result detach cm-state
EventJSONStr string `json:"eventJSON" gorm:"column:event_json"` // data JSON String
CreatedAt int64 `json:"createdAt" gorm:"column:created_at"` // 记录创建存储毫秒
}
timestamp := time.Now().UnixMilli()
data := UEEvent{
NeType: "AMF",
NeName: "",
RmUID: "",
Timestamp: timestamp,
EventType: eventType,
EventJSONStr: "",
CreatedAt: timestamp,
}
// 从eventJson中获取rmUID
if v, ok := body["rmUID"]; ok {
ueEvent.RmUID = fmt.Sprint(v)
data.RmUID = fmt.Sprint(v)
} else {
ueEvent.RmUID = "4400HXAMF001"
data.RmUID = "4400HXAMF001"
}
if v, ok := body["neName"]; ok {
ueEvent.NeName = fmt.Sprint(v)
data.NeName = fmt.Sprint(v)
} else {
ueEvent.NeName = "AMF_001"
data.NeName = "AMF_001"
}
// 是否存在网元
neInfo := neService.NewNeInfo.FindByRmuid(data.RmUID)
if neInfo.NeType != "AMF" || neInfo.RmUID != data.RmUID {
services.ResponseInternalServerError500ProcessError(c.Writer, fmt.Errorf("network element does not exist"))
return
}
// 统一格式
@@ -92,7 +110,8 @@ func PostUEEventFromAMF(c *gin.Context) {
eventJson["result"] = fmt.Sprint(v)
}
if v, ok := body["authTime"]; ok {
eventJson["timestamp"] = ueEvent.Timestamp
authTime := date.ParseStrToDate(fmt.Sprint(v), date.YYYY_MM_DD_HH_MM_SS)
eventJson["timestamp"] = authTime.Unix()
eventJson["time"] = fmt.Sprint(v)
}
case "detach":
@@ -108,7 +127,8 @@ func PostUEEventFromAMF(c *gin.Context) {
}
}
if v, ok := body["detachTime"]; ok {
eventJson["timestamp"] = ueEvent.Timestamp
detachTime := date.ParseStrToDate(fmt.Sprint(v), date.YYYY_MM_DD_HH_MM_SS)
eventJson["timestamp"] = detachTime.Unix()
eventJson["time"] = fmt.Sprint(v)
}
case "cm-state":
@@ -123,27 +143,29 @@ func PostUEEventFromAMF(c *gin.Context) {
eventJson["result"] = fmt.Sprint(v)
}
if v, ok := body["changeTime"]; ok {
eventJson["timestamp"] = ueEvent.Timestamp
changeTime := date.ParseStrToDate(fmt.Sprint(v), date.YYYY_MM_DD_HH_MM_SS)
eventJson["timestamp"] = changeTime.Unix()
eventJson["time"] = fmt.Sprint(v)
}
}
ueEvent.EventJson = eventJson
affected, err := dborm.XormInsertTableOne("ue_event_amf", ueEvent)
if err != nil && affected <= 0 {
log.Error("Failed to insert ue_event_amf:", err)
ueByte, err := json.Marshal(eventJson)
if err != nil {
services.ResponseInternalServerError500ProcessError(c.Writer, err)
return
}
data.EventJSONStr = string(ueByte)
if err := db.DB("").Table("ue_event_amf").Create(&data).Error; err != nil {
log.Error("Failed to insert ue_event_amf", err)
services.ResponseInternalServerError500ProcessError(c.Writer, err)
return
}
// 送到匹配的网元
neInfo := neService.NewNeInfo.SelectNeInfoByRmuid(ueEvent.RmUID)
if neInfo.RmUID == ueEvent.RmUID {
// 推送到ws订阅组
if ueEvent.NeType == "AMF" {
wsService.NewWSSend.ByGroupID(wsService.GROUP_AMF_UE, ueEvent)
wsService.NewWSSend.ByGroupID(wsService.GROUP_AMF_UE+"_"+neInfo.NeId, ueEvent)
}
// 送到ws订阅组
if data.NeType == "AMF" {
wsService.NewWSSend.ByGroupID(wsService.GROUP_AMF_UE, data)
wsService.NewWSSend.ByGroupID(wsService.GROUP_AMF_UE+"_"+neInfo.NeId, data)
}
services.ResponseStatusOK204NoContent(c.Writer)
@@ -152,35 +174,74 @@ func PostUEEventFromAMF(c *gin.Context) {
// UE上报处理
func PostUEEvent(w http.ResponseWriter, r *http.Request) {
log.Info("PostUEEvent processing... ")
neType := ctx.GetParam(r, "elementTypeValue")
var ueEvent UEEvent
if err := ctx.ShouldBindJSON(r, &ueEvent); err != nil {
var body struct {
NeType string `json:"neType" `
NeName string `json:"neName" `
RmUID string `json:"rmUID" `
Timestamp int64 `json:"timestamp" `
EventType string `json:"eventType" `
EventJson map[string]any `json:"eventJSON" `
}
if err := ctx.ShouldBindJSON(r, &body); err != nil {
services.ResponseInternalServerError500ProcessError(w, err)
return
}
ueEvent.NeType = strings.ToUpper(neType)
tableName := fmt.Sprintf("ue_event_%s", strings.ToLower(neType))
affected, err := dborm.XormInsertTableOne(tableName, ueEvent)
if err != nil && affected <= 0 {
neTypeLower := strings.ToLower(body.NeType)
if neType == "" || neType != neTypeLower {
services.ResponseInternalServerError500ProcessError(w, fmt.Errorf("inconsistent network element types"))
return
}
// 是否存在网元
neInfo := neService.NewNeInfo.FindByRmuid(body.RmUID)
if neInfo.NeType != body.NeType || neInfo.RmUID != body.RmUID {
services.ResponseInternalServerError500ProcessError(w, fmt.Errorf("network element does not exist"))
return
}
ueByte, err := json.Marshal(body.EventJson)
if err != nil {
services.ResponseInternalServerError500ProcessError(w, err)
return
}
// 执行插入表
type UEEvent struct {
ID string `json:"-" gorm:"column:id;primaryKey;autoIncrement"`
NeType string `json:"neType" gorm:"column:ne_type"`
NeName string `json:"neName" gorm:"column:ne_name"`
RmUID string `json:"rmUID" gorm:"column:rm_uid"` // 可能没有
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` // 接收到的timestamp秒级存储毫秒时间戳
EventType string `json:"eventType" gorm:"column:event_type"` // 事件类型 auth-result detach cm-state
EventJSONStr string `json:"eventJSON" gorm:"column:event_json"` // data JSON String
CreatedAt int64 `json:"-" gorm:"column:created_at"` // 记录创建存储毫秒
}
data := UEEvent{
NeType: body.NeType,
NeName: body.NeName,
RmUID: body.RmUID,
Timestamp: int64(body.Timestamp) * 1000,
EventType: body.EventType,
EventJSONStr: string(ueByte),
CreatedAt: time.Now().UnixMilli(),
}
tableName := fmt.Sprintf("ue_event_%s", neTypeLower)
if err := db.DB("").Table(tableName).Create(&data).Error; err != nil {
log.Error("Failed to insert "+tableName, err)
services.ResponseInternalServerError500ProcessError(w, err)
return
}
// 送到匹配的网元
neInfo := neService.NewNeInfo.SelectNeInfoByRmuid(ueEvent.RmUID)
if neInfo.RmUID == ueEvent.RmUID {
// 推送到ws订阅组
if ueEvent.NeType == "MME" {
wsService.NewWSSend.ByGroupID(wsService.GROUP_MME_UE, ueEvent)
wsService.NewWSSend.ByGroupID(wsService.GROUP_MME_UE+"_"+neInfo.NeId, ueEvent)
}
if ueEvent.NeType == "AMF" {
wsService.NewWSSend.ByGroupID(wsService.GROUP_AMF_UE, ueEvent)
wsService.NewWSSend.ByGroupID(wsService.GROUP_AMF_UE+"_"+neInfo.NeId, ueEvent)
}
// 送到ws订阅组
if body.NeType == "MME" {
wsService.NewWSSend.ByGroupID(wsService.GROUP_MME_UE, data)
wsService.NewWSSend.ByGroupID(wsService.GROUP_MME_UE+"_"+neInfo.NeId, data)
}
if body.NeType == "AMF" {
wsService.NewWSSend.ByGroupID(wsService.GROUP_AMF_UE, data)
wsService.NewWSSend.ByGroupID(wsService.GROUP_AMF_UE+"_"+neInfo.NeId, data)
}
services.ResponseStatusOK204NoContent(w)

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,6 @@ import (
"strings"
"be.ems/lib/config"
"be.ems/lib/dborm"
"be.ems/lib/log"
"gopkg.in/gomail.v2"
@@ -66,16 +65,6 @@ func AlarmEmailForward(alarmData *Alarm) error {
// return err
// }
emails := strings.Split(config.GetYamlConfig().Alarm.EmailForward.EmailList, ",")
forwardLog := &dborm.AlarmForwardLog{
NeType: alarmData.NeType,
NeID: alarmData.NeId,
AlarmID: alarmData.AlarmId,
AlarmTitle: alarmData.AlarmTitle,
AlarmSeq: alarmData.AlarmSeq,
EventTime: alarmData.EventTime,
ToUser: config.GetYamlConfig().Alarm.EmailForward.EmailList,
}
m.SetHeader("To", emails...) // 收件人,可以多个收件人,但必须使用相同的 SMTP 连接
//m.SetHeader("To", strings.Join(*emails, " ")) // 收件人,可以多个收件人,但必须使用相同的 SMTP 连接
//m.SetHeader("To", "zhangshuzhong@agrandtech.com", "simonzhangsz@outlook.com") // 收件人,可以多个收件人,但必须使用相同的 SMTP 连接
@@ -107,22 +96,6 @@ func AlarmEmailForward(alarmData *Alarm) error {
d.TLSConfig = &tls.Config{InsecureSkipVerify: false}
}
if err := d.DialAndSend(m); err != nil {
operResult := fmt.Sprintf("Failed to DialAndSend:%v", err)
log.Error(operResult)
forwardLog.OperResult = operResult
affected, err := dborm.XormInsertAlarmForwardLog(forwardLog)
if err != nil && affected <= 0 {
log.Error("Failed to insert data:", err)
}
return err
}
forwardLog.OperResult = "Email sent successfully!"
affected, err := dborm.XormInsertAlarmForwardLog(forwardLog)
if err != nil && affected <= 0 {
log.Error("Failed to insert data:", err)
return err
}
return nil
err := d.DialAndSend(m)
return writeLog(alarmData, config.GetYamlConfig().Alarm.EmailForward.EmailList, "EMAIL", err)
}

View File

@@ -11,6 +11,10 @@ import (
"be.ems/lib/config"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/src/framework/utils/date"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
"github.com/linxGnu/gosmpp"
"github.com/linxGnu/gosmpp/data"
"github.com/linxGnu/gosmpp/pdu"
@@ -167,28 +171,31 @@ func AlarmForwardBySMPP(alarmData *Alarm) (string, error) {
}
func writeLog(alarmData *Alarm, toUser, forwardBy string, err error) error {
var result string
if err == nil {
result = "SMS sent successfully"
} else {
var result string = fmt.Sprintf("%s Sent Successfully!", forwardBy)
if err != nil {
result = err.Error()
}
forwardLog := &dborm.AlarmForwardLog{
NeType: alarmData.NeType,
NeID: alarmData.NeId,
AlarmID: alarmData.AlarmId,
AlarmTitle: alarmData.AlarmTitle,
AlarmSeq: alarmData.AlarmSeq,
EventTime: alarmData.EventTime,
Interface: forwardBy,
ToUser: toUser,
OperResult: result,
neInfo := neService.NewNeInfo.FindByRmuid(alarmData.NeId)
eventTime := date.ParseStrToDate(alarmData.EventTime, date.YYYY_MM_DDTHH_MM_SSZ)
alarmForwardLog := neDataModel.AlarmForwardLog{
NeType: neInfo.NeType,
NeId: neInfo.NeId,
AlarmSeq: int64(alarmData.AlarmSeq),
AlarmId: alarmData.AlarmId,
AlarmTitle: alarmData.AlarmTitle,
AlarmCode: int64(alarmData.AlarmCode),
AlarmStatus: fmt.Sprint(alarmData.AlarmStatus),
OrigSeverity: origSeverityValue(alarmData.OrigSeverity),
EventTime: eventTime.UnixMilli(),
Type: forwardBy,
Target: toUser,
Result: result,
}
affected, err := dborm.XormInsertAlarmForwardLog(forwardLog)
if err != nil && affected <= 0 {
log.Error("Failed to insert data:", err)
return err
// 记录日志
insertId := neDataService.NewAlarmForwardLog.Insert(alarmForwardLog)
if insertId <= 0 {
return fmt.Errorf("failed to insert data")
}
return nil
}

View File

@@ -2,25 +2,21 @@ package file_export
import (
"encoding/json"
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"time"
"github.com/jlaffaye/ftp"
"be.ems/lib/file"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/config"
"be.ems/src/framework/datasource"
"be.ems/src/framework/database/db"
"be.ems/src/framework/i18n"
"be.ems/src/framework/reqctx"
"be.ems/src/framework/resp"
"be.ems/src/framework/ssh"
"be.ems/src/framework/utils/crypto"
"be.ems/src/framework/utils/ctx"
"be.ems/src/framework/utils/ssh"
"be.ems/src/framework/vo/result"
systemService "be.ems/src/modules/system/service"
"github.com/gin-gonic/gin"
)
@@ -45,13 +41,13 @@ type TargetParams struct {
func (m *SysJob) GetFileExportTable(c *gin.Context) {
var results []SysJob
err := datasource.DefaultDB().Table(m.TableName()).Where("invoke_target=? and status=1", INVOKE_FILE_EXPORT).
err := db.DB("").Table(m.TableName()).Where("invoke_target=? and status=1", INVOKE_FILE_EXPORT).
Find(&results).Error
if err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
language := ctx.AcceptLanguage(c)
language := reqctx.AcceptLanguage(c)
var response []SysJobResponse
for _, job := range results {
var params TargetParams
@@ -154,54 +150,54 @@ func (m *FileExport) Delete(c *gin.Context) {
// 设置FTP配置
// POST /table/ftp
func (m *SysJob) SetFTPConfig(c *gin.Context) {
language := ctx.AcceptLanguage(c)
language := reqctx.AcceptLanguage(c)
var body struct {
Password string `json:"password" `
Username string `json:"username" binding:"required"`
ToIp string `json:"toIp" binding:"required"`
ToPort int64 `json:"toPort" binding:"required"`
Protocol string `json:"protocol" binding:"required,oneof=ssh ftp"`
Enable bool `json:"enable"`
Dir string `json:"dir" binding:"required"`
}
if err := c.ShouldBindBodyWithJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
c.JSON(400, resp.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 获取配置
cfg := systemService.NewSysConfigImpl.SelectConfigByKey("sys.exportTable")
if cfg.ConfigID != "" {
cfg := systemService.NewSysConfig.FindByKey("sys.exportTable")
if cfg.ConfigId > 0 {
// 加密body
appKey := config.Get("aes.appKey").(string)
byteData, err := json.Marshal(body)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
bodyEn, err := crypto.AESEncryptBase64(string(byteData), appKey)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
// 更新
cfg.ConfigValue = bodyEn
systemService.NewSysConfigImpl.UpdateConfig(cfg)
systemService.NewSysConfig.Update(cfg)
}
c.JSON(200, result.Ok(nil))
c.JSON(200, resp.Ok(nil))
}
// 设置FTP配置
// 获取FTP配置
// GET /table/ftp
func (m *SysJob) GetFTPConfig(c *gin.Context) {
// 获取配置
cfg := systemService.NewSysConfigImpl.SelectConfigByKey("sys.exportTable")
if cfg.ConfigID != "" {
cfg := systemService.NewSysConfig.FindByKey("sys.exportTable")
if cfg.ConfigId > 0 {
// 解密body
appKey := config.Get("aes.appKey").(string)
bodyDe, err := crypto.AESDecryptBase64(cfg.ConfigValue, appKey)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
var body struct {
@@ -209,31 +205,31 @@ func (m *SysJob) GetFTPConfig(c *gin.Context) {
Username string `json:"username" binding:"required"`
ToIp string `json:"toIp" binding:"required"`
ToPort int64 `json:"toPort" binding:"required"`
Protocol string `json:"protocol" binding:"required,oneof=ssh ftp"`
Enable bool `json:"enable"`
Dir string `json:"dir" binding:"required"`
}
err = json.Unmarshal([]byte(bodyDe), &body)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
c.JSON(200, result.OkData(body))
c.JSON(200, resp.OkData(body))
return
}
c.JSON(200, result.Ok(nil))
c.JSON(200, resp.Ok(nil))
}
// FTP发送
// PUT /table/ftp
func (m *SysJob) PutFTP(c *gin.Context) {
language := ctx.AcceptLanguage(c)
language := reqctx.AcceptLanguage(c)
var body struct {
FilePath string `json:"filePath" binding:"required"`
FileName string `json:"fileName" binding:"required"`
}
if err := c.ShouldBindBodyWithJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
c.JSON(400, resp.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
@@ -241,7 +237,7 @@ func (m *SysJob) PutFTP(c *gin.Context) {
// 判断文件是否存在
if _, err := os.Stat(localFilePath); os.IsNotExist(err) {
c.JSON(200, result.ErrMsg(err.Error()))
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
@@ -251,88 +247,55 @@ func (m *SysJob) PutFTP(c *gin.Context) {
Username string `json:"username" binding:"required"`
ToIp string `json:"toIp" binding:"required"`
ToPort int64 `json:"toPort" binding:"required"`
Protocol string `json:"protocol" binding:"required,oneof=ssh ftp"`
Enable bool `json:"enable"`
Dir string `json:"dir" binding:"required"`
}
cfg := systemService.NewSysConfigImpl.SelectConfigByKey("sys.exportTable")
if cfg.ConfigID != "" {
cfg := systemService.NewSysConfig.FindByKey("sys.exportTable")
if cfg.ConfigId > 0 {
// 解密body
appKey := config.Get("aes.appKey").(string)
bodyDe, err := crypto.AESDecryptBase64(cfg.ConfigValue, appKey)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
err = json.Unmarshal([]byte(bodyDe), &cfgData)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
}
if cfgData.Protocol == "ssh" {
connSSH := ssh.ConnSSH{
User: cfgData.Username,
Password: cfgData.Password,
Addr: cfgData.ToIp,
Port: cfgData.ToPort,
AuthMode: "0",
}
sshClient, err := connSSH.NewClient()
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
return
}
defer sshClient.Close()
// 网元主机的SSH客户端进行文件传输
sftpClient, err := sshClient.NewClientSFTP()
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
return
}
defer sftpClient.Close()
// 远程文件
remotePath := filepath.Join(cfgData.Dir, path.Base(body.FilePath), body.FileName)
// 复制到远程
if err = sftpClient.CopyFileLocalToRemote(localFilePath, remotePath); err != nil {
c.JSON(200, result.ErrMsg("error uploading file"))
return
}
c.JSON(200, result.Ok(nil))
if !cfgData.Enable {
c.JSON(200, resp.ErrMsg("Setting Remote Backup is disabled"))
return
}
if cfgData.Protocol == "ftp" {
// 连接到 FTP 服务器
addr := fmt.Sprintf("%s:%d", cfgData.ToIp, cfgData.ToPort)
ftpComm, err := ftp.Dial(addr, ftp.DialWithTimeout(15*time.Second))
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
return
}
// 登录到 FTP 服务器
err = ftpComm.Login(cfgData.Username, cfgData.Password)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
return
}
defer ftpComm.Quit()
// 打开本地文件
file, err := os.Open(localFilePath)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
return
}
defer file.Close()
// 远程文件
remotePath := filepath.Join(cfgData.Dir, path.Base(body.FilePath), body.FileName)
// 上传文件到 FTP 服务器
err = ftpComm.Stor(remotePath, file)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
return
}
connSSH := ssh.ConnSSH{
User: cfgData.Username,
Password: cfgData.Password,
Addr: cfgData.ToIp,
Port: cfgData.ToPort,
AuthMode: "0",
}
c.JSON(200, result.Err(nil))
sshClient, err := connSSH.NewClient()
if err != nil {
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
defer sshClient.Close()
// 网元主机的SSH客户端进行文件传输
sftpClient, err := sshClient.NewClientSFTP()
if err != nil {
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
defer sftpClient.Close()
// 远程文件
remotePath := filepath.Join(cfgData.Dir, path.Base(body.FilePath), body.FileName)
// 复制到远程
if err = sftpClient.CopyFileLocalToRemote(localFilePath, remotePath); err != nil {
c.JSON(200, resp.ErrMsg("error uploading file"))
return
}
c.JSON(200, resp.Ok(nil))
}

View File

@@ -17,10 +17,12 @@ func Register(r *gin.RouterGroup) {
)
lmTable.POST("/ftp",
middleware.PreAuthorize(nil),
middleware.CryptoApi(true, false),
m.SetFTPConfig,
)
lmTable.GET("/ftp",
middleware.PreAuthorize(nil),
middleware.CryptoApi(false, true),
m.GetFTPConfig,
)
lmTable.PUT("/ftp",

View File

@@ -10,9 +10,9 @@ import (
"be.ems/lib/global"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/database/db"
"github.com/gorilla/mux"
"xorm.io/xorm"
)
type XormResponse struct {
@@ -29,58 +29,12 @@ var (
CustomExtBackupDataUri = config.UriPrefix + "/dataManagement/{apiVersion}/{dataStorage}/{dataObject}/backup" // for external
)
var XEngine *xorm.Engine
type DatabaseClient struct {
dbType string
dbUrl string
dbConnMaxLifetime time.Duration
dbMaxIdleConns int
dbMaxOpenConns int
IsShowSQL bool
XEngine *xorm.Engine
}
var DbClient DatabaseClient
// func init() {
// conf := config.GetYamlConfig()
// InitDbClient(conf.Database.Type, conf.Database.User, conf.Database.Password,
// conf.Database.Host, conf.Database.Port, conf.Database.Name)
// }
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName, dbParam string) error {
DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s",
dbUser, dbPassword, dbHost, dbPort, dbName, dbParam)
DbClient.dbType = dbType
DbClient.dbConnMaxLifetime = 0
DbClient.dbMaxIdleConns = 0
DbClient.dbMaxOpenConns = 0
if log.GetLevel() == log.LOG_TRACE {
DbClient.IsShowSQL = true
}
log.Debugf("dbType:%s dbUrl:%s:", dbType, DbClient.dbUrl)
var err error
DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl)
if err != nil {
log.Error("Failed to connet database:", err)
return err
}
DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime)
DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns)
DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns)
DbClient.XEngine.DatabaseTZ = time.Local // 必须
DbClient.XEngine.TZLocation = time.Local // 必须
if DbClient.IsShowSQL {
DbClient.XEngine.ShowSQL(true)
}
XEngine = DbClient.XEngine
return nil
}
func ExtDatabaseBackupData(w http.ResponseWriter, r *http.Request) {
log.Debug("ExtDatabaseBackupData processing... ")
@@ -126,16 +80,15 @@ func ExtDatabaseBackupData(w http.ResponseWriter, r *http.Request) {
return
}
res, err := DbClient.XEngine.Exec(sql)
affected, err := db.ExecDB("", sql, nil)
if err != nil {
log.Error("Failed to exec SQL:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
}
affected, _ := res.RowsAffected()
log.Debugf("filePath:%s backup dir:%s", filePath, config.GetYamlConfig().Database.Backup)
cmd := exec.Command("cp", "-rf", filePath, config.GetYamlConfig().Database.Backup)
log.Debugf("filePath:%s backup dir:%s", filePath, config.GetYamlConfig().OMC.Backup)
cmd := exec.Command("cp", "-rf", filePath, config.GetYamlConfig().OMC.Backup)
out, err := cmd.CombinedOutput()
log.Debugf("Exec output: %v", string(out))
if err != nil {

View File

@@ -16,7 +16,7 @@ import (
"be.ems/lib/log"
"be.ems/lib/mmlp"
"be.ems/lib/services"
tokenConst "be.ems/src/framework/constants/token"
"be.ems/src/framework/constants"
neModel "be.ems/src/modules/network_element/model"
neService "be.ems/src/modules/network_element/service"
)
@@ -83,7 +83,7 @@ func PostMML2ToNF(w http.ResponseWriter, r *http.Request) {
}
log.Debug("neType:", neType, "neId", neId)
neInfoArr := neService.NewNeInfo.SelectList(neModel.NeInfo{NeType: neType, NeId: neId}, false, true)
neInfoArr := neService.NewNeInfo.Find(neModel.NeInfo{NeType: neType, NeId: neId}, false, true)
if len(neInfoArr) < 1 {
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
@@ -235,7 +235,7 @@ func PostMMLToNF(w http.ResponseWriter, r *http.Request) {
return
}
neInfoArr := neService.NewNeInfo.SelectList(neModel.NeInfo{NeType: neType, NeId: neId}, false, true)
neInfoArr := neService.NewNeInfo.Find(neModel.NeInfo{NeType: neType, NeId: neId}, false, true)
if len(neInfoArr) < 1 {
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
@@ -635,7 +635,7 @@ func PostMMLToOMC(w http.ResponseWriter, r *http.Request) {
}
log.Debug("neType:", neType, "neId", neId)
neInfoArr := neService.NewNeInfo.SelectList(neModel.NeInfo{NeType: neType, NeId: neId}, false, false)
neInfoArr := neService.NewNeInfo.Find(neModel.NeInfo{NeType: neType, NeId: neId}, false, false)
if len(neInfoArr) < 1 {
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
@@ -664,8 +664,8 @@ func PostMMLToOMC(w http.ResponseWriter, r *http.Request) {
MmlHome: config.GetYamlConfig().MML.MmlHome,
Limit: 50,
User: "",
SessionToken: "", // 旧token
Authorization: r.Header.Get(tokenConst.HEADER_KEY), // 请求Token
SessionToken: "", // 旧token
Authorization: r.Header.Get(constants.HEADER_KEY), // 请求Token
HttpUri: hostUri,
UserAgent: config.GetDefaultUserAgent(),
}

View File

@@ -1,13 +1,15 @@
package kpi_c_report
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"be.ems/lib/dborm"
"be.ems/lib/services"
"be.ems/src/framework/database/db"
"be.ems/src/framework/utils/parse"
"github.com/gin-gonic/gin"
)
@@ -31,7 +33,7 @@ func (k *KpiCReport) Get(c *gin.Context) {
return
}
tableName := TableName() + "_" + strings.ToLower(querys.NeType)
dbg := dborm.DefaultDB().Table(tableName)
dbg := db.DB("").Model(&KpiCReport{}).Table(tableName)
if querys.NeID != "" {
conditions = append(conditions, "rm_uid = (select n.rm_uid from ne_info n where n.ne_type=? and n.ne_id=? and n.status=1)")
@@ -69,7 +71,7 @@ func (k *KpiCReport) Get(c *gin.Context) {
dbg = dbg.Order(orderBy)
}
//err := dborm.DefaultDB().Table(tableName).Where(whereSql, params...).Find(&reports).Error
//err := db.DB("").Table(tableName).Where(whereSql, params...).Find(&reports).Error
err := dbg.Find(&reports).Error
if err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
@@ -98,7 +100,7 @@ func (k *KpiCReport) GetReport2FE(c *gin.Context) {
return
}
tableName := TableName() + "_" + strings.ToLower(querys.NeType)
dbg := dborm.DefaultDB().Table(tableName)
dbg := db.DB("").Model(&KpiCReport{}).Table(tableName)
if querys.NeID != "" {
conditions = append(conditions, "rm_uid = (select n.rm_uid from ne_info n where n.ne_type=? and n.ne_id=? and n.status=1)")
@@ -108,11 +110,11 @@ func (k *KpiCReport) GetReport2FE(c *gin.Context) {
return
}
if querys.StartTime != "" {
conditions = append(conditions, "(UNIX_TIMESTAMP(created_at) * 1000) >= ?")
conditions = append(conditions, "created_at >= ?")
params = append(params, querys.StartTime)
}
if querys.EndTime != "" {
conditions = append(conditions, "(UNIX_TIMESTAMP(created_at) * 1000) <= ?")
conditions = append(conditions, "created_at <= ?")
params = append(params, querys.EndTime)
}
conditions = append(conditions, "kpi_values != 'null'")
@@ -136,7 +138,7 @@ func (k *KpiCReport) GetReport2FE(c *gin.Context) {
dbg = dbg.Order(orderBy)
}
//err := dborm.DefaultDB().Table(tableName).Where(whereSql, params...).Find(&reports).Error
//err := db.DB("").Table(tableName).Where(whereSql, params...).Find(&reports).Error
err := dbg.Find(&results).Error
if err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
@@ -156,14 +158,24 @@ func (k *KpiCReport) GetReport2FE(c *gin.Context) {
"createdAt": r.CreatedAt,
"granularity": r.Granularity,
}
// 解析 JSON 字符串为 map
var kpiValues []map[string]any
err := json.Unmarshal([]byte(r.KpiValues), &kpiValues)
if err != nil {
continue
}
for _, k := range r.KpiValues {
formatted := fmt.Sprintf("%.3f", k.Value)
// 遍历 kpiValues 数组
for _, k := range kpiValues {
kpiId := fmt.Sprint(k["kpiId"])
value := parse.Number(k["value"])
formatted := fmt.Sprintf("%.3d", value)
formattedFloat, err := strconv.ParseFloat(formatted, 64)
if err != nil {
formattedFloat = 0
}
report[k.KPIID] = formattedFloat
report[kpiId] = formattedFloat
}
reports = append(reports, report)
}
@@ -190,14 +202,14 @@ func (k *KpiCReport) GetTotalList(c *gin.Context) {
return
}
tableName := TableName() + "_" + strings.ToLower(querys.NeType)
dbg := dborm.DefaultDB().Table(tableName)
dbg := db.DB("").Model(&KpiCReport{}).Table(tableName)
if querys.StartTime != "" {
conditions = append(conditions, "(UNIX_TIMESTAMP(created_at) * 1000) >= ?")
conditions = append(conditions, "created_at >= ?")
params = append(params, querys.StartTime)
}
if querys.EndTime != "" {
conditions = append(conditions, "(UNIX_TIMESTAMP(created_at) * 1000) <= ?")
conditions = append(conditions, "created_at <= ?")
params = append(params, querys.EndTime)
}
conditions = append(conditions, "kpi_values != 'null'")
@@ -230,7 +242,7 @@ func (k *KpiCReport) GetTotalList(c *gin.Context) {
dbg = dbg.Order(orderBy)
}
//err := dborm.DefaultDB().Table(tableName).Where(whereSql, params...).Find(&reports).Error
//err := db.DB("").Table(tableName).Where(whereSql, params...).Find(&reports).Error
err = dbg.Find(&reports).Error
if err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
@@ -259,8 +271,7 @@ func (k *KpiCReport) Total(c *gin.Context) {
return
}
tableName := TableName() + "_" + strings.ToLower(querys.NeType)
dbg := dborm.DefaultDB().Table(tableName)
dbg := db.DB("").Model(&KpiCReport{}).Table(tableName)
if querys.StartTime != "" {
conditions = append(conditions, "(UNIX_TIMESTAMP(created_at) * 1000) >= ?")
params = append(params, querys.StartTime)
@@ -293,7 +304,8 @@ func (k *KpiCReport) Post(c *gin.Context) {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
if err := dborm.DefaultDB().Create(&report).Error; err != nil {
dbg := db.DB("").Model(&KpiCReport{})
if err := dbg.Create(&report).Error; err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
@@ -303,8 +315,8 @@ func (k *KpiCReport) Post(c *gin.Context) {
func (k *KpiCReport) Put(c *gin.Context) {
var report KpiCReport
id := c.Param("id")
if err := dborm.DefaultDB().First(&report, id).Error; err != nil {
dbg := db.DB("").Model(&KpiCReport{})
if err := dbg.First(&report, id).Error; err != nil {
c.JSON(http.StatusOK, services.ErrResp("custom indicator report not found"))
return
}
@@ -313,14 +325,14 @@ func (k *KpiCReport) Put(c *gin.Context) {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
dborm.DefaultDB().Save(&report)
db.DB("").Model(&KpiCReport{}).Save(&report)
c.JSON(http.StatusOK, services.DataResp(report))
}
func (k *KpiCReport) Delete(c *gin.Context) {
id := c.Param("id")
if err := dborm.DefaultDB().Delete(&KpiCReport{}, id).Error; err != nil {
if err := db.DB("").Delete(&KpiCReport{}, id).Error; err != nil {
c.JSON(http.StatusOK, services.ErrResp("custom indicator report not found"))
return
}
@@ -330,7 +342,8 @@ func (k *KpiCReport) Delete(c *gin.Context) {
func InsertKpiCReport(neType string, report KpiCReport) {
tableName := TableName() + "_" + strings.ToLower(neType)
if err := dborm.DefaultDB().Table(tableName).Create(&report).Error; err != nil {
dbg := db.DB("").Model(&KpiCReport{})
if err := dbg.Table(tableName).Create(&report).Error; err != nil {
return
}
}

View File

@@ -1,32 +1,17 @@
package kpi_c_report
import (
"database/sql/driver"
"encoding/json"
"fmt"
"time"
)
type KpiCVal struct {
KPIID string `json:"kpi_id" gorm:"column:kpi_id"`
Value float64 `json:"value" gorm:"column:value"`
Err string `json:"err" gorm:"column:err"`
}
type KpiCValues []KpiCVal
type KpiCReport struct {
ID int `gorm:"column:id;primary_key;auto_increment" json:"id"`
NeType *string `gorm:"column:ne_type;default:NULL" json:"neType,omitempty"`
NeName *string `gorm:"column:ne_name;default:" json:"neName,omitempty"`
RmUID *string `gorm:"column:rm_uid;default:NULL" json:"rmUid,omitempty"`
Date string `gorm:"column:date" json:"date"` // time.Time `gorm:"column:date" json:"date"`
StartTime *string `gorm:"column:start_time;default:NULL" json:"startTime,omitempty"`
EndTime *string `gorm:"column:end_time;default:NULL" json:"endTime,omitempty"`
Index int16 `gorm:"column:index" json:"index"`
Granularity *int8 `gorm:"column:granularity;default:60" json:"granularity,omitempty"` //Time granualarity: 5/10/.../60/300 (second)
KpiValues KpiCValues `gorm:"column:kpi_values;type:json" json:"kpiValues,omitempty"`
CreatedAt *time.Time `gorm:"column:created_at;default:current_timestamp()" json:"createdAt,omitempty"`
ID int `gorm:"column:id;primary_key;auto_increment" json:"id"`
NeType *string `gorm:"column:ne_type;default:NULL" json:"neType,omitempty"`
NeName *string `gorm:"column:ne_name;default:" json:"neName,omitempty"`
RmUID *string `gorm:"column:rm_uid;" json:"rmUid,omitempty"`
Date string `gorm:"column:date" json:"date"` // time.Time `gorm:"column:date" json:"date"`
StartTime *string `gorm:"column:start_time" json:"startTime,omitempty"`
EndTime *string `gorm:"column:end_time" json:"endTime,omitempty"`
Index int64 `gorm:"column:index" json:"index"`
Granularity *int64 `gorm:"column:granularity" json:"granularity,omitempty"` //Time granualarity: 5/10/.../60/300 (second)
KpiValues string `gorm:"column:kpi_values" json:"kpiValues,omitempty"`
CreatedAt *int64 `gorm:"column:created_at" json:"createdAt,omitempty"`
}
type KpiCReportQuery struct {
@@ -55,17 +40,3 @@ type KpiCReport2FE struct {
func TableName() string {
return "kpi_c_report"
}
// 将 KpiCValues 转换为 JSON 字节
func (k KpiCValues) Value() (driver.Value, error) {
return json.Marshal(k)
}
// 从字节中扫描 KpiCValues
func (k *KpiCValues) Scan(value interface{}) error {
b, ok := value.([]byte)
if !ok {
return fmt.Errorf("failed to scan value: %v", value)
}
return json.Unmarshal(b, k)
}

View File

@@ -4,13 +4,16 @@ import (
"fmt"
"net/http"
"regexp"
"sort"
"strconv"
"strings"
"time"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/utils/ctx"
"be.ems/src/framework/database/db"
"be.ems/src/framework/reqctx"
"github.com/gin-gonic/gin"
)
@@ -19,7 +22,7 @@ func (k *KpiCTitle) GetToalList(c *gin.Context) {
var titles []KpiCTitle
var conditions []string
var params []any
i18n := ctx.AcceptLanguage(c)
i18n := reqctx.AcceptLanguage(c)
var querys KpiCTitleQuery
if err := c.ShouldBindQuery(&querys); err != nil {
@@ -27,7 +30,7 @@ func (k *KpiCTitle) GetToalList(c *gin.Context) {
return
}
dbg := dborm.DefaultDB().Table(k.TableName())
dbg := db.DB("").Model(&KpiCTitle{})
// construct condition to get
if neType := querys.NeType; neType != "" {
conditions = append(conditions, "ne_type = ?")
@@ -37,7 +40,7 @@ func (k *KpiCTitle) GetToalList(c *gin.Context) {
conditions = append(conditions, "status = ?")
params = append(params, status)
} else {
conditions = append(conditions, "status != 'Deleted'")
conditions = append(conditions, "status != '2'")
}
whereSql := ""
@@ -81,7 +84,7 @@ func (k *KpiCTitle) Get(c *gin.Context) {
var titles []KpiCTitle
var conditions []string
var params []any
i18n := ctx.AcceptLanguage(c)
i18n := reqctx.AcceptLanguage(c)
// construct condition to get
if neType := c.Query("neType"); neType != "" {
@@ -92,14 +95,14 @@ func (k *KpiCTitle) Get(c *gin.Context) {
conditions = append(conditions, "status = ?")
params = append(params, status)
} else {
conditions = append(conditions, "status != 'Deleted'")
conditions = append(conditions, "status != '2'")
}
whereSql := ""
if len(conditions) > 0 {
whereSql += strings.Join(conditions, " and ")
}
if err := dborm.DefaultDB().Where(whereSql, params...).Find(&titles).Error; err != nil {
if err := db.DB("").Where(whereSql, params...).Find(&titles).Error; err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
@@ -126,11 +129,18 @@ func (k *KpiCTitle) expressionAlias(titles []KpiCTitle, i18n string) {
} else {
sql = fmt.Sprintf("SELECT en_title FROM kpi_title WHERE kpi_id='%s'", match[1])
}
err := dborm.XCoreDB().QueryRow(sql).Scan(&alias)
m, err := db.RawDB("", sql, nil)
if err != nil {
log.Warn("Failed to QueryRow:", err)
continue
}
if len(m) > 0 {
if i18n == "zh" {
alias = fmt.Sprintf("%v", m[0]["cn_title"])
} else {
alias = fmt.Sprintf("%v", m[0]["en_title"])
}
}
title.ExprAlias = regexp.MustCompile(match[1]).ReplaceAllString(title.ExprAlias, alias)
}
}
@@ -149,7 +159,7 @@ func (k *KpiCTitle) Total(c *gin.Context) {
conditions = append(conditions, "status = ?")
params = append(params, status)
} else {
conditions = append(conditions, "status != 'Deleted'")
conditions = append(conditions, "status != '2'")
}
whereSql := ""
@@ -157,7 +167,7 @@ func (k *KpiCTitle) Total(c *gin.Context) {
whereSql += strings.Join(conditions, " and ")
}
var total int64 = 0
if err := dborm.DefaultDB().Table(k.TableName()).Where(whereSql, params...).Count(&total).Error; err != nil {
if err := db.DB("").Table(k.TableName()).Where(whereSql, params...).Count(&total).Error; err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
@@ -166,31 +176,56 @@ func (k *KpiCTitle) Total(c *gin.Context) {
}
func (k *KpiCTitle) Post(c *gin.Context) {
var title, res KpiCTitle
var title KpiCTitle
if err := c.ShouldBindJSON(&title); err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
userName := ctx.LoginUserToUserName(c)
userName := reqctx.LoginUserToUserName(c)
title.CreatedBy = &userName
result := dborm.DefaultDB().Where("ne_type=? and (kpi_id=? or title=?) and status!='Deleted'", title.NeType, title.KpiID, title.Title).First(&title)
tx := db.DB("").Model(&KpiCTitle{})
result := tx.Where("ne_type=? and (kpi_id=? or title=?) and status!='2'", title.NeType, title.KpiID, title.Title).First(&title)
if result.RowsAffected > 0 {
c.JSON(http.StatusOK, services.ErrResp("custom indicator already exist"))
return
}
// Regexp match like AMF.C.01
kpiIDRegexp := "^" + *title.NeType + "\\.C\\.[0-9]{2}$"
ret := dborm.DefaultDB().Table("kpi_c_title").
Where("ne_type=? and kpi_id REGEXP ? ORDER BY kpi_id DESC LIMIT 1", title.NeType, kpiIDRegexp).Scan(&res)
if err := ret.Error; err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
// kpiIDRegexp := "^" + *title.NeType + "\\.C\\.[0-9]{2}$"
// ret := db.DB("").Table("kpi_c_title").
// Where("ne_type=? and kpi_id REGEXP ? ORDER BY kpi_id DESC LIMIT 1", title.NeType, kpiIDRegexp).Scan(&res)
// if err := ret.Error; err != nil {
// c.JSON(http.StatusOK, services.ErrResp(err.Error()))
// return
// }
titles := []KpiCTitle{}
ret := db.DB("").Model(&KpiCTitle{})
ret = ret.Select("kpi_id").Where("ne_type=?", title.NeType).Find(&titles)
newKpiID := *title.NeType + ".C" + ".01"
if ret.RowsAffected != 0 {
maxKpiID := *res.KpiID
suffixInt := 1
prefixStr := fmt.Sprintf("%s.C.", *title.NeType)
sort.SliceStable(titles, func(i, j int) bool {
vi := *titles[i].KpiID
vj := *titles[j].KpiID
if strings.HasPrefix(vi, prefixStr) && strings.HasPrefix(vj, prefixStr) {
vvi := strings.Replace(vi, prefixStr, "", 1)
vvii, err := strconv.Atoi(vvi)
if err != nil {
return false
}
vvj := strings.Replace(vj, prefixStr, "", 1)
vvjj, err := strconv.Atoi(vvj)
if err != nil {
return false
}
return vvii > vvjj // desc
}
return false
})
maxKpiID := *titles[0].KpiID
prefix := maxKpiID[:len(maxKpiID)-2]
suffix := maxKpiID[len(maxKpiID)-2:]
suffixInt, err := strconv.Atoi(suffix)
@@ -209,25 +244,26 @@ func (k *KpiCTitle) Post(c *gin.Context) {
newKpiID = prefix + newSuffix
}
title.KpiID = &newKpiID
if err := dborm.DefaultDB().Create(&title).Error; err != nil {
txx := db.DB("").Model(&KpiCTitle{})
if err := txx.Create(&title).Error; err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
kpiCReportTable := "kpi_c_report_" + strings.ToLower(*title.NeType)
if !dborm.DefaultDB().Migrator().HasTable(kpiCReportTable) {
if !db.DB("").Migrator().HasTable(kpiCReportTable) {
// clone table "kpi_c_report" to "kpi_c_report_{neType}"
sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s AS SELECT * FROM %s WHERE 1=0", kpiCReportTable, "kpi_c_report")
if _, err := dborm.ExecSQL(sql, nil); err != nil {
if _, err := db.ExecDB("", sql, nil); err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
sql = fmt.Sprintf("ALTER TABLE %s MODIFY COLUMN `id` int(11) NOT NULL AUTO_INCREMENT FIRST,ADD PRIMARY KEY IF NOT EXISTS (`id`)", kpiCReportTable)
if _, err := dborm.ExecSQL(sql, nil); err != nil {
if _, err := db.ExecDB("", sql, nil); err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
sql = fmt.Sprintf("ALTER TABLE %s ADD INDEX IF NOT EXISTS `idx_timestamp`(`created_at`) USING BTREE, ADD INDEX IF NOT EXISTS `idx_uid_datetime`(`rm_uid`, `date`, `start_time`) USING BTREE", kpiCReportTable)
if _, err := dborm.ExecSQL(sql, nil); err != nil {
if _, err := db.ExecDB("", sql, nil); err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
@@ -239,7 +275,7 @@ func (k *KpiCTitle) Put(c *gin.Context) {
var title KpiCTitle
id := c.Param("id")
if err := dborm.DefaultDB().First(&title, id).Error; err != nil {
if err := db.DB("").First(&title, id).Error; err != nil {
c.JSON(http.StatusOK, services.ErrResp("custom indicator not found"))
return
}
@@ -248,7 +284,8 @@ func (k *KpiCTitle) Put(c *gin.Context) {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
dborm.DefaultDB().Save(&title)
title.UpdatedAt = time.Now().UnixMilli()
db.DB("").Save(&title)
c.JSON(http.StatusOK, services.DataResp(title))
}
@@ -256,7 +293,7 @@ func (k *KpiCTitle) Put(c *gin.Context) {
func (k *KpiCTitle) Delete(c *gin.Context) {
id := c.Param("id")
if err := dborm.DefaultDB().Table(k.TableName()).Where("id=?", id).Update("status", "Deleted").Error; err != nil {
if err := db.DB("").Table(k.TableName()).Where("id=?", id).Update("status", "2").Error; err != nil {
c.JSON(http.StatusOK, services.ErrResp(err.Error()))
return
}
@@ -267,7 +304,7 @@ func (k *KpiCTitle) Delete(c *gin.Context) {
func GetActiveKPICList(neType string) []KpiCTitle {
k := new([]KpiCTitle)
err := dborm.DefaultDB().Where("`ne_type` = ? and `status` = 'Active'", neType).Find(&k).Error
err := db.DB("").Where("`ne_type` = ? and `status` = '1'", neType).Find(&k).Error
if err != nil {
return nil
}

View File

@@ -1,23 +1,21 @@
package kpi_c_title
import "time"
const (
MAX_KPI_C_ID = 99
)
type KpiCTitle struct {
ID int `gorm:"column:id;primary_key;auto_increment" json:"id"`
NeType *string `gorm:"column:ne_type;default:NULL," json:"neType,omitempty"`
KpiID *string `gorm:"column:kpi_id;default:NULL," json:"kpiId,omitempty"`
Title *string `gorm:"column:title;default:NULL," json:"title,omitempty"`
Expression *string `gorm:"column:expression;default:NULL," json:"expression,omitempty"`
ExprAlias string `gorm:"-" json:"exprAlias"`
Status string `gorm:"column:status;default:'Active'" json:"status"`
Unit *string `gorm:"column:unit" json:"unit,omitempty"`
Description *string `gorm:"column:description;default:NULL," json:"description,omitempty"`
CreatedBy *string `gorm:"column:created_by;default:NULL," json:"createdBy,omitempty"`
UpdatedAt *time.Time `gorm:"column:updated_at;default:current_timestamp()," json:"updatedAt,omitempty"`
ID int `gorm:"column:id;primary_key;auto_increment" json:"id"`
NeType *string `gorm:"column:ne_type" json:"neType,omitempty"`
KpiID *string `gorm:"column:kpi_id" json:"kpiId,omitempty"`
Title *string `gorm:"column:title" json:"title,omitempty"`
Expression *string `gorm:"column:expression" json:"expression,omitempty"`
ExprAlias string `gorm:"-" json:"exprAlias"`
Status string `gorm:"column:status" json:"status"` // 0-Inactive/1-Active/2-Deleted
Unit *string `gorm:"column:unit" json:"unit,omitempty"`
Description *string `gorm:"column:description" json:"description,omitempty"`
CreatedBy *string `gorm:"column:created_by" json:"createdBy,omitempty"`
UpdatedAt int64 `gorm:"column:updated_at" json:"updatedAt,omitempty"`
}
type KpiCTitleQuery struct {

View File

@@ -13,18 +13,22 @@ import (
"be.ems/features/pm/kpi_c_report"
"be.ems/features/pm/kpi_c_title"
"be.ems/lib/config"
"be.ems/lib/core/ctx"
"be.ems/lib/dborm"
evaluate "be.ems/lib/eval"
"be.ems/lib/global"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/database/db"
"be.ems/src/framework/utils/date"
"be.ems/src/framework/utils/parse"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
"github.com/go-resty/resty/v2"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/mux"
"xorm.io/xorm"
)
type Response struct {
@@ -76,7 +80,7 @@ type KpiData struct {
StartTime string `json:"startTime" xorm:"start_time"`
EndTime string `json:"endTime" xorm:"end_time"`
Index int `json:"index" xorm:"index"`
Granularity int8 `json:"granularity" xorm:"granularity"`
Granularity int64 `json:"granularity" xorm:"granularity"`
KPIValues []KPIVal `json:"kpiValues" xorm:"json 'kpi_values'"`
//CreatedAt int64 `json:"createdAt" xorm:"created 'created_at'"`
CreatedAt int64 `json:"createdAt" xorm:"'created_at'"`
@@ -105,84 +109,6 @@ var (
CustomUriMeasureTask = config.UriPrefix + "/performanceManagement/{apiVersion}/measureTask/{netype}"
)
var xEngine *xorm.Engine
type DatabaseClient struct {
dbType string
dbUrl string
dbConnMaxLifetime time.Duration
dbMaxIdleConns int
dbMaxOpenConns int
IsShowSQL bool
XEngine *xorm.Engine
}
var DbClient DatabaseClient
func InitDbClient(dbType, dbUser, dbPassword, dbHost, dbPort, dbName, dbParam string) error {
DbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s",
dbUser, dbPassword, dbHost, dbPort, dbName, dbParam)
DbClient.dbType = dbType
DbClient.dbConnMaxLifetime = 0
DbClient.dbMaxIdleConns = 0
DbClient.dbMaxOpenConns = 0
if log.GetLevel() == log.LOG_TRACE {
DbClient.IsShowSQL = true
}
log.Debugf("dbType:%s dbUrl:%s:", dbType, DbClient.dbUrl)
var err error
DbClient.XEngine, err = xorm.NewEngine(DbClient.dbType, DbClient.dbUrl)
if err != nil {
log.Error("Failed to connet database:", err)
return err
}
DbClient.XEngine.SetConnMaxLifetime(DbClient.dbConnMaxLifetime)
DbClient.XEngine.SetMaxIdleConns(DbClient.dbMaxIdleConns)
DbClient.XEngine.SetMaxOpenConns(DbClient.dbMaxOpenConns)
DbClient.XEngine.DatabaseTZ = time.Local // 必须
DbClient.XEngine.TZLocation = time.Local // 必须
if DbClient.IsShowSQL {
DbClient.XEngine.ShowSQL(true)
}
xEngine = DbClient.XEngine
// exist, err := xEngine.IsTableExist("kpi_report")
// if err != nil {
// log.Error("Failed to IsTableExist:", err)
// return err
// }
// if exist {
// // 复制表结构到新表
// sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` AS SELECT * FROM kpi_report WHERE 1=0", "kpi_report_amf")
// _, err := xEngine.Exec(sql)
// if err != nil {
// log.Error("Failed to Exec:", err)
// return err
// }
// }
return nil
}
func XormConnectDatabase(dbType, dbUser, dbPassword, dbHost, dbPort, dbName string) (*xorm.Engine, error) {
sqlStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
dbUser, dbPassword, dbHost, dbPort, dbName)
log.Debugf("dbType:%s Connect to:%s:******@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
dbType, dbUser, dbHost, dbPort, dbName)
var err error
xEngine, err = xorm.NewEngine(dbType, sqlStr) //1、Create xorm engine
if err != nil {
log.Error("Failed to connect database:", err)
return nil, err
}
if log.GetLevel() == log.LOG_TRACE {
xEngine.ShowSQL(true)
}
return xEngine, nil
}
func GetDateFromTimeString(fmtString string, timeString string) string {
t, _ := time.ParseInLocation(fmtString, timeString, time.Local)
return t.Format("2006-01-02")
@@ -197,6 +123,167 @@ func GetDateTimeFromTimeString(fmtString string, timeString string) string {
func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
log.Debug("PostKPIReportFromNF processing... ")
apiVer := ctx.GetParam(r, "apiVersion")
if apiVer != global.ApiVersionV1 {
log.Error("Uri api version is invalid. apiVersion:", apiVer)
services.ResponseNotFound404UriNotExist(w, r)
return
}
var kpiReport KpiReport
if err := ctx.ShouldBindJSON(r, &kpiReport); err != nil {
services.ResponseInternalServerError500ProcessError(w, err)
return
}
kpiIndexStr := ctx.GetParam(r, "index")
// insert kpi_report table, no session
saveKPIData(kpiReport, parse.Number(kpiIndexStr))
saveKPIDataC(kpiReport, parse.Number(kpiIndexStr))
services.ResponseStatusOK204NoContent(w)
}
// saveKPIData 存储KPI数据并推送到ws订阅组
func saveKPIData(kpiReport KpiReport, index int64) int64 {
timestamp := kpiReport.Timestamp
taskPeriod := kpiReport.Task.Period
taskNe := kpiReport.Task.NE
taskNeKPIs := kpiReport.Task.NE.KPIs
// 时间数据处理
receiverTime := date.ParseStrToDate(timestamp, date.YYYY_MM_DDTHH_MM_SSZ)
startTime := date.ParseStrToDate(taskPeriod.StartTime, date.YYYY_MM_DDTHH_MM_SSZ)
endTime := date.ParseStrToDate(taskPeriod.EndTime, date.YYYY_MM_DDTHH_MM_SSZ)
granularity := parse.Number(endTime.Sub(startTime).Seconds())
// kpi data数据json
KpiValues := make([]map[string]any, 0)
for _, v := range taskNeKPIs {
KpiValues = append(KpiValues, map[string]any{
"kpiId": v.KPIID,
"value": v.Value,
"err": v.Err,
})
}
KpiValuesByte, _ := json.Marshal(KpiValues)
kpiData := neDataModel.KpiReport{
NeType: taskNe.NeType,
NeName: taskNe.NEName,
RmUid: taskNe.RmUID,
Date: date.ParseDateToStr(receiverTime, "2006-01-02"),
StartTime: date.ParseDateToStr(startTime, "15:04:05"),
EndTime: date.ParseDateToStr(endTime, "15:04:05"),
Index: index,
Granularity: granularity,
KpiValues: string(KpiValuesByte),
CreatedAt: receiverTime.UnixMilli(), // 时间戳毫秒实际记录到秒
}
insertId := neDataService.NewKpiReport.Insert(kpiData)
if insertId > 0 {
// 指标事件对象
kpiEvent := map[string]any{
"neType": kpiData.NeType,
"neName": kpiData.NeName,
"rmUID": kpiData.RmUid,
"startIndex": kpiData.Index,
"timeGroup": kpiData.CreatedAt,
// kip_id ...
}
for _, v := range taskNeKPIs {
kpiEvent[v.KPIID] = v.Value
}
// 发送到匹配的网元
neInfo := neService.NewNeInfo.FindByRmuid(kpiData.RmUid)
if neInfo.RmUID == kpiData.RmUid {
// 推送到ws订阅组
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent)
if neInfo.NeType == "UPF" {
wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+neInfo.NeId, kpiEvent)
}
}
}
return insertId
}
// saveKPIDataC 存储自定义KPI数据并推送到ws订阅组
func saveKPIDataC(kpiReport KpiReport, index int64) int64 {
timestamp := kpiReport.Timestamp
taskPeriod := kpiReport.Task.Period
taskNe := kpiReport.Task.NE
taskNeKPIs := kpiReport.Task.NE.KPIs
// 时间数据处理
receiverTime := date.ParseStrToDate(timestamp, date.YYYY_MM_DDTHH_MM_SSZ)
startTime := date.ParseStrToDate(taskPeriod.StartTime, date.YYYY_MM_DDTHH_MM_SSZ)
endTime := date.ParseStrToDate(taskPeriod.EndTime, date.YYYY_MM_DDTHH_MM_SSZ)
granularity := parse.Number(endTime.Sub(startTime).Seconds())
// kpi data数据
KpiValues := make([]map[string]any, 0)
kpiValMap := map[string]any{}
for _, v := range taskNeKPIs {
kpiValMap[v.KPIID] = v.Value
}
// 自定义kpiId数据
cTitles := kpi_c_title.GetActiveKPICList(taskNe.NeType)
for _, v := range cTitles {
item := map[string]any{
"kpiId": *v.KpiID,
"value": 0,
"err": "",
}
// 计算结果
result, err := evaluate.CalcExpr(*v.Expression, kpiValMap)
if err != nil {
item["value"] = 0
item["err"] = err.Error()
} else {
item["value"] = result
}
KpiValues = append(KpiValues, item)
}
KpiValuesByte, _ := json.Marshal(KpiValues)
kpiData := neDataModel.KpiCReport{
NeType: taskNe.NeType,
NeName: taskNe.NEName,
RmUid: taskNe.RmUID,
Date: date.ParseDateToStr(receiverTime, "2006-01-02"),
StartTime: date.ParseDateToStr(startTime, "15:04:05"),
EndTime: date.ParseDateToStr(endTime, "15:04:05"),
Index: index,
Granularity: granularity,
KpiValues: string(KpiValuesByte),
CreatedAt: receiverTime.UnixMilli(), // 时间戳毫秒实际记录到秒
}
insertId := neDataService.NewKpiCReport.Insert(kpiData)
if insertId > 0 {
// 指标事件对象
kpiEvent := map[string]any{
"neType": kpiData.NeType,
"neName": kpiData.NeName,
"rmUID": kpiData.RmUid,
"startIndex": kpiData.Index,
"timeGroup": kpiData.CreatedAt,
// kip_id ...
}
for _, v := range KpiValues {
kpiEvent[fmt.Sprint(v["kpiId"])] = v["value"]
}
// 发送到匹配的网元
neInfo := neService.NewNeInfo.FindByRmuid(kpiData.RmUid)
if neInfo.RmUID == kpiData.RmUid {
// 推送自定义KPI到ws订阅组
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiEvent)
}
}
return insertId
}
// process KPI report post message from NFs 旧版
func PostKPIReportFromNFOld(w http.ResponseWriter, r *http.Request) {
log.Debug("PostKPIReportFromNF processing... ")
vars := mux.Vars(r)
apiVer := vars["apiVersion"]
if apiVer != global.ApiVersionV1 {
@@ -224,9 +311,9 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
endTime := global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime)
// get time granularity from startTime and endTime
seconds, _ := global.GetSecondDuration(startTime, endTime)
var granularity int8 = 60
var granularity int64 = 60
if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 {
granularity = int8(seconds)
granularity = int64(seconds)
}
// insert into new kpi_report_xxx table
@@ -269,8 +356,9 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
// insert kpi_report table, no session
tableName := "kpi_report_" + strings.ToLower(kpiReport.Task.NE.NeType)
affected, err := xEngine.Table(tableName).Insert(kpiData)
if err != nil && affected <= 0 {
// affected, err := xEngine.Table(tableName).Insert(kpiData)
tx := db.DB("").Table(tableName).Create(kpiData)
if tx.Error != nil && tx.RowsAffected <= 0 {
log.Errorf("Failed to insert %s:%v", tableName, err)
services.ResponseInternalServerError500ProcessError(w, err)
return
@@ -283,12 +371,12 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
Date: kpiData.Date,
StartTime: &kpiData.StartTime,
EndTime: &kpiData.EndTime,
Index: int16(kpiData.Index),
Index: int64(kpiData.Index),
Granularity: &kpiData.Granularity,
}
// 发送到匹配的网元
neInfo := neService.NewNeInfo.SelectNeInfoByRmuid(kpiData.RmUid)
neInfo := neService.NewNeInfo.FindByRmuid(kpiData.RmUid)
// custom kpi report to FE
kpiCEvent := map[string]any{
// kip_id ...
@@ -301,23 +389,23 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
"createdAt": kpiData.CreatedAt,
"granularity": kpiData.Granularity,
}
kpiCList := kpi_c_title.GetActiveKPICList(kpiData.NEType)
for _, k := range kpiCList {
result, err := evaluate.CalcExpr(*k.Expression, kpiValMap)
kpiCVal := new(kpi_c_report.KpiCVal)
kpiCVal.KPIID = *k.KpiID
if err != nil {
kpiCVal.Value = 0.0
kpiCVal.Err = err.Error()
} else {
kpiCVal.Value = result
}
// kpiCList := kpi_c_title.GetActiveKPICList(kpiData.NEType)
// for _, k := range kpiCList {
// result, err := evaluate.CalcExpr(*k.Expression, kpiValMap)
// kpiCVal := new(kpi_c_report.KpiCVal)
// kpiCVal.KPIID = *k.KpiID
// if err != nil {
// kpiCVal.Value = 0.0
// kpiCVal.Err = err.Error()
// } else {
// kpiCVal.Value = result
// }
report.KpiValues = append(report.KpiValues, *kpiCVal)
// report.KpiValues = append(report.KpiValues, *kpiCVal)
// set KPIC event kpiid and value
kpiCEvent[kpiCVal.KPIID] = kpiCVal.Value
}
// // set KPIC event kpiid and value
// kpiCEvent[kpiCVal.KPIID] = kpiCVal.Value
// }
// KPI自定义指标入库
kpi_c_report.InsertKpiCReport(kpiData.NEType, report)
@@ -335,11 +423,19 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
services.ResponseStatusOK204NoContent(w)
}
type MeasureTask struct {
Tasks []Task `json:"Tasks"`
NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */
// type MeasureTask struct {
// Tasks []Task `json:"Tasks"`
// NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */
// }
type ScheduleJ struct {
Type string `json:"Type"`
Days []int `json:"Days"`
}
type Period struct {
Start string `json:"Start"`
End string `json:"End"`
}
type Task struct {
Id int `json:"Id"`
@@ -347,9 +443,9 @@ type Task struct {
EndTime string `json:"EndTime"`
Schedule struct {
Type string `json:"Type"` // 计划类型Weekly/Monthly, 如果type为"", 则任务以StartTime和EndTime为条件进行统计, 否则以Shedule方式进行
Days []int `json:"Days"` // Weekly: [0,1,...,5,6] 星期日为0, Monthly: [1,2,3,...,30,31]
Periods []dborm.Period `json:"Periods"`
Type string `json:"Type"` // 计划类型Weekly/Monthly, 如果type为"", 则任务以StartTime和EndTime为条件进行统计, 否则以Shedule方式进行
Days []int `json:"Days"` // Weekly: [0,1,...,5,6] 星期日为0, Monthly: [1,2,3,...,30,31]
Periods []Period `json:"Periods"`
/*
Periods []struct {
Start string `json:"Start"` // 零点或者零点加测量粒度的整数倍
@@ -358,8 +454,8 @@ type Task struct {
*/
} `json:"Schedule"`
GranulOption string `json:"GranulOption"` // 测量粒度选项15M/30M/60M/24H
KPISet []dborm.KpiSetJ `json:"KPISet"`
GranulOption string `json:"GranulOption"` // 测量粒度选项15M/30M/60M/24H
KPISet []KpiSetJ `json:"KPISet"`
/*
KPISet []struct {
Code string `json:"Code"` // 统计编码 如SMFHA01
@@ -367,7 +463,31 @@ type Task struct {
} `json:"KPISet"`
*/
}
type KpiSetJ struct {
Code string `json:"Code"` // 统计编码 如SMFHA01
KPIs []string `json:"KPIs` // 指标项集合 ["SMF.AttCreatePduSession", "SMF.AttCreatePduSession._Dnn"]
}
type MeasureTask struct {
Id int `json:"id" xorm:"pk 'id' autoincr"`
NeType string `json:"neType" xorm:"ne_type"`
NeIds []string `json:"neIds" xorm:"ne_ids"`
KpiSet []KpiSetJ `json:"KPISet" xorm:"kpi_set"`
StartTime string `json:"startTime" xorm:"start_time"`
EndTime string `json:"endTime" xorm:"end_time"`
Periods []Period `json:"Periods" xorm:"periods`
Schedule []ScheduleJ `json:"Schedule" xorm:"schedule"`
GranulOption string `json:"granulOption" xorm:"granul_option"`
Status string `json:"status" xorm:"status"`
AccountID string `json:"accountId" xorm:"account_id"`
Comment string `json:"comment" xorm:"comment"`
CreateTime string `json:"createTime" xorm:"create_time"`
UpdateTime string `json:"updateTime" xorm:"update_time"`
DeleteTime string `json:"deleteTime xorm:"delete_time"`
Tasks []Task `json:"Tasks"`
NotifyUrl string `json:"NotifyUrl"` /* "http://xEngine.xEngine.xEngine.x:xxxx/api/rest/performanceManagement/v1/elementType/smf/objectType/measureReport */
}
type MeasureReport struct {
Id int `json:"Id"`
TimeStamp string `json:"TimeStamp"`
@@ -394,6 +514,33 @@ type MeasureReport struct {
} `json:"Report"`
}
func GetMeasureTask(taskId int) (*MeasureTask, error) {
log.Debug("GetMeasureTask processing... ")
measureTask := new(MeasureTask)
tx := db.DB("").Table("measure_task").Where("id=?", taskId).Find(measureTask)
if tx.Error != nil {
log.Error("Failed to get table measure_task from database:", tx.Error)
return nil, tx.Error
}
log.Debug("Measure Task:", measureTask)
return measureTask, nil
}
func XormGetActiveMeasureTask(measureTasks *[]MeasureTask) (*[]MeasureTask, error) {
log.Debug("XormGetActiveMeasureTask processing... ")
tx := db.DB("").Table("measure_task").Where("status='Active'").Find(measureTasks)
if tx.Error != nil {
log.Error("Failed to get table measure_task:", tx.Error)
return nil, tx.Error
}
log.Debug("measureTasks:", measureTasks)
return measureTasks, nil
}
type MeasureData struct {
// Id int `json:"id" xorm:"pk 'id' autoincr"`
Id int `json:"id" xorm:"-"`
@@ -438,8 +585,6 @@ func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
_ = json.Unmarshal(body, &measureReport)
log.Debug("measureReport:", measureReport)
session := xEngine.NewSession()
defer session.Close()
measureData := new(MeasureData)
layout := global.DateTime
measureData.Date = GetDateFromTimeString(layout, measureReport.Report.Period.StartTime)
@@ -469,8 +614,8 @@ func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
measureData.Value = v.Value
log.Debug("measureData:", measureData)
affected, err := session.Insert(measureData)
if err != nil && affected <= 0 {
err := db.DB("").Create(measureData).Error
if err != nil {
log.Error("Failed to insert measure_data:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
@@ -480,8 +625,8 @@ func PostMeasureReportFromNF(w http.ResponseWriter, r *http.Request) {
measureData.Value = 0
log.Debug("measureData:", measureData)
affected, err := session.Insert(measureData)
if err != nil && affected <= 0 {
err := db.DB("").Create(measureData).Error
if err != nil {
log.Error("Failed to insert measure_data:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
@@ -514,7 +659,7 @@ func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
measureTask.Tasks = make([]Task, 1)
for _, taskId := range taskIds {
id, _ := strconv.Atoi(taskId)
task, err := dborm.GetMeasureTask(id)
task, err := GetMeasureTask(id)
if err != nil {
log.Error("Failed to connect database: ", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
@@ -634,7 +779,7 @@ func PostMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
log.Debug("StatusCode: ", response.StatusCode())
switch response.StatusCode() {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
taskInfo := new(dborm.MeasureTask)
taskInfo := new(MeasureTask)
taskInfo.Status = dborm.MeasureTaskStatusActive
taskInfo.CreateTime = time.Now().Format(time.DateTime)
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
@@ -688,7 +833,7 @@ func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
respMsg := make(map[string]interface{})
for _, taskId := range taskIds {
id, _ := strconv.Atoi(taskId)
task, err := dborm.GetMeasureTask(id)
task, err := GetMeasureTask(id)
if err != nil {
log.Error("Failed to connect database: ", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
@@ -711,7 +856,7 @@ func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
log.Debug("neIds:", task.NeIds)
if len(task.NeIds) == 0 {
log.Warn("Not found target NE in the measure task")
taskInfo := new(dborm.MeasureTask)
taskInfo := new(MeasureTask)
taskInfo.Status = dborm.MeasureTaskStatusDeleted
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
if err != nil {
@@ -745,7 +890,7 @@ func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
if err != nil {
// to avoid can't delete the task for abnormal NF
log.Error("Failed to resty delete:", err)
taskInfo := new(dborm.MeasureTask)
taskInfo := new(MeasureTask)
taskInfo.Status = dborm.MeasureTaskStatusDeleted
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
if err != nil {
@@ -762,7 +907,7 @@ func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
log.Info("StatusCode: ", response.StatusCode())
switch response.StatusCode() {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
taskInfo := new(dborm.MeasureTask)
taskInfo := new(MeasureTask)
taskInfo.Status = dborm.MeasureTaskStatusDeleted
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
if err != nil {
@@ -781,7 +926,7 @@ func DeleteMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
respMsg["error"] = body
}
} else {
taskInfo := new(dborm.MeasureTask)
taskInfo := new(MeasureTask)
taskInfo.Status = dborm.MeasureTaskStatusDeleted
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
if err != nil {
@@ -819,7 +964,7 @@ func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
respMsg := make(map[string]interface{})
for _, taskId := range taskIds {
id, _ := strconv.Atoi(taskId)
task, err := dborm.GetMeasureTask(id)
task, err := GetMeasureTask(id)
if err != nil {
log.Error("Failed to connect database: ", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
@@ -842,7 +987,7 @@ func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
}
if len(task.NeIds) == 0 {
taskInfo := new(dborm.MeasureTask)
taskInfo := new(MeasureTask)
taskInfo.Status = dborm.MeasureTaskStatusInactive
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
if err != nil {
@@ -865,7 +1010,7 @@ func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
return
}
if neInfo == nil {
taskInfo := new(dborm.MeasureTask)
taskInfo := new(MeasureTask)
taskInfo.Status = dborm.MeasureTaskStatusInactive
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
if err != nil {
@@ -897,7 +1042,7 @@ func PatchMeasureTaskToNF(w http.ResponseWriter, r *http.Request) {
log.Debug("StatusCode: ", response.StatusCode())
switch response.StatusCode() {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
taskInfo := new(dborm.MeasureTask)
taskInfo := new(MeasureTask)
taskInfo.Status = dborm.MeasureTaskStatusInactive
affected, err := dborm.XormUpdateTableById(id, dborm.TableNameMeasureTask, taskInfo)
if err != nil {
@@ -975,15 +1120,12 @@ func PostMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
_ = json.Unmarshal(body, &measurement)
log.Debug("measurement:", measurement)
session := dborm.DbClient.XEngine.NewSession()
defer session.Close()
// layout := global.DateTime
layout := time.RFC3339
measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
affected, err := session.Table("nbi_pm").Insert(measurement)
if err != nil && affected <= 0 {
tx := db.DB("").Table("nbi_pm").Create(measurement)
if tx.Error != nil && tx.RowsAffected <= 0 {
log.Error("Failed to insert nbi_pm:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return
@@ -1059,14 +1201,11 @@ func GetMeasurementFromNF(w http.ResponseWriter, r *http.Request) {
_ = json.Unmarshal(response.Body(), &measurement)
log.Debug("measurement:", measurement)
session := dborm.DbClient.XEngine.NewSession()
defer session.Close()
layout := time.RFC3339
measurement.Date = GetDateFromTimeString(layout, measurement.StartTime)
measurement.StartTime = GetDateTimeFromTimeString(layout, measurement.StartTime)
affected, err := session.Table("nbi_pm").Insert(measurement)
if err != nil && affected <= 0 {
tx := db.DB("").Table("nbi_pm").Create(measurement)
if tx.Error != nil && tx.RowsAffected <= 0 {
log.Error("Failed to insert nbi_pm:", err)
services.ResponseInternalServerError500DatabaseOperationFailed(w)
return

View File

@@ -1,11 +1,7 @@
package sm
import (
"database/sql"
"fmt"
"net/http"
"os"
"os/exec"
"time"
"be.ems/lib/config"
@@ -47,99 +43,3 @@ func GetOMCLocalTime(w http.ResponseWriter, r *http.Request) {
}
services.ResponseWithJson(w, http.StatusOK, response)
}
var dbConfig = config.GetYamlConfig().Database
func DatabaseWhoreBackup() {
// MySQL数据库连接信息
sqlStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
dbConfig.User, dbConfig.Password, dbConfig.Host, dbConfig.Port, dbConfig.Name)
db, err := sql.Open("mysql", sqlStr)
if err != nil {
log.Error("Failed to connect to database:", err)
return
}
defer db.Close()
// 备份SQL文件路径
backupFile := dbConfig.Backup + "/" + "whore_backup_" + dbConfig.Name + ".sql"
// 执行mysqldump命令进行备份
cmd := exec.Command("mysqldump", "-u", dbConfig.User, "-p"+dbConfig.Password, "-h", dbConfig.Host, dbConfig.Name)
output, err := cmd.Output()
if err != nil {
log.Error("Failed to execute mysqldump command:", err)
return
}
// 将备份结果写入SQL文件
file, err := os.Create(backupFile)
if err != nil {
log.Error("Failed to create backup file:", err)
return
}
defer file.Close()
_, err = file.Write(output)
if err != nil {
log.Error("Failed to write backup file:", err)
return
}
log.Info("Backup completed successfully.")
}
func DatabaseIncrementalBackup() {
// MySQL数据库连接信息
sqlStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&collation=utf8mb4_general_ci&parseTime=true&loc=Local",
dbConfig.User, dbConfig.Password, dbConfig.Host, dbConfig.Port, dbConfig.Name)
db, err := sql.Open("mysql", sqlStr)
if err != nil {
log.Error("Failed to connect to database:", err)
return
}
defer db.Close()
// 备份SQL文件路径
backupFile := dbConfig.Backup + "/" + "incremental_backup_" + dbConfig.Name + ".sql"
// 上次备份的时间点
lastBackupTime := time.Date(2022, time.January, 1, 0, 0, 0, 0, time.Local)
// 构建增量备份SQL语句
query := fmt.Sprintf("SELECT * FROM table WHERE modified_at > '%s'", lastBackupTime.Format("2006-01-02 15:04:05"))
// 执行查询
rows, err := db.Query(query)
if err != nil {
log.Error("Failed to execute query:", err)
return
}
defer rows.Close()
// 创建增量备份SQL文件
file, err := os.Create(backupFile)
if err != nil {
log.Error("Failed to create backup file:", err)
return
}
defer file.Close()
// 将查询结果写入SQL文件
for rows.Next() {
var data string
err := rows.Scan(&data)
if err != nil {
log.Error("Failed to scan row:", err)
return
}
_, err = file.WriteString(data + "\n")
if err != nil {
log.Error("Failed to write backup file:", err)
return
}
}
log.Info("Incremental backup completed successfully.")
}

View File

@@ -9,17 +9,16 @@ import (
"strings"
"time"
"github.com/shirou/gopsutil/v4/net"
"github.com/go-resty/resty/v2"
"github.com/gorilla/mux"
"github.com/shirou/gopsutil/v4/net"
"be.ems/lib/config"
"be.ems/lib/dborm"
"be.ems/lib/global"
"be.ems/lib/log"
"be.ems/lib/services"
tokenConst "be.ems/src/framework/constants/token"
"be.ems/src/framework/constants"
)
type CpuUsage struct {
@@ -246,7 +245,7 @@ func GetOneLicenseInfoFromNF(w http.ResponseWriter, r *http.Request) {
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -362,7 +361,7 @@ func GetAllLicenseInfoFromNF(w http.ResponseWriter, r *http.Request) {
resp, err := client.SetTimeout(time.Duration(1 * time.Second)).R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -488,7 +487,7 @@ func GetOneSysinfoFromNF(w http.ResponseWriter, r *http.Request) {
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -633,7 +632,7 @@ func GetAllSysinfoFromNF(w http.ResponseWriter, r *http.Request) {
resp, err := client.SetTimeout(time.Duration(1 * time.Second)).R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -756,7 +755,7 @@ func GetStateFromNF(w http.ResponseWriter, r *http.Request) {
result["ipAddress"] = ne.Ip
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
//SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -846,136 +845,6 @@ func GetStateFromNF(w http.ResponseWriter, r *http.Request) {
// services.ResponseWithJson(w, http.StatusOK, response)
}
// GetStateFromNF 旧函数
// Get system state from NF/NFs
func GetStateFromNFOld(w http.ResponseWriter, r *http.Request) {
log.Debug("GetStateFromNF processing... ")
data := make([]map[string]interface{}, 0)
vars := mux.Vars(r)
neType := vars["elementTypeValue"]
var neList []dborm.NeInfo
if neType == "" {
services.ResponseNotFound404UriNotExist(w, r)
return
}
// token, err := services.CheckFrontValidRequest(w, r)
// if err != nil {
// log.Error("Request error:", err)
// return
// }
// log.Debug("AccessToken:", token)
switch strings.ToLower(neType) {
case "all":
// query all NFs
// create rest client
restHostPort := config.GetOMCHostUrl()
getNeInfoPattern := fmt.Sprintf(config.DefaultUriPrefix+"/databaseManagement/v1/elementType/%s/objectType/ne_info",
config.GetYamlConfig().Database.Name)
getNeInfoURI := restHostPort + getNeInfoPattern + "?WHERE=status+in+('0','3')"
log.Debug("getNeInfoPattern:", getNeInfoPattern)
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
// SetHeaders(map[string]string{"AccessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
Get(getNeInfoURI)
if err != nil {
log.Error("Get ne_info from DB is failed:", err)
services.ResponseInternalServerError500NFConnectRefused(w)
return
}
neList, _ = dborm.XormParseResult(resp.Body())
default:
restHostPort := config.GetOMCHostUrl()
getNeInfoPattern := fmt.Sprintf(config.DefaultUriPrefix+"/databaseManagement/v1/elementType/%s/objectType/ne_info",
config.GetYamlConfig().Database.Name)
getNeInfoURI := restHostPort + getNeInfoPattern
neId := services.GetUriParamString(r, "ne_id", ",", true, true)
if neId == "" {
getNeInfoURI = getNeInfoURI + fmt.Sprintf("?WHERE=status+in+('0','3')+and+ne_type='%s'", neType)
} else {
getNeInfoURI = getNeInfoURI + fmt.Sprintf("?WHERE=status+in+('0','3')+and+ne_type='%v'+and+ne_id+in+%v", neType, neId)
}
log.Debug("getNeInfoURI:", getNeInfoURI)
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
Get(getNeInfoURI)
if err != nil {
log.Error("Get ne_info from DB is failed:", err)
services.ResponseInternalServerError500NFConnectRefused(w)
return
}
neList, _ = dborm.XormParseResult(resp.Body())
}
omcNeTypeLower := "omc"
if config.GetYamlConfig().OMC.NeType != "" {
omcNeTypeLower = strings.ToLower(config.GetYamlConfig().OMC.NeType)
}
for _, ne := range neList {
result := make(map[string]interface{})
log.Debugf("r.RemoteAddr: %s omcNeTypeLower: %s", r.RemoteAddr, omcNeTypeLower)
log.Trace("ne: ", ne)
//if strings.ToLower(ne.NeType) != omcNeTypeLower || !strings.Contains(r.RemoteAddr, ne.Ip) {
if strings.ToLower(ne.NeType) != omcNeTypeLower {
hostUri := fmt.Sprintf("http://%s:%v", ne.Ip, ne.Port)
requestURI2NF := fmt.Sprintf("%s/api/rest/systemManagement/v1/elementType/%s/objectType/systemState",
hostUri, ne.NeType)
log.Debug("requestURI2NF:", requestURI2NF)
result["ipAddress"] = ne.Ip
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
Get(requestURI2NF)
if err != nil {
log.Error("Get system state from NF is failed:", err)
errorMessage := services.ErrorMessage{
ErrorCode: "1", ErrorInfo: "Internal server error, NF connnect refused",
}
result["error"] = errorMessage
SN, Version, _ := dborm.XormGetNEStateInfo(ne.NeType, ne.NeId)
result["serialNum"] = SN
result["version"] = Version
} else {
systemState := make(map[string]interface{})
_ = json.Unmarshal(resp.Body(), &systemState)
result["systemState"] = systemState
}
} else {
result["ipAddress"] = ne.Ip
emsState := GetEMSState(ne.Ip)
result["systemState"] = emsState
}
neItem := strings.ToUpper(ne.NeType) + "/" + ne.NeId
mapState := make(map[string]interface{})
mapState[neItem] = result
data = append(data, mapState)
log.Trace("data:", data)
}
var response Response
response.Data = data
services.ResponseWithJson(w, http.StatusOK, response)
}
func GetEMSState(ip string) *SysState {
log.Debug("GetEMSState processing... ")

View File

@@ -126,10 +126,10 @@ func GetSysInfo(sysInfo *SysInfo) error {
return nil
}
func getProcess() process.Process {
func getProcess() *process.Process {
checkPid := os.Getpid()
ret, _ := process.NewProcess(int32(checkPid))
return *ret
return ret
}
func GetSystemCpuInfo() {

View File

@@ -125,10 +125,10 @@ func GetSysInfo(sysInfo *SysInfo) error {
return nil
}
func getProcess() process.Process {
func getProcess() *process.Process {
checkPid := os.Getpid()
ret, _ := process.NewProcess(int32(checkPid))
return *ret
return ret
}
func GetSystemCpuInfo() {

View File

@@ -2,29 +2,23 @@ package trace
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/go-resty/resty/v2"
"github.com/gorilla/mux"
"be.ems/lib/config"
"be.ems/lib/dborm"
"be.ems/lib/global"
"be.ems/lib/log"
"be.ems/lib/run"
"be.ems/lib/services"
)
var (
UriTraceTaskV1 = config.DefaultUriPrefix + "/traceManagement/v1/subscriptions"
UriTraceTask = config.DefaultUriPrefix + "/traceManagement/{apiVersion}/subscriptions"
UriTraceRawMsg = config.DefaultUriPrefix + "/traceManagement/{apiVersion}/rawMessage/{id}"
UriTraceDecMsg = config.DefaultUriPrefix + "/traceManagement/{apiVersion}/decMessage/{id}" // decode message api
CustomUriTraceTaskV1 = config.UriPrefix + "/traceManagement/v1/subscriptions"
CustomUriTraceTask = config.UriPrefix + "/traceManagement/{apiVersion}/subscriptions"
@@ -387,51 +381,3 @@ func GetRawMessage(w http.ResponseWriter, r *http.Request) {
log.Debug("GetRawMessage processing... ")
}
func ParseRawMsg2Html(w http.ResponseWriter, r *http.Request) {
log.Debug("ParseRawMsg2Html processing... ")
vars := mux.Vars(r)
idStr := vars["id"]
id, _ := strconv.Atoi(idStr)
traceData, err := dborm.XormGetTraceData(id)
if err != nil {
log.Error("Failed to dborm.XormGetTraceRawMsg:", err)
services.ResponseInternalServerError500ProcessError(w, err)
return
}
log.Trace("traceData:", traceData)
filePath := traceData.DecMsg
if traceData.DecMsg == "" {
htmlFile := fmt.Sprintf("traceDecMessage-%d-%d.html", traceData.TaskID, traceData.ID)
filePath = config.GetYamlConfig().OMC.FrontTraceDir + "/" + htmlFile
command := fmt.Sprintf("/usr/local/omc/bin/data2html -f %s -t %d -i N%d -d %x", filePath, traceData.Timestamp, traceData.IfType, traceData.RawMsg)
out, err := run.ExecCmd(command, "/")
log.Tracef("Exec output: %v", string(out))
if err != nil {
log.Errorf("Faile to ipdate2html:", err)
services.ResponseInternalServerError500ProcessError(w, err)
return
}
exist, err := global.FilePathExists(filePath)
if err != nil {
log.Errorf("Failed to stat:", err)
services.ResponseInternalServerError500ProcessError(w, err)
return
}
if !exist {
err = errors.New(string(strings.ReplaceAll(string(out), "\n", "")))
services.ResponseInternalServerError500ProcessError(w, err)
return
}
traceData.DecMsg = filePath
_, err = dborm.XormUpdateTraceData(id, traceData)
if err != nil {
log.Errorf("Faile to XormUpdateTraceData:", err)
services.ResponseInternalServerError500ProcessError(w, err)
return
}
}
services.ResponseHtmlContent(w, http.StatusOK, filePath)
}

View File

@@ -14,10 +14,11 @@ import (
"be.ems/lib/global"
"be.ems/lib/log"
"be.ems/lib/services"
tokenConst "be.ems/src/framework/constants/token"
"be.ems/src/framework/constants"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
"github.com/go-resty/resty/v2"
"github.com/gorilla/mux"
)
@@ -164,7 +165,7 @@ func GetAvailableAMFsFromNSSF(w http.ResponseWriter, r *http.Request) {
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -228,7 +229,7 @@ func GetSubscriptionsFromNSSF(w http.ResponseWriter, r *http.Request) {
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -257,7 +258,7 @@ func GetUEInfoFromNF(w http.ResponseWriter, r *http.Request) {
return
}
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neType, neId)
neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neType, neId)
var response services.MapResponse
if neInfo.NeId == neId && neInfo.NeId != "" {
@@ -336,7 +337,7 @@ func PostPCFUserInfo(w http.ResponseWriter, r *http.Request) {
client.SetTimeout(1 * time.Minute)
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
//SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -409,7 +410,7 @@ func PutPCFUserInfo(w http.ResponseWriter, r *http.Request) {
client.SetTimeout(1 * time.Minute)
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
//SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -475,7 +476,7 @@ func DeletePCFUserInfo(w http.ResponseWriter, r *http.Request) {
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
//SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -541,7 +542,7 @@ func GetUENumFromNF(w http.ResponseWriter, r *http.Request) {
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -605,7 +606,7 @@ func GetNBInfoFromNF(w http.ResponseWriter, r *http.Request) {
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -651,7 +652,7 @@ func PostNBInfoFromNF(w http.ResponseWriter, r *http.Request) {
return
}
neInfo := neService.NewNeInfo.SelectNeInfoByRmuid(body.RmUID)
neInfo := neService.NewNeInfo.FindByRmuid(body.RmUID)
if neInfo.RmUID != body.RmUID {
services.ResponseInternalServerError500ProcessError(w, fmt.Errorf("inconsistent network element rmUID"))
return
@@ -739,7 +740,7 @@ func GetNBInfoAllFromNF(w http.ResponseWriter, r *http.Request) {
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
//SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
@@ -812,7 +813,7 @@ func GetUEInfoFileExportNF(w http.ResponseWriter, r *http.Request) {
client.SetTimeout(3 * time.Minute)
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
SetHeaders(map[string]string{constants.HEADER_KEY: r.Header.Get(constants.HEADER_KEY)}).
// SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).