From d5f7a2077e090d3171cd3b619562c6df8e734286 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Wed, 7 Feb 2024 12:31:25 +0800 Subject: [PATCH] =?UTF-8?q?marge:=20=E5=90=88=E5=B9=B6=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 + features/cdr/cdrevent.go | 10 +- features/cm/ne.go | 20 +- features/event/event.go | 69 ++++++ features/pm/performance.go | 43 +++- features/state/state_linux.go | 8 +- features/state/state_windows.go | 8 +- features/ue/ue.go | 83 ++++++- lib/dborm/dborm.go | 41 +++- lib/global/kits.go | 21 ++ lib/routes/routes.go | 3 + restagent/etc/restconf.yaml | 4 +- restagent/makefile | 2 +- restagent/restagent.go | 91 ++++++-- src/app.go | 8 +- src/framework/constants/cachekey/cachekey.go | 6 + .../middleware/collectlogs/operate_log.go | 2 +- src/framework/middleware/pre_authorize.go | 9 +- src/framework/utils/ctx/ctx.go | 24 +- .../getStateFromNE/getStateFromNE.go | 2 +- src/modules/monitor/controller/sys_cache.go | 33 +-- src/modules/network_data/controller/amf.go | 51 +++++ src/modules/network_data/controller/ims.go | 51 +++++ .../network_data/controller/perf_kpi.go | 84 +++++++ src/modules/network_data/controller/upf.go | 54 +++++ src/modules/network_data/model/cdr_event.go | 28 +++ src/modules/network_data/model/perf_kpi.go | 23 ++ src/modules/network_data/model/ue_event.go | 28 +++ src/modules/network_data/network_data.go | 58 +++++ .../network_data/repository/cdr_event.go | 9 + .../network_data/repository/cdr_event.impl.go | 136 +++++++++++ .../network_data/repository/perf_kpi.go | 15 ++ .../network_data/repository/perf_kpi.impl.go | 133 +++++++++++ .../network_data/repository/ue_event.go | 9 + .../network_data/repository/ue_event.impl.go | 133 +++++++++++ src/modules/network_data/service/cdr_event.go | 9 + .../network_data/service/cdr_event.impl.go | 22 ++ src/modules/network_data/service/perf_kpi.go | 15 ++ .../network_data/service/perf_kpi.impl.go | 76 ++++++ src/modules/network_data/service/ue_event.go | 9 + .../network_data/service/ue_event.impl.go | 22 ++ .../network_element/controller/ne_info.go | 21 +- .../network_element/network_element.go | 26 +-- .../network_element/service/ne_direct_link.go | 1 + .../network_element/service/ne_info.go | 6 + .../network_element/service/ne_info.impl.go | 50 +++- src/modules/system/controller/sys_menu.go | 12 +- src/modules/system/service/sys_menu.go | 2 +- src/modules/ws/controller/ws.go | 104 +++++++++ src/modules/ws/model/net_connect.go | 20 ++ src/modules/ws/model/ps_process.go | 38 +++ src/modules/ws/model/ws.go | 21 ++ src/modules/ws/processor/cdr_connect.go | 29 +++ src/modules/ws/processor/ne_state.go | 57 +++++ src/modules/ws/processor/net_connect.go | 61 +++++ src/modules/ws/processor/ps_process.go | 142 ++++++++++++ src/modules/ws/processor/ue_connect.go | 29 +++ src/modules/ws/processor/upf_total_flow.go | 44 ++++ src/modules/ws/service/ws.go | 20 ++ src/modules/ws/service/ws.impl.go | 216 ++++++++++++++++++ src/modules/ws/service/ws_receive.go | 9 + src/modules/ws/service/ws_receive.impl.go | 56 +++++ src/modules/ws/service/ws_send.go | 10 + src/modules/ws/service/ws_send.impl.go | 85 +++++++ src/modules/ws/ws.go | 30 +++ 65 files changed, 2445 insertions(+), 99 deletions(-) create mode 100644 features/event/event.go create mode 100644 src/modules/network_data/controller/amf.go create mode 100644 src/modules/network_data/controller/ims.go create mode 100644 src/modules/network_data/controller/perf_kpi.go create mode 100644 src/modules/network_data/controller/upf.go create mode 100644 src/modules/network_data/model/cdr_event.go create mode 100644 src/modules/network_data/model/perf_kpi.go create mode 100644 src/modules/network_data/model/ue_event.go create mode 100644 src/modules/network_data/network_data.go create mode 100644 src/modules/network_data/repository/cdr_event.go create mode 100644 src/modules/network_data/repository/cdr_event.impl.go create mode 100644 src/modules/network_data/repository/perf_kpi.go create mode 100644 src/modules/network_data/repository/perf_kpi.impl.go create mode 100644 src/modules/network_data/repository/ue_event.go create mode 100644 src/modules/network_data/repository/ue_event.impl.go create mode 100644 src/modules/network_data/service/cdr_event.go create mode 100644 src/modules/network_data/service/cdr_event.impl.go create mode 100644 src/modules/network_data/service/perf_kpi.go create mode 100644 src/modules/network_data/service/perf_kpi.impl.go create mode 100644 src/modules/network_data/service/ue_event.go create mode 100644 src/modules/network_data/service/ue_event.impl.go create mode 100644 src/modules/ws/controller/ws.go create mode 100644 src/modules/ws/model/net_connect.go create mode 100644 src/modules/ws/model/ps_process.go create mode 100644 src/modules/ws/model/ws.go create mode 100644 src/modules/ws/processor/cdr_connect.go create mode 100644 src/modules/ws/processor/ne_state.go create mode 100644 src/modules/ws/processor/net_connect.go create mode 100644 src/modules/ws/processor/ps_process.go create mode 100644 src/modules/ws/processor/ue_connect.go create mode 100644 src/modules/ws/processor/upf_total_flow.go create mode 100644 src/modules/ws/service/ws.go create mode 100644 src/modules/ws/service/ws.impl.go create mode 100644 src/modules/ws/service/ws_receive.go create mode 100644 src/modules/ws/service/ws_receive.impl.go create mode 100644 src/modules/ws/service/ws_send.go create mode 100644 src/modules/ws/service/ws_send.impl.go create mode 100644 src/modules/ws/ws.go diff --git a/.gitignore b/.gitignore index 252eeb5..b35b094 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,7 @@ restagent/log/ restagent/upload/ restagent/software/ restagent/database/ +restagent/license/ restagent/restagent sshsvc/sshsvc @@ -36,6 +37,7 @@ captrace/log/ tools/loadmconf/loadmconf tools/loadpconf/loadpconf +reference vendor # Built Visual Studio Code Extensions @@ -43,5 +45,6 @@ vendor *.log *.log-* *.bak +*.exe __debug_bin*.exe diff --git a/features/cdr/cdrevent.go b/features/cdr/cdrevent.go index c30c98f..50599f7 100644 --- a/features/cdr/cdrevent.go +++ b/features/cdr/cdrevent.go @@ -10,6 +10,7 @@ import ( "ems.agt/lib/log" "ems.agt/lib/services" "ems.agt/restagent/config" + wsService "ems.agt/src/modules/ws/service" ) var ( @@ -41,7 +42,7 @@ func PostCDREventFromNF(w http.ResponseWriter, r *http.Request) { cdrEvent := new(CDREvent) err = json.Unmarshal(body, &cdrEvent) - if err != nil { + if cdrEvent.NeType == "" || err != nil { log.Error("Failed to Unmarshal cdrEvent:", err) services.ResponseInternalServerError500ProcessError(w, err) return @@ -55,5 +56,12 @@ func PostCDREventFromNF(w http.ResponseWriter, r *http.Request) { return } + // 推送到ws订阅组 + if v, ok := cdrEvent.CDR["recordType"]; ok { + if v == "MOC" || v == "MTSM" { + wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_IMS_CDR, cdrEvent) + } + } + services.ResponseStatusOK204NoContent(w) } diff --git a/features/cm/ne.go b/features/cm/ne.go index 458850f..16e96f2 100644 --- a/features/cm/ne.go +++ b/features/cm/ne.go @@ -16,6 +16,7 @@ import ( "ems.agt/lib/services" "ems.agt/restagent/config" tokenConst "ems.agt/src/framework/constants/token" + neService "ems.agt/src/modules/network_element/service" "github.com/go-resty/resty/v2" "github.com/gorilla/mux" @@ -136,7 +137,7 @@ func PostNeInfo(w http.ResponseWriter, r *http.Request) { services.ResponseNotFound404UriNotExist(w, r) return } - log.Debug("Body:", string(body)) + log.Trace("Body:", string(body)) neInfo := new(dborm.NeInfo) err = json.Unmarshal(body, neInfo) @@ -158,6 +159,9 @@ func PostNeInfo(w http.ResponseWriter, r *http.Request) { return } + // 刷新缓存,不存在结构体网元Id空字符串 + neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId) + mapRow := make(map[string]interface{}) row := map[string]interface{}{"affectedRows": affected} mapRow["data"] = row @@ -205,6 +209,10 @@ func PostNeInfo(w http.ResponseWriter, r *http.Request) { } else if affected <= 0 { log.Infof("Not record affected to insert ne_info") } + + // 刷新缓存,不存在结构体网元Id空字符串 + neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId) + services.ResponseStatusOK204NoContent(w) return default: @@ -261,6 +269,9 @@ func PutNeInfo(w http.ResponseWriter, r *http.Request) { return } + // 刷新缓存,不存在结构体网元Id空字符串 + neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId) + mapRow := make(map[string]interface{}) row := map[string]interface{}{"affectedRows": affected} mapRow["data"] = row @@ -309,6 +320,10 @@ func PutNeInfo(w http.ResponseWriter, r *http.Request) { } else if affected <= 0 { log.Infof("Not record affected to insert ne_info") } + + // 刷新缓存,不存在结构体网元Id空字符串 + neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId) + services.ResponseStatusOK204NoContent(w) return default: @@ -390,6 +405,9 @@ func DeleteNeInfo(w http.ResponseWriter, r *http.Request) { return } + // 刷新缓存,不存在结构体网元Id空字符串 + neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId) + mapRow := make(map[string]interface{}) row := map[string]interface{}{"affectedRows": affected} mapRow["data"] = row diff --git a/features/event/event.go b/features/event/event.go new file mode 100644 index 0000000..fbfbf1f --- /dev/null +++ b/features/event/event.go @@ -0,0 +1,69 @@ +package event + +import ( + "encoding/json" + "io" + "time" + + "ems.agt/lib/dborm" + "ems.agt/lib/global" + "ems.agt/lib/log" + "ems.agt/lib/services" + wsService "ems.agt/src/modules/ws/service" + "github.com/gin-gonic/gin" +) + +var ( + UriUEEvent = "/upload-ue/v1/:eventType" +) + +type UEEvent struct { + NeType string `json:"neType" xorm:"ne_type"` + NeName string `json:"neName" xorm:"ne_name"` + RmUID string `json:"rmUID" xorm:"rm_uid"` + Timestamp int `json:"timestamp" xorm:"timestamp"` + EventType string `json:"eventType" xorm:"event_type"` + EventJson map[string]any `json:"eventJSON" xorm:"event_json"` +} + +func PostUEEventFromAMF(c *gin.Context) { + log.Info("PostUEEventFromAMF processing... ") + + body, err := io.ReadAll(io.LimitReader(c.Request.Body, global.RequestBodyMaxLen)) + if err != nil { + log.Error("Failed to io.ReadAll: ", err) + services.ResponseNotFound404UriNotExist(c.Writer, c.Request) + return + } + + //vars := mux.Vars(c.Request) + eventType, ok := c.Params.Get("eventType") + if !ok || eventType == "" { + log.Error("eventType is empty") + services.ResponseNotFound404UriNotExist(c.Writer, c.Request) + return + } + ueEvent := new(UEEvent) + err = json.Unmarshal(body, &ueEvent.EventJson) + if err != nil { + log.Error("Failed to Unmarshal ueEvent:", err) + services.ResponseInternalServerError500ProcessError(c.Writer, err) + return + } + ueEvent.NeType = "AMF" + ueEvent.Timestamp = int(time.Now().Unix()) + ueEvent.EventType = eventType + log.Trace("ueEvent:", ueEvent) + + affected, err := dborm.XormInsertTableOne("ue_event", ueEvent) + if err != nil && affected <= 0 { + log.Error("Failed to insert ue_event:", err) + services.ResponseInternalServerError500ProcessError(c.Writer, err) + return + } + + // 推送到ws订阅组 + wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_AMF_UE, ueEvent) + + services.ResponseStatusOK204NoContent(c.Writer) +} diff --git a/features/pm/performance.go b/features/pm/performance.go index 57ae178..efde586 100644 --- a/features/pm/performance.go +++ b/features/pm/performance.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "strconv" "time" @@ -16,6 +17,7 @@ import ( "ems.agt/restagent/config" "xorm.io/xorm" + wsService "ems.agt/src/modules/ws/service" "github.com/go-resty/resty/v2" _ "github.com/go-sql-driver/mysql" "github.com/gorilla/mux" @@ -47,17 +49,18 @@ type KpiReport struct { type GoldKpi struct { // Id int `json:"-" xorm:"pk 'id' autoincr"` - Date string `json:"date" xorm:"date"` - Index int `json:"index"` - StartTime string `json:"startTime"` - EndTime string `json:"endTime"` - NEName string `json:"neName" xorm:"ne_name"` - RmUid string `json:"rmUid" xorm:"rm_uid"` - NEType string `json:"neType" xorm:"ne_type"` - KpiId string `json:"kpiId" xorm:"kpi_id"` - Value int `json:"value"` - Error string `json:"error"` - Timestamp string `json:"timestamp"` + Date string `json:"date" xorm:"date"` + Index int `json:"index"` + Granularity int8 `json:"granularity"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + NEName string `json:"neName" xorm:"ne_name"` + RmUid string `json:"rmUid" xorm:"rm_uid"` + NEType string `json:"neType" xorm:"ne_type"` + KpiId string `json:"kpiId" xorm:"kpi_id"` + Value int `json:"value"` + Error string `json:"error"` + Timestamp string `json:"timestamp"` } var ( @@ -182,11 +185,26 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { goldKpi.Index, _ = strconv.Atoi(vars["index"]) goldKpi.StartTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime) goldKpi.EndTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime) + // get time granularity from startTime and endTime + seconds, _ := global.GetSecondDuration(goldKpi.StartTime, goldKpi.EndTime) + goldKpi.Granularity = 60 + if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 { + goldKpi.Granularity = int8(seconds) + } goldKpi.NEName = kpiReport.Task.NE.NEName goldKpi.RmUid = kpiReport.Task.NE.RmUID goldKpi.NEType = kpiReport.Task.NE.NeType goldKpi.Timestamp = global.GetFmtTimeString(layout, kpiReport.Timestamp, time.DateTime) + // 黄金指标事件对象 + kpiEvent := map[string]any{ + // kip_id ... + "neType": goldKpi.NEType, + "neName": goldKpi.NEName, + "startIndex": goldKpi.Index, + "timeGroup": goldKpi.StartTime, + } for _, k := range kpiReport.Task.NE.KPIs { + kpiEvent[k.KPIID] = k.Value // kip_id goldKpi.KpiId = k.KPIID goldKpi.Value = k.Value goldKpi.Error = k.Err @@ -224,6 +242,9 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { } } + // 推送到ws订阅组 + wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI, kpiEvent) + services.ResponseStatusOK200Null(w) } diff --git a/features/state/state_linux.go b/features/state/state_linux.go index 2c9a330..7722431 100644 --- a/features/state/state_linux.go +++ b/features/state/state_linux.go @@ -12,10 +12,10 @@ import ( "time" "ems.agt/lib/log" - "github.com/shirou/gopsutil/cpu" - "github.com/shirou/gopsutil/disk" - "github.com/shirou/gopsutil/mem" - "github.com/shirou/gopsutil/process" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/disk" + "github.com/shirou/gopsutil/v3/mem" + "github.com/shirou/gopsutil/v3/process" ) type SysInfo struct { diff --git a/features/state/state_windows.go b/features/state/state_windows.go index 548dbe3..ef37c5c 100644 --- a/features/state/state_windows.go +++ b/features/state/state_windows.go @@ -11,10 +11,10 @@ import ( "syscall" "time" - "github.com/shirou/gopsutil/cpu" - "github.com/shirou/gopsutil/disk" - "github.com/shirou/gopsutil/mem" - "github.com/shirou/gopsutil/process" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/disk" + "github.com/shirou/gopsutil/v3/mem" + "github.com/shirou/gopsutil/v3/process" ) type SysInfo struct { diff --git a/features/ue/ue.go b/features/ue/ue.go index 29a28d8..a49dc3f 100644 --- a/features/ue/ue.go +++ b/features/ue/ue.go @@ -302,7 +302,7 @@ func GetUEInfoFromNF(w http.ResponseWriter, r *http.Request) { } } -// Get UEInfo from NF/NFs +// POST User Info from NF/NFs func PostPCFUserInfo(w http.ResponseWriter, r *http.Request) { log.Info("PostPCFUserInfo processing... ") @@ -374,7 +374,7 @@ func PostPCFUserInfo(w http.ResponseWriter, r *http.Request) { } } -// Get UEInfo from NF/NFs +// PUT PCF User Info from NF/NFs func PutPCFUserInfo(w http.ResponseWriter, r *http.Request) { log.Info("PutPCFUserInfo processing... ") @@ -575,7 +575,7 @@ func GetUENumFromNF(w http.ResponseWriter, r *http.Request) { } } -// Get UEInfo from NF/NFs +// Get Radio Info from NF/NFs func GetNBInfoFromNF(w http.ResponseWriter, r *http.Request) { log.Info("GetNBInfoFromNF processing... ") @@ -639,6 +639,83 @@ func GetNBInfoFromNF(w http.ResponseWriter, r *http.Request) { } } +// Get Radio Info from NF/NFs +func GetNBInfoAllFromNF(w http.ResponseWriter, r *http.Request) { + log.Info("GetNBInfoAllFromNF processing... ") + + vars := mux.Vars(r) + neType := vars["elementTypeValue"] + if neType == "" { + services.ResponseNotFound404UriNotExist(w, r) + return + } + //neTypeLower := strings.ToLower(neType) + // var neID string + neIDs := services.GetParamsArrByName("neId", r) + // if len(neIDs) == 1 { + // neID = neIDs[0] + // } else { + // services.ResponseNotFound404UriNotExist(w, r) + // return + // } + + // token, err := services.CheckFrontValidRequest(w, r) + // if err != nil { + // log.Error("Request error:", err) + // return + // } + // log.Debug("token:", token) + + //var ret error + var statusCode int = 500 + var dataResponse []services.MapResponse + var neInfos []dborm.NeInfo + dborm.XormGetNeInfo2(neType, neIDs, &neInfos) + for _, neInfo := range neInfos { + // neInfo, err := dborm.XormGetNeInfo(neType, neID) + // if err != nil { + // log.Error("Failed to XormGetNeInfo:", err) + // services.ResponseInternalServerError500ProcessError(w, err) + // return + // } else if neInfo == nil { + // err := global.ErrCMNotFoundTargetNE + // log.Error(global.ErrCMNotFoundTargetNE) + // services.ResponseInternalServerError500ProcessError(w, err) + // return + // } + + hostUri := fmt.Sprintf("http://%s:%v", neInfo.Ip, neInfo.Port) + requestURI2NF := fmt.Sprintf("%s%s", hostUri, r.RequestURI) + + log.Debug("requestURI2NF:", requestURI2NF) + + resp, err := client.R(). + EnableTrace(). + SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}). + //SetHeaders(map[string]string{"accessToken": token}). + SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). + SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). + Get(requestURI2NF) + if err != nil { + log.Error("Failed to GET:", err) + continue + // services.ResponseInternalServerError500ProcessError(w, err) + // return + } else { + switch resp.StatusCode() { + case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: + var response services.MapResponse + _ = json.Unmarshal(resp.Body(), &response) + dataResponse = append(dataResponse, response) + statusCode = http.StatusOK + } + } + } + var response services.DataResponse + response.Data = dataResponse + services.ResponseWithJson(w, statusCode, response) +} + // Get GetUEInfoFileExportNF from NF/NFs func GetUEInfoFileExportNF(w http.ResponseWriter, r *http.Request) { log.Info("GetUEInfoFromNF processing... ") diff --git a/lib/dborm/dborm.go b/lib/dborm/dborm.go index 75ff852..209b64e 100644 --- a/lib/dborm/dborm.go +++ b/lib/dborm/dborm.go @@ -298,7 +298,40 @@ func XormGetNeInfoByNeType(neType string, nes *[]NeInfo) error { } *nes = append(*nes, *ne) } - log.Debug("nes:", nes) + + return nil +} + +func XormGetNeInfo2(neType string, neIDs []string, nes *[]NeInfo) error { + log.Info("XormGetNeInfo2 processing... ") + + ne := new(NeInfo) + var rows *xorm.Rows + var err error + if len(neIDs) == 0 { + rows, err = xEngine.Table("ne_info"). + Where("status in ('0','3') and ne_type=?", neType). + Rows(ne) + } else { + rows, err = xEngine.Table("ne_info"). + In("ne_id", neIDs). + And("status in ('0','3') and ne_type=?", neType). + Rows(ne) + } + if err != nil { + log.Error("Failed to get table ne_info from database:", err) + return err + } + defer rows.Close() + for rows.Next() { + err := rows.Scan(ne) + if err != nil { + log.Error("Failed to get table ne_info from database:", err) + return err + } + *nes = append(*nes, *ne) + } + return nil } @@ -1350,7 +1383,7 @@ func XormGetTableRows(tableName string, where string, tbInfo *[]interface{}) (*[ } *tbInfo = append(*tbInfo, row) } - log.Debug("tbInfo:", tbInfo) + log.Trace("tbInfo:", tbInfo) return tbInfo, nil } @@ -1396,7 +1429,7 @@ func XormGetDataBySQL(sql string) (*[]map[string]string, error) { rows := make([]map[string]string, 0) rows, err := DbClient.XEngine.QueryString(sql) if err != nil { - log.Errorf("Failed to QueryString:", err) + log.Error("Failed to QueryString:", err) return nil, err } @@ -1410,7 +1443,7 @@ func XormDeleteDataByWhere(where, table string) (int64, error) { defer xSession.Close() affected, err := xSession.Table(table).Where(where).Delete() if err != nil { - log.Errorf("Failed to Delete:", err) + log.Error("Failed to Delete:", err) return 0, err } xSession.Commit() diff --git a/lib/global/kits.go b/lib/global/kits.go index b36da02..593f18c 100644 --- a/lib/global/kits.go +++ b/lib/global/kits.go @@ -391,6 +391,27 @@ func GetSecondsSinceDatetime(datetimeStr string) (int64, error) { return seconds, nil } +func GetSecondDuration(time1, time2 string) (int64, error) { + loc1, _ := time.LoadLocation("Local") + // 解析日期时间字符串为时间对象 + t1, err := time.ParseInLocation(time.DateTime, time1, loc1) + if err != nil { + return 0, err + } + t2, err := time.ParseInLocation(time.DateTime, time2, loc1) + if err != nil { + return 0, err + } + + // 计算时间差 + duration := t2.Sub(t1) + + // 获取时间差的秒数 + seconds := int64(duration.Seconds()) + + return seconds, nil +} + // 0: invalid ip // 4: IPv4 // 6: IPv6 diff --git a/lib/routes/routes.go b/lib/routes/routes.go index a480649..d2c94e3 100644 --- a/lib/routes/routes.go +++ b/lib/routes/routes.go @@ -303,6 +303,9 @@ func init() { Register("POST", cdr.UriCDREvent, cdr.PostCDREventFromNF, nil) Register("POST", cdr.CustomUriCDREvent, cdr.PostCDREventFromNF, nil) + // UE event + //Register("POST", event.UriUEEvent, event.PostUEEventFromAMF, nil) + // 进程网络 Register("GET", psnet.UriWs, psnet.ProcessWs, nil) Register("POST", psnet.UriStop, psnet.StopProcess, nil) diff --git a/restagent/etc/restconf.yaml b/restagent/etc/restconf.yaml index d1b9216..6f4d4ed 100644 --- a/restagent/etc/restconf.yaml +++ b/restagent/etc/restconf.yaml @@ -26,7 +26,7 @@ rest: keyFile: ./etc/certs/omc-server.key webServer: - enabled: true + enabled: false rootDir: d:/local.git/fe.ems.vue3/dist listen: - addr: :80 @@ -42,7 +42,7 @@ database: type: mysql user: root # password: 1000omc@kp! - # host: 127.0.0.1 + # host: "192.168.8.103" # port: 33066 name: omc_db backup: d:/local.git/ems.agt/restagent/database diff --git a/restagent/makefile b/restagent/makefile index 0e592c7..b353e9e 100644 --- a/restagent/makefile +++ b/restagent/makefile @@ -1,7 +1,7 @@ # Makefile for rest agent project PROJECT = OMC -VERSION = 2.2401.3 +VERSION = 2.2402.5 PLATFORM = amd64 ARMPLATFORM = aarch64 BUILDDIR = ../../build diff --git a/restagent/restagent.go b/restagent/restagent.go index 26be20e..b58aee2 100644 --- a/restagent/restagent.go +++ b/restagent/restagent.go @@ -10,6 +10,7 @@ import ( "strings" "ems.agt/features/dbrest" + "ems.agt/features/event" "ems.agt/features/fm" "ems.agt/features/lm" "ems.agt/features/pm" @@ -22,6 +23,8 @@ import ( "ems.agt/src/framework/middleware" libSession "ems.agt/src/lib_features/session" "github.com/gin-gonic/gin" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" ) // const defaultConfigFile = "./etc/restconf.yaml" @@ -67,23 +70,29 @@ import ( // } func HttpListen(addr string, router http.Handler) { - err := http.ListenAndServe(addr, router) + // 创建HTTP服务器 + h2s := &http2.Server{ + // ... + } + server := &http.Server{ + Addr: addr, + Handler: h2c.NewHandler(router, h2s), + } + + // // support http 2.0 server + // err := http2.ConfigureServer(server, &http2.Server{}) + // if err != nil { + // fmt.Println("ConfigureServer err:", err) + // os.Exit(11) + // } + err := server.ListenAndServe() if err != nil { fmt.Println("ListenAndServe err:", err) - os.Exit(5) + os.Exit(12) } } func HttpListenTLS(addr, caFile, certFile, keyFile string, clientAuthType int, router http.Handler) { - HttpListenConfigTLS(addr, caFile, certFile, keyFile, clientAuthType, router) - err := http.ListenAndServeTLS(addr, certFile, keyFile, router) - if err != nil { - fmt.Println("ListenAndServeTLS err:", err) - os.Exit(6) - } -} - -func HttpListenConfigTLS(addr, caFile, certFile, keyFile string, clientAuthType int, router http.Handler) { // 加载根证书 caCert, err := os.ReadFile(caFile) if err != nil { @@ -107,27 +116,71 @@ func HttpListenConfigTLS(addr, caFile, certFile, keyFile string, clientAuthType TLSConfig: tlsConfig, } + // support http 2.0 server + http2.ConfigureServer(server, &http2.Server{}) + if err != nil { + fmt.Println("ConfigureServer err:", err) + os.Exit(13) + } err = server.ListenAndServeTLS(certFile, keyFile) if err != nil { fmt.Println("ListenAndServeTLS err:", err) - os.Exit(6) + os.Exit(14) } } func HttpListenWebServerTLS(addr, caFile, certFile, keyFile string, clientAuthType int) { - HttpListenConfigTLS(addr, caFile, certFile, keyFile, clientAuthType, nil) - err := http.ListenAndServeTLS(addr, certFile, keyFile, nil) + // 加载根证书 + caCert, err := os.ReadFile(caFile) + if err != nil { + log.Fatal(err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // 创建自定义的TLS配置 + tlsConfig := &tls.Config{ + MinVersion: tls.VersionTLS10, + MaxVersion: tls.VersionTLS13, + ClientCAs: caCertPool, + ClientAuth: tls.ClientAuthType(clientAuthType), + } + + // 创建HTTP服务器 + server := &http.Server{ + Addr: addr, + TLSConfig: tlsConfig, + } + + // support http 2.0 server + http2.ConfigureServer(server, &http2.Server{}) + if err != nil { + fmt.Println("ConfigureServer err:", err) + os.Exit(9) + } + err = server.ListenAndServeTLS(certFile, keyFile) if err != nil { fmt.Println("ListenAndServeTLS err:", err) - os.Exit(7) + os.Exit(10) } } func HttpListenWebServer(addr string) { - err := http.ListenAndServe(addr, nil) + // 创建HTTP服务器 + server := &http.Server{ + Addr: addr, + } + + // support http 2.0 server + err := http2.ConfigureServer(server, &http2.Server{}) + if err != nil { + fmt.Println("ConfigureServer err:", err) + os.Exit(7) + } + err = server.ListenAndServe() if err != nil { fmt.Println("ListenAndServe err:", err) - os.Exit(7) + os.Exit(8) } } @@ -164,7 +217,7 @@ func main() { err = dbrest.InitDbClient(conf.Database.Type, conf.Database.User, conf.Database.Password, conf.Database.Host, conf.Database.Port, conf.Database.Name) if err != nil { - fmt.Println("rests.initDbClient err:", err) + fmt.Println("dbrest.initDbClient err:", err) os.Exit(4) } err = lm.InitDbClient(conf.Database.Type, conf.Database.User, conf.Database.Password, @@ -185,6 +238,8 @@ func main() { uriGroup := app.Group(config.UriPrefix) uriGroup.Use(libSession.SessionHeader()) uriGroup.Any("/*any", gin.WrapH(routes.NewRouter())) + // AMF上报的UE事件, 无前缀,暂时特殊处理 + app.POST(event.UriUEEvent, event.PostUEEventFromAMF) // 开启监控采集 // monitor.StartMonitor(false, "") diff --git a/src/app.go b/src/app.go index f422066..939df23 100644 --- a/src/app.go +++ b/src/app.go @@ -12,9 +12,11 @@ import ( "ems.agt/src/modules/common" "ems.agt/src/modules/crontask" "ems.agt/src/modules/monitor" + networkdata "ems.agt/src/modules/network_data" networkelement "ems.agt/src/modules/network_element" "ems.agt/src/modules/system" "ems.agt/src/modules/trace" + "ems.agt/src/modules/ws" "github.com/gin-gonic/gin" ) @@ -117,12 +119,16 @@ func initModulesRoute(app *gin.Engine) { common.Setup(app) // 系统模块 system.Setup(app) - // 网元模块 + // 网元功能模块 networkelement.Setup(app) + // 网元数据模块 + networkdata.Setup(app) // 跟踪模块 trace.Setup(app) // 图表模块 chart.Setup(app) + // ws 模块 + ws.Setup(app) // 调度任务模块--暂无接口 crontask.Setup(app) // 监控模块 - 含调度处理加入队列,放最后 diff --git a/src/framework/constants/cachekey/cachekey.go b/src/framework/constants/cachekey/cachekey.go index 0e69445..dd7c11f 100644 --- a/src/framework/constants/cachekey/cachekey.go +++ b/src/framework/constants/cachekey/cachekey.go @@ -22,3 +22,9 @@ const RATE_LIMIT_KEY = "rate_limit:" // 登录账户密码错误次数 const PWD_ERR_CNT_KEY = "pwd_err_cnt:" + +// 网元信息管理 +const NE_KEY = "ne_info:" + +// 网元数据管理 +const NE_DATA_KEY = "ne_data:" diff --git a/src/framework/middleware/collectlogs/operate_log.go b/src/framework/middleware/collectlogs/operate_log.go index 65da37d..7d344b0 100644 --- a/src/framework/middleware/collectlogs/operate_log.go +++ b/src/framework/middleware/collectlogs/operate_log.go @@ -121,7 +121,7 @@ func OperateLog(options Options) gin.HandlerFunc { BusinessType: options.BusinessType, OperatorType: options.OperatorType, Method: funcName, - OperURL: c.Request.RequestURI, + OperURL: c.Request.URL.Path, RequestMethod: c.Request.Method, OperIP: ipaddr, OperLocation: location, diff --git a/src/framework/middleware/pre_authorize.go b/src/framework/middleware/pre_authorize.go index 0fb8f8f..240ee72 100644 --- a/src/framework/middleware/pre_authorize.go +++ b/src/framework/middleware/pre_authorize.go @@ -14,7 +14,14 @@ import ( ) /**无Token可访问白名单 */ -var URL_WHITE_LIST = []string{"/performanceManagement", "/faultManagement", "/systemState", "/omcNeConfig"} +var URL_WHITE_LIST = []string{ + "/performanceManagement", + "/faultManagement", + "/systemState", + "/omcNeConfig", + "/cdrEvent", + "/upload-ue", +} // PreAuthorize 用户身份授权认证校验 // diff --git a/src/framework/utils/ctx/ctx.go b/src/framework/utils/ctx/ctx.go index ff9efe0..b005e13 100644 --- a/src/framework/utils/ctx/ctx.go +++ b/src/framework/utils/ctx/ctx.go @@ -65,6 +65,11 @@ func IPAddrLocation(c *gin.Context) (string, string) { // Authorization 解析请求头 func Authorization(c *gin.Context) string { + // Query请求查询 + if authQuery, ok := c.GetQuery(token.RESPONSE_FIELD); ok && authQuery != "" { + return authQuery + } + // Header请求头 authHeader := c.GetHeader(token.HEADER_KEY) if authHeader == "" { return "" @@ -99,11 +104,22 @@ func UaOsBrowser(c *gin.Context) (string, string) { // AcceptLanguage 解析客户端接收语言 zh:中文 en: 英文 func AcceptLanguage(c *gin.Context) string { preferredLanguage := language.English - acceptLanguage := c.GetHeader("Accept-Language") - tags, _, _ := language.ParseAcceptLanguage(acceptLanguage) - if len(tags) > 0 { - preferredLanguage = tags[0] + + // Query请求查询 + if v, ok := c.GetQuery("language"); ok && v != "" { + tags, _, _ := language.ParseAcceptLanguage(v) + if len(tags) > 0 { + preferredLanguage = tags[0] + } } + // Header请求头 + if v := c.GetHeader("Accept-Language"); v != "" { + tags, _, _ := language.ParseAcceptLanguage(v) + if len(tags) > 0 { + preferredLanguage = tags[0] + } + } + // 只取前缀 lang := preferredLanguage.String() arr := strings.Split(lang, "-") diff --git a/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go b/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go index b92e3d5..fd7f6fb 100644 --- a/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go +++ b/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go @@ -117,7 +117,7 @@ func (s *BarProcessor) Execute(data any) (any, error) { state := new(SystemState) _ = json.Unmarshal(response.Body(), &state) var dateStr *string = nil - if state.ExpiryDate != "" { + if state.ExpiryDate != "" && state.ExpiryDate != "-" { dateStr = &state.ExpiryDate } neState := new(dborm.NeState) diff --git a/src/modules/monitor/controller/sys_cache.go b/src/modules/monitor/controller/sys_cache.go index 9d4395d..abfeddb 100644 --- a/src/modules/monitor/controller/sys_cache.go +++ b/src/modules/monitor/controller/sys_cache.go @@ -34,14 +34,17 @@ func (s *SysCacheController) Info(c *gin.Context) { // // GET /getNames func (s *SysCacheController) Names(c *gin.Context) { + language := ctx.AcceptLanguage(c) caches := []model.SysCache{ - model.NewSysCacheNames("user", cachekey.LOGIN_TOKEN_KEY), - model.NewSysCacheNames("configuration", cachekey.SYS_CONFIG_KEY), - model.NewSysCacheNames("dictionary", cachekey.SYS_DICT_KEY), - model.NewSysCacheNames("captcha", cachekey.CAPTCHA_CODE_KEY), - model.NewSysCacheNames("anti-submission", cachekey.REPEAT_SUBMIT_KEY), - model.NewSysCacheNames("current-limiting", cachekey.RATE_LIMIT_KEY), - model.NewSysCacheNames("password-errors-number", cachekey.PWD_ERR_CNT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.user"), cachekey.LOGIN_TOKEN_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.sys_config"), cachekey.SYS_CONFIG_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.sys_dict"), cachekey.SYS_DICT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.captcha_codes"), cachekey.CAPTCHA_CODE_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.repeat_submit"), cachekey.REPEAT_SUBMIT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.rate_limit"), cachekey.RATE_LIMIT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.pwd_err_cnt"), cachekey.PWD_ERR_CNT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.ne_info"), cachekey.NE_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.ne_data"), cachekey.NE_DATA_KEY), } c.JSON(200, result.OkData(caches)) } @@ -136,13 +139,17 @@ func (s *SysCacheController) ClearCacheKey(c *gin.Context) { // // DELETE /clearCacheSafe func (s *SysCacheController) ClearCacheSafe(c *gin.Context) { + language := ctx.AcceptLanguage(c) caches := []model.SysCache{ - model.NewSysCacheNames("configuration", cachekey.SYS_CONFIG_KEY), - model.NewSysCacheNames("dictionary", cachekey.SYS_DICT_KEY), - model.NewSysCacheNames("captcha", cachekey.CAPTCHA_CODE_KEY), - model.NewSysCacheNames("anti-submission", cachekey.REPEAT_SUBMIT_KEY), - model.NewSysCacheNames("current-limiting", cachekey.RATE_LIMIT_KEY), - model.NewSysCacheNames("password-errors-number", cachekey.PWD_ERR_CNT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.user"), cachekey.LOGIN_TOKEN_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.sys_config"), cachekey.SYS_CONFIG_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.sys_dict"), cachekey.SYS_DICT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.captcha_codes"), cachekey.CAPTCHA_CODE_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.repeat_submit"), cachekey.REPEAT_SUBMIT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.rate_limit"), cachekey.RATE_LIMIT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.pwd_err_cnt"), cachekey.PWD_ERR_CNT_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.ne_info"), cachekey.NE_KEY), + model.NewSysCacheNames(i18n.TKey(language, "cache.name.ne_data"), cachekey.NE_DATA_KEY), } for _, v := range caches { cacheKeys, err := redis.GetKeys("", v.CacheName+":*") diff --git a/src/modules/network_data/controller/amf.go b/src/modules/network_data/controller/amf.go new file mode 100644 index 0000000..1c92fb4 --- /dev/null +++ b/src/modules/network_data/controller/amf.go @@ -0,0 +1,51 @@ +package controller + +import ( + "ems.agt/src/framework/i18n" + "ems.agt/src/framework/utils/ctx" + "ems.agt/src/framework/vo/result" + "ems.agt/src/modules/network_data/model" + neDataService "ems.agt/src/modules/network_data/service" + neService "ems.agt/src/modules/network_element/service" + "github.com/gin-gonic/gin" +) + +// 实例化控制层 AMFController 结构体 +var NewAMFController = &AMFController{ + neInfoService: neService.NewNeInfoImpl, + ueEventService: neDataService.NewUEEventImpl, +} + +// 网元AMF +// +// PATH /amf +type AMFController struct { + // 网元信息服务 + neInfoService neService.INeInfo + // CDR会话事件服务 + ueEventService neDataService.IUEEvent +} + +// UE会话列表 +// +// GET /ues +func (s *AMFController) UEs(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var querys model.UEEventQuery + if err := c.ShouldBindQuery(&querys); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 查询网元获取IP + neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID) + if neInfo.NeId != querys.NeID || neInfo.IP == "" { + c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo"))) + return + } + querys.RmUID = neInfo.RmUID + + // 查询数据 + data := s.ueEventService.SelectPage(querys) + c.JSON(200, result.Ok(data)) +} diff --git a/src/modules/network_data/controller/ims.go b/src/modules/network_data/controller/ims.go new file mode 100644 index 0000000..dfd4988 --- /dev/null +++ b/src/modules/network_data/controller/ims.go @@ -0,0 +1,51 @@ +package controller + +import ( + "ems.agt/src/framework/i18n" + "ems.agt/src/framework/utils/ctx" + "ems.agt/src/framework/vo/result" + "ems.agt/src/modules/network_data/model" + neDataService "ems.agt/src/modules/network_data/service" + neService "ems.agt/src/modules/network_element/service" + "github.com/gin-gonic/gin" +) + +// 实例化控制层 IMSController 结构体 +var NewIMSController = &IMSController{ + neInfoService: neService.NewNeInfoImpl, + cdrEventService: neDataService.NewCDREventImpl, +} + +// 网元IMS +// +// PATH /ims +type IMSController struct { + // 网元信息服务 + neInfoService neService.INeInfo + // CDR会话事件服务 + cdrEventService neDataService.ICDREvent +} + +// CDR会话列表 +// +// GET /cdrs +func (s *IMSController) CDRs(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var querys model.CDREventQuery + if err := c.ShouldBindQuery(&querys); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 查询网元获取IP + neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID) + if neInfo.NeId != querys.NeID || neInfo.IP == "" { + c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo"))) + return + } + querys.RmUID = neInfo.RmUID + + // 查询数据 + data := s.cdrEventService.SelectPage(querys) + c.JSON(200, result.Ok(data)) +} diff --git a/src/modules/network_data/controller/perf_kpi.go b/src/modules/network_data/controller/perf_kpi.go new file mode 100644 index 0000000..18274f1 --- /dev/null +++ b/src/modules/network_data/controller/perf_kpi.go @@ -0,0 +1,84 @@ +package controller + +import ( + "ems.agt/lib/core/utils/date" + "ems.agt/src/framework/i18n" + "ems.agt/src/framework/utils/ctx" + "ems.agt/src/framework/vo/result" + "ems.agt/src/modules/network_data/model" + neDataService "ems.agt/src/modules/network_data/service" + neService "ems.agt/src/modules/network_element/service" + "github.com/gin-gonic/gin" +) + +// 实例化控制层 PerfKPIController 结构体 +var NewPerfKPIController = &PerfKPIController{ + neInfoService: neService.NewNeInfoImpl, + perfKPIService: neDataService.NewPerfKPIImpl, +} + +// 性能统计 +// +// PATH /kpi +type PerfKPIController struct { + // 网元信息服务 + neInfoService neService.INeInfo + // 统计信息服务 + perfKPIService neDataService.IPerfKPI +} + +// 获取统计数据 +// +// GET /data +func (s *PerfKPIController) GoldKPI(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var querys model.GoldKPIQuery + if err := c.ShouldBindQuery(&querys); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 时间格式校验 + startTime := date.ParseStrToDate(querys.StartTime, date.YYYY_MM_DD_HH_MM_SS) + if startTime.IsZero() { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + endTime := date.ParseStrToDate(querys.EndTime, date.YYYY_MM_DD_HH_MM_SS) + if endTime.IsZero() { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + if querys.Interval < 5 || querys.Interval > 3600 { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 查询网元获取IP + neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID) + if neInfo.NeId != querys.NeID || neInfo.IP == "" { + c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo"))) + return + } + querys.RmUID = neInfo.RmUID + + // 查询数据 + kpiData := s.perfKPIService.SelectGoldKPI(querys) + c.JSON(200, result.OkData(kpiData)) +} + +// 获取统计标题 +// +// GET /title +func (s *PerfKPIController) Title(c *gin.Context) { + language := ctx.AcceptLanguage(c) + neType := c.Query("neType") + if neType == "" { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + kpiTitles := s.perfKPIService.SelectGoldKPITitle(neType) + + c.JSON(200, result.OkData(kpiTitles)) +} diff --git a/src/modules/network_data/controller/upf.go b/src/modules/network_data/controller/upf.go new file mode 100644 index 0000000..8a6b285 --- /dev/null +++ b/src/modules/network_data/controller/upf.go @@ -0,0 +1,54 @@ +package controller + +import ( + "ems.agt/src/framework/i18n" + "ems.agt/src/framework/utils/ctx" + "ems.agt/src/framework/vo/result" + neDataService "ems.agt/src/modules/network_data/service" + neService "ems.agt/src/modules/network_element/service" + "github.com/gin-gonic/gin" +) + +// 实例化控制层 UPFController 结构体 +var NewUPFController = &UPFController{ + neInfoService: neService.NewNeInfoImpl, + perfKPIService: neDataService.NewPerfKPIImpl, +} + +// 网元UPF +// +// PATH /upf +type UPFController struct { + // 网元信息服务 + neInfoService neService.INeInfo + // 统计信息服务 + perfKPIService neDataService.IPerfKPI +} + +// 总流量数 N3上行 N6下行 +// 单位 比特(bit) +// +// GET /totalFlow +func (s *UPFController) TotalFlow(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var querys struct { + NeType string `json:"neType" form:"neType" binding:"required"` + NeID string `form:"neId" binding:"required"` + Day int `form:"day" binding:"required"` + } + if err := c.ShouldBindQuery(&querys); querys.Day < 0 || err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 查询网元获取IP + neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID) + if neInfo.NeId != querys.NeID || neInfo.IP == "" { + c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo"))) + return + } + + data := s.perfKPIService.SelectUPFTotalFlow(neInfo.NeType, neInfo.RmUID, querys.Day) + + c.JSON(200, result.OkData(data)) +} diff --git a/src/modules/network_data/model/cdr_event.go b/src/modules/network_data/model/cdr_event.go new file mode 100644 index 0000000..96896c2 --- /dev/null +++ b/src/modules/network_data/model/cdr_event.go @@ -0,0 +1,28 @@ +package model + +import "time" + +// CDREvent CDR会话对象 cdr_event +type CDREvent struct { + ID string `json:"id" gorm:"column:id;primaryKey;autoIncrement"` + NeType string `json:"neType" gorm:"column:ne_type"` + NeName string `json:"neName" gorm:"column:ne_name"` + RmUID string `json:"rmUID" gorm:"column:rm_uid"` + Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` + CDRJSONStr string `json:"cdrJSON" gorm:"column:cdr_json"` + CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;default:CURRENT_TIMESTAMP"` +} + +// CDREventQuery CDR会话对象查询参数结构体 +type CDREventQuery struct { + NeType string `json:"neType" form:"neType" binding:"required"` + NeID string `json:"neId" form:"neId" binding:"required"` + RmUID string `json:"rmUID" form:"rmUID"` + RecordType string `json:"recordType" form:"recordType"` + StartTime string `json:"startTime" form:"startTime"` + EndTime string `json:"endTime" form:"endTime"` + SortField string `json:"sortField" form:"sortField" binding:"omitempty,oneof=timestamp"` + SortOrder string `json:"sortOrder" form:"sortOrder" binding:"omitempty,oneof=asc desc"` + PageNum int64 `json:"pageNum" form:"pageNum" binding:"required"` + PageSize int64 `json:"pageSize" form:"pageSize" binding:"required"` +} diff --git a/src/modules/network_data/model/perf_kpi.go b/src/modules/network_data/model/perf_kpi.go new file mode 100644 index 0000000..108d81f --- /dev/null +++ b/src/modules/network_data/model/perf_kpi.go @@ -0,0 +1,23 @@ +package model + +// GoldKPITitle 黄金指标标题信息对象 kpi_title +type GoldKPITitle struct { + ID string `json:"id" gorm:"column:id;primaryKey;autoIncrement"` + NeType string `json:"neType" gorm:"column:ne_type"` + KPIID string `json:"kpiId" gorm:"column:kpi_id"` + TitleJson string `json:"titleJson" gorm:"column:title_json"` + CnTitle string `json:"cnTitle" gorm:"column:cn_title"` + EnTitle string `json:"enTitle" gorm:"column:en_title"` +} + +// GoldKPIQuery 黄金指标查询参数结构体 +type GoldKPIQuery struct { + NeType string `form:"neType" binding:"required"` + NeID string `form:"neId" binding:"required"` + StartTime string `form:"startTime" binding:"required"` + EndTime string `form:"endTime" binding:"required"` + Interval int64 `form:"interval" binding:"required"` + RmUID string `form:"rmUID"` + SortField string `form:"sortField" binding:"omitempty,oneof=timeGroup"` + SortOrder string `form:"sortOrder" binding:"omitempty,oneof=asc desc"` +} diff --git a/src/modules/network_data/model/ue_event.go b/src/modules/network_data/model/ue_event.go new file mode 100644 index 0000000..eed904f --- /dev/null +++ b/src/modules/network_data/model/ue_event.go @@ -0,0 +1,28 @@ +package model + +import "time" + +// UEEvent UE会话对象 ue_event +type UEEvent struct { + ID string `json:"id" gorm:"column:id;primaryKey;autoIncrement"` + NeType string `json:"neType" gorm:"column:ne_type"` + NeName string `json:"neName" gorm:"column:ne_name"` + RmUID string `json:"rmUID" gorm:"column:rm_uid"` + Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` + EventType string `json:"eventType" gorm:"column:event_type"` + EventJSONStr string `json:"eventJSON" gorm:"column:event_json"` + CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;default:CURRENT_TIMESTAMP"` +} + +// UEEventQuery UE会话对象查询参数结构体 +type UEEventQuery struct { + NeType string `json:"neType" form:"neType" binding:"required"` + NeID string `json:"neId" form:"neId" binding:"required"` + RmUID string `json:"rmUID" form:"rmUID"` + StartTime string `json:"startTime" form:"startTime"` + EndTime string `json:"endTime" form:"endTime"` + SortField string `json:"sortField" form:"sortField" binding:"omitempty,oneof=timestamp"` + SortOrder string `json:"sortOrder" form:"sortOrder" binding:"omitempty,oneof=asc desc"` + PageNum int64 `json:"pageNum" form:"pageNum" binding:"required"` + PageSize int64 `json:"pageSize" form:"pageSize" binding:"required"` +} diff --git a/src/modules/network_data/network_data.go b/src/modules/network_data/network_data.go new file mode 100644 index 0000000..41f83b0 --- /dev/null +++ b/src/modules/network_data/network_data.go @@ -0,0 +1,58 @@ +package networkdata + +import ( + "ems.agt/src/framework/logger" + "ems.agt/src/framework/middleware" + "ems.agt/src/modules/network_data/controller" + + "github.com/gin-gonic/gin" +) + +// 模块路由注册 +func Setup(router *gin.Engine) { + logger.Infof("开始加载 ====> network_data 模块路由") + + neDataGroup := router.Group("/neData") + + // 性能统计信息 + kpiGroup := neDataGroup.Group("/kpi") + { + kpiGroup.GET("/title", + middleware.PreAuthorize(nil), + controller.NewPerfKPIController.Title, + ) + kpiGroup.GET("/data", + middleware.PreAuthorize(nil), + controller.NewPerfKPIController.GoldKPI, + ) + } + + // 网元IMS + imsGroup := neDataGroup.Group("/ims") + { + // CDR会话事件信息 + imsGroup.GET("/cdrs", + middleware.PreAuthorize(nil), + controller.NewIMSController.CDRs, + ) + } + + // 网元AMF + amfGroup := neDataGroup.Group("/amf") + { + amfGroup.GET("/ues", + middleware.PreAuthorize(nil), + controller.NewAMFController.UEs, + ) + } + + // 网元UPF + upfGroup := neDataGroup.Group("/upf") + { + upfGroup.GET("/totalFlow", + middleware.PreAuthorize(nil), + controller.NewUPFController.TotalFlow, + ) + } + +} diff --git a/src/modules/network_data/repository/cdr_event.go b/src/modules/network_data/repository/cdr_event.go new file mode 100644 index 0000000..0df96d9 --- /dev/null +++ b/src/modules/network_data/repository/cdr_event.go @@ -0,0 +1,9 @@ +package repository + +import "ems.agt/src/modules/network_data/model" + +// CDR会话事件 数据层接口 +type ICDREvent interface { + // SelectPage 根据条件分页查询 + SelectPage(querys model.CDREventQuery) map[string]any +} diff --git a/src/modules/network_data/repository/cdr_event.impl.go b/src/modules/network_data/repository/cdr_event.impl.go new file mode 100644 index 0000000..b8cf5df --- /dev/null +++ b/src/modules/network_data/repository/cdr_event.impl.go @@ -0,0 +1,136 @@ +package repository + +import ( + "fmt" + "strings" + + "ems.agt/src/framework/datasource" + "ems.agt/src/framework/logger" + "ems.agt/src/framework/utils/date" + "ems.agt/src/framework/utils/parse" + "ems.agt/src/framework/utils/repo" + "ems.agt/src/modules/network_data/model" +) + +// 实例化数据层 CDREventImpl 结构体 +var NewCDREventImpl = &CDREventImpl{ + selectSql: `select id, ne_type, ne_name, rm_uid, timestamp, cdr_json, created_at from cdr_event`, + + resultMap: map[string]string{ + "id": "ID", + "ne_type": "NeType", + "ne_name": "NeName", + "rm_uid": "RmUID", + "timestamp": "Timestamp", + "cdr_json": "CDRJSONStr", + "created_at": "CreatedAt", + }, +} + +// CDREventImpl CDR会话事件 数据层处理 +type CDREventImpl struct { + // 查询视图对象SQL + selectSql string + // 结果字段与实体映射 + resultMap map[string]string +} + +// convertResultRows 将结果记录转实体结果组 +func (r *CDREventImpl) convertResultRows(rows []map[string]any) []model.CDREvent { + arr := make([]model.CDREvent, 0) + for _, row := range rows { + item := model.CDREvent{} + for key, value := range row { + if keyMapper, ok := r.resultMap[key]; ok { + repo.SetFieldValue(&item, keyMapper, value) + } + } + arr = append(arr, item) + } + return arr +} + +// SelectPage 根据条件分页查询 +func (r *CDREventImpl) SelectPage(querys model.CDREventQuery) map[string]any { + // 查询条件拼接 + var conditions []string + var params []any + if querys.NeType != "" { + conditions = append(conditions, "ne_type = ?") + params = append(params, querys.NeType) + } + if querys.RmUID != "" { + conditions = append(conditions, "rm_uid = ?") + params = append(params, querys.RmUID) + } + if querys.StartTime != "" { + conditions = append(conditions, "timestamp >= ?") + beginDate := date.ParseStrToDate(querys.StartTime, date.YYYY_MM_DD_HH_MM_SS) + params = append(params, beginDate.Unix()) + } + if querys.EndTime != "" { + conditions = append(conditions, "timestamp <= ?") + endDate := date.ParseStrToDate(querys.EndTime, date.YYYY_MM_DD_HH_MM_SS) + params = append(params, endDate.Unix()) + } + if querys.RecordType != "" { + conditions = append(conditions, "JSON_EXTRACT(cdr_json, '$.recordType') = ?") + params = append(params, querys.RecordType) + } + + // 构建查询条件语句 + whereSql := "" + if len(conditions) > 0 { + whereSql += " where " + strings.Join(conditions, " and ") + } + + result := map[string]any{ + "total": 0, + "rows": []model.CDREvent{}, + } + + // 查询数量 长度为0直接返回 + totalSql := "select count(1) as 'total' from cdr_event" + totalRows, err := datasource.RawDB("", totalSql+whereSql, params) + if err != nil { + logger.Errorf("total err => %v", err) + return result + } + total := parse.Number(totalRows[0]["total"]) + if total == 0 { + return result + } else { + result["total"] = total + } + + // 分页 + pageNum, pageSize := repo.PageNumSize(querys.PageNum, querys.PageSize) + pageSql := " limit ?,? " + params = append(params, pageNum*pageSize) + params = append(params, pageSize) + + // 排序 + orderSql := "" + if querys.SortField != "" { + sortSql := querys.SortField + if querys.SortOrder != "" { + if querys.SortOrder == "desc" { + sortSql += " desc " + } else { + sortSql += " asc " + } + } + orderSql = fmt.Sprintf(" order by %s ", sortSql) + } + + // 查询数据 + querySql := r.selectSql + whereSql + orderSql + pageSql + results, err := datasource.RawDB("", querySql, params) + if err != nil { + logger.Errorf("query err => %v", err) + } + + // 转换实体 + result["rows"] = r.convertResultRows(results) + return result +} diff --git a/src/modules/network_data/repository/perf_kpi.go b/src/modules/network_data/repository/perf_kpi.go new file mode 100644 index 0000000..6376d79 --- /dev/null +++ b/src/modules/network_data/repository/perf_kpi.go @@ -0,0 +1,15 @@ +package repository + +import "ems.agt/src/modules/network_data/model" + +// 性能统计 数据层接口 +type IPerfKPI interface { + // SelectGoldKPI 通过网元指标数据信息 + SelectGoldKPI(query model.GoldKPIQuery, kpiIds []string) []map[string]any + + // SelectGoldKPITitle 网元对应的指标名称 + SelectGoldKPITitle(neType string) []model.GoldKPITitle + + // SelectUPFTotalFlow 查询UPF总流量 N3上行 N6下行 + SelectUPFTotalFlow(neType, rmUID, startDate, endDate string) map[string]any +} diff --git a/src/modules/network_data/repository/perf_kpi.impl.go b/src/modules/network_data/repository/perf_kpi.impl.go new file mode 100644 index 0000000..b665347 --- /dev/null +++ b/src/modules/network_data/repository/perf_kpi.impl.go @@ -0,0 +1,133 @@ +package repository + +import ( + "fmt" + "strings" + + "ems.agt/src/framework/datasource" + "ems.agt/src/framework/logger" + "ems.agt/src/modules/network_data/model" +) + +// 实例化数据层 PerfKPIImpl 结构体 +var NewPerfKPIImpl = &PerfKPIImpl{} + +// PerfKPIImpl 性能统计 数据层处理 +type PerfKPIImpl struct{} + +// SelectGoldKPI 通过网元指标数据信息 +func (r *PerfKPIImpl) SelectGoldKPI(query model.GoldKPIQuery, kpiIds []string) []map[string]any { + // 查询条件拼接 + var conditions []string + var params []any + if query.RmUID != "" { + conditions = append(conditions, "gk.rm_uid = ?") + params = append(params, query.RmUID) + } + if query.NeType != "" { + conditions = append(conditions, "gk.ne_type = ?") + params = append(params, query.NeType) + } + if query.StartTime != "" { + conditions = append(conditions, "gk.start_time >= ?") + params = append(params, query.StartTime) + } + if query.EndTime != "" { + conditions = append(conditions, "gk.start_time <= ?") + params = append(params, query.EndTime) + } + // 构建查询条件语句 + whereSql := "" + if len(conditions) > 0 { + whereSql += " where " + strings.Join(conditions, " and ") + } + + // 查询字段列 + timeFormat := "DATE_FORMAT(gk.start_time, '%Y-%m-%d %H:%i:')" + secondGroup := fmt.Sprintf("LPAD(FLOOR(SECOND(gk.start_time) / %d) * %d, 2, '0')", query.Interval, query.Interval) + groupByField := fmt.Sprintf("CONCAT( %s, %s ) AS timeGroup", timeFormat, secondGroup) + if query.Interval > 60 { + minute := query.Interval / 60 + timeFormat = "DATE_FORMAT(gk.start_time, '%Y-%m-%d %H:')" + minuteGroup := fmt.Sprintf("LPAD(FLOOR(MINUTE(gk.start_time) / %d) * %d, 2, '0')", minute, minute) + groupByField = fmt.Sprintf("CONCAT( %s, %s ) AS timeGroup", timeFormat, minuteGroup) + } + var fields = []string{ + groupByField, + "min(CASE WHEN gk.index != '' THEN gk.index ELSE 0 END) AS startIndex", + "min(CASE WHEN gk.ne_type != '' THEN gk.ne_type ELSE 0 END) AS neType", + "min(CASE WHEN gk.ne_name != '' THEN gk.ne_name ELSE 0 END) AS neName", + } + for _, kid := range kpiIds { + // 特殊字段,只取最后一次收到的非0值 + if kid == "AMF.01" || kid == "UDM.01" || kid == "UDM.02" || kid == "UDM.03" || kid == "SMF.01" { + str := fmt.Sprintf("IFNULL(SUBSTRING_INDEX(GROUP_CONCAT( CASE WHEN gk.kpi_id = '%s' and gk.VALUE != 0 THEN gk.VALUE END ), ',', 1), 0) AS '%s'", kid, kid) + fields = append(fields, str) + } else { + str := fmt.Sprintf("sum(CASE WHEN gk.kpi_id = '%s' THEN gk.value ELSE 0 END) AS '%s'", kid, kid) + fields = append(fields, str) + } + } + fieldsSql := strings.Join(fields, ",") + + // 查询数据 + if query.SortField == "" { + query.SortField = "timeGroup" + } + if query.SortOrder == "" { + query.SortOrder = "desc" + } + orderSql := fmt.Sprintf(" order by %s %s", query.SortField, query.SortOrder) + querySql := fmt.Sprintf("SELECT %s FROM gold_kpi gk %s GROUP BY timeGroup %s", fieldsSql, whereSql, orderSql) + results, err := datasource.RawDB("", querySql, params) + if err != nil { + logger.Errorf("query err => %v", err) + } + return results +} + +// SelectGoldKPITitle 网元对应的指标名称 +func (r *PerfKPIImpl) SelectGoldKPITitle(neType string) []model.GoldKPITitle { + result := []model.GoldKPITitle{} + tx := datasource.DefaultDB().Table("kpi_title").Where("ne_type = ?", neType).Find(&result) + if err := tx.Error; err != nil { + logger.Errorf("Find err => %v", err) + } + return result +} + +// SelectUPFTotalFlow 查询UPF总流量 N3上行 N6下行 +func (r *PerfKPIImpl) SelectUPFTotalFlow(neType, rmUID, startDate, endDate string) map[string]any { + // 查询条件拼接 + var conditions []string + var params []any + if neType != "" { + conditions = append(conditions, "gk.ne_type = ?") + params = append(params, neType) + } + if rmUID != "" { + conditions = append(conditions, "gk.rm_uid = ?") + params = append(params, rmUID) + } + if startDate != "" { + conditions = append(conditions, "gk.date >= ?") + params = append(params, startDate) + } + if endDate != "" { + conditions = append(conditions, "gk.date <= ?") + params = append(params, endDate) + } + // 构建查询条件语句 + whereSql := "" + if len(conditions) > 0 { + whereSql += " where " + strings.Join(conditions, " and ") + } + + // 查询数据 + querySql := fmt.Sprintf("SELECT sum( CASE WHEN gk.kpi_id = 'UPF.03' THEN gk.VALUE ELSE 0 END ) AS 'up', sum( CASE WHEN gk.kpi_id = 'UPF.06' THEN gk.VALUE ELSE 0 END ) AS 'down' FROM gold_kpi gk %s", whereSql) + results, err := datasource.RawDB("", querySql, params) + if err != nil { + logger.Errorf("query err => %v", err) + } + return results[0] +} diff --git a/src/modules/network_data/repository/ue_event.go b/src/modules/network_data/repository/ue_event.go new file mode 100644 index 0000000..42c520c --- /dev/null +++ b/src/modules/network_data/repository/ue_event.go @@ -0,0 +1,9 @@ +package repository + +import "ems.agt/src/modules/network_data/model" + +// UE会话事件 数据层接口 +type IUEEvent interface { + // SelectPage 根据条件分页查询 + SelectPage(querys model.UEEventQuery) map[string]any +} diff --git a/src/modules/network_data/repository/ue_event.impl.go b/src/modules/network_data/repository/ue_event.impl.go new file mode 100644 index 0000000..66e104c --- /dev/null +++ b/src/modules/network_data/repository/ue_event.impl.go @@ -0,0 +1,133 @@ +package repository + +import ( + "fmt" + "strings" + + "ems.agt/src/framework/datasource" + "ems.agt/src/framework/logger" + "ems.agt/src/framework/utils/date" + "ems.agt/src/framework/utils/parse" + "ems.agt/src/framework/utils/repo" + "ems.agt/src/modules/network_data/model" +) + +// 实例化数据层 UEEventImpl 结构体 +var NewUEEventImpl = &UEEventImpl{ + selectSql: `select id, ne_type, ne_name, rm_uid, timestamp, event_type, event_json, created_at from ue_event`, + + resultMap: map[string]string{ + "id": "ID", + "ne_type": "NeType", + "ne_name": "NeName", + "rm_uid": "RmUID", + "timestamp": "Timestamp", + "event_type": "EventType", + "event_json": "EventJSONStr", + "created_at": "CreatedAt", + }, +} + +// UEEventImpl UE会话事件 数据层处理 +type UEEventImpl struct { + // 查询视图对象SQL + selectSql string + // 结果字段与实体映射 + resultMap map[string]string +} + +// convertResultRows 将结果记录转实体结果组 +func (r *UEEventImpl) convertResultRows(rows []map[string]any) []model.UEEvent { + arr := make([]model.UEEvent, 0) + for _, row := range rows { + item := model.UEEvent{} + for key, value := range row { + if keyMapper, ok := r.resultMap[key]; ok { + repo.SetFieldValue(&item, keyMapper, value) + } + } + arr = append(arr, item) + } + return arr +} + +// SelectPage 根据条件分页查询 +func (r *UEEventImpl) SelectPage(querys model.UEEventQuery) map[string]any { + // 查询条件拼接 + var conditions []string + var params []any + if querys.NeType != "" { + conditions = append(conditions, "ne_type = ?") + params = append(params, querys.NeType) + } + if querys.RmUID != "" { + conditions = append(conditions, "rm_uid = ?") + params = append(params, querys.RmUID) + } + if querys.StartTime != "" { + conditions = append(conditions, "timestamp >= ?") + beginDate := date.ParseStrToDate(querys.StartTime, date.YYYY_MM_DD_HH_MM_SS) + params = append(params, beginDate.Unix()) + } + if querys.EndTime != "" { + conditions = append(conditions, "timestamp <= ?") + endDate := date.ParseStrToDate(querys.EndTime, date.YYYY_MM_DD_HH_MM_SS) + params = append(params, endDate.Unix()) + } + + // 构建查询条件语句 + whereSql := "" + if len(conditions) > 0 { + whereSql += " where " + strings.Join(conditions, " and ") + } + + result := map[string]any{ + "total": 0, + "rows": []model.CDREvent{}, + } + + // 查询数量 长度为0直接返回 + totalSql := "select count(1) as 'total' from ue_event" + totalRows, err := datasource.RawDB("", totalSql+whereSql, params) + if err != nil { + logger.Errorf("total err => %v", err) + return result + } + total := parse.Number(totalRows[0]["total"]) + if total == 0 { + return result + } else { + result["total"] = total + } + + // 分页 + pageNum, pageSize := repo.PageNumSize(querys.PageNum, querys.PageSize) + pageSql := " limit ?,? " + params = append(params, pageNum*pageSize) + params = append(params, pageSize) + + // 排序 + orderSql := "" + if querys.SortField != "" { + sortSql := querys.SortField + if querys.SortOrder != "" { + if querys.SortOrder == "desc" { + sortSql += " desc " + } else { + sortSql += " asc " + } + } + orderSql = fmt.Sprintf(" order by %s ", sortSql) + } + + // 查询数据 + querySql := r.selectSql + whereSql + orderSql + pageSql + results, err := datasource.RawDB("", querySql, params) + if err != nil { + logger.Errorf("query err => %v", err) + } + + // 转换实体 + result["rows"] = r.convertResultRows(results) + return result +} diff --git a/src/modules/network_data/service/cdr_event.go b/src/modules/network_data/service/cdr_event.go new file mode 100644 index 0000000..8efc223 --- /dev/null +++ b/src/modules/network_data/service/cdr_event.go @@ -0,0 +1,9 @@ +package service + +import "ems.agt/src/modules/network_data/model" + +// CDR会话事件 服务层接口 +type ICDREvent interface { + // SelectPage 根据条件分页查询 + SelectPage(querys model.CDREventQuery) map[string]any +} diff --git a/src/modules/network_data/service/cdr_event.impl.go b/src/modules/network_data/service/cdr_event.impl.go new file mode 100644 index 0000000..e6a950c --- /dev/null +++ b/src/modules/network_data/service/cdr_event.impl.go @@ -0,0 +1,22 @@ +package service + +import ( + "ems.agt/src/modules/network_data/model" + "ems.agt/src/modules/network_data/repository" +) + +// 实例化数据层 CDREventImpl 结构体 +var NewCDREventImpl = &CDREventImpl{ + cdrEventRepository: repository.NewCDREventImpl, +} + +// CDREventImpl CDR会话事件 服务层处理 +type CDREventImpl struct { + // CDR会话事件数据信息 + cdrEventRepository repository.ICDREvent +} + +// SelectPage 根据条件分页查询 +func (r *CDREventImpl) SelectPage(querys model.CDREventQuery) map[string]any { + return r.cdrEventRepository.SelectPage(querys) +} diff --git a/src/modules/network_data/service/perf_kpi.go b/src/modules/network_data/service/perf_kpi.go new file mode 100644 index 0000000..41a9364 --- /dev/null +++ b/src/modules/network_data/service/perf_kpi.go @@ -0,0 +1,15 @@ +package service + +import "ems.agt/src/modules/network_data/model" + +// 性能统计 服务层接口 +type IPerfKPI interface { + // SelectGoldKPI 通过网元指标数据信息 + SelectGoldKPI(query model.GoldKPIQuery) []map[string]any + + // SelectGoldKPITitle 网元对应的指标名称 + SelectGoldKPITitle(neType string) []model.GoldKPITitle + + // SelectUPFTotalFlow 查询UPF总流量 N3上行 N6下行 + SelectUPFTotalFlow(neType, rmUID string, day int) map[string]any +} diff --git a/src/modules/network_data/service/perf_kpi.impl.go b/src/modules/network_data/service/perf_kpi.impl.go new file mode 100644 index 0000000..4e193d7 --- /dev/null +++ b/src/modules/network_data/service/perf_kpi.impl.go @@ -0,0 +1,76 @@ +package service + +import ( + "encoding/json" + "fmt" + "time" + + "ems.agt/src/framework/constants/cachekey" + "ems.agt/src/framework/redis" + "ems.agt/src/modules/network_data/model" + "ems.agt/src/modules/network_data/repository" +) + +// 实例化数据层 PerfKPIImpl 结构体 +var NewPerfKPIImpl = &PerfKPIImpl{ + perfKPIRepository: repository.NewPerfKPIImpl, +} + +// PerfKPIImpl 性能统计 服务层处理 +type PerfKPIImpl struct { + // 性能统计数据信息 + perfKPIRepository repository.IPerfKPI +} + +// SelectGoldKPI 通过网元指标数据信息 +func (r *PerfKPIImpl) SelectGoldKPI(query model.GoldKPIQuery) []map[string]any { + // 获取数据指标id + var kpiIds []string + kpiTitles := r.perfKPIRepository.SelectGoldKPITitle(query.NeType) + for _, kpiId := range kpiTitles { + kpiIds = append(kpiIds, kpiId.KPIID) + } + + data := r.perfKPIRepository.SelectGoldKPI(query, kpiIds) + if data == nil { + return []map[string]any{} + } + return data +} + +// SelectGoldKPITitle 网元对应的指标名称 +func (r *PerfKPIImpl) SelectGoldKPITitle(neType string) []model.GoldKPITitle { + return r.perfKPIRepository.SelectGoldKPITitle(neType) +} + +// SelectUPFTotalFlow 查询UPF总流量 N3上行 N6下行 +func (r *PerfKPIImpl) SelectUPFTotalFlow(neType, rmUID string, day int) map[string]any { + // 获取当前日期 + now := time.Now() + endDate := now.Format("2006-01-02") + // 将当前日期前几天数 + afterDays := now.AddDate(0, 0, -day) + startDate := afterDays.Format("2006-01-02") + + var info map[string]any + + // 读取缓存数据 + key := fmt.Sprintf("%sUPFTotalFlow_%s_%d", cachekey.NE_DATA_KEY, rmUID, day) + infoStr, _ := redis.Get("", key) + if infoStr != "" { + json.Unmarshal([]byte(infoStr), &info) + expireSecond, _ := redis.GetExpire("", key) + expireMinute := (time.Duration(int64(expireSecond)) * time.Second) + if expireMinute > 1*time.Minute { + return info + } + } + + info = r.perfKPIRepository.SelectUPFTotalFlow(neType, rmUID, startDate, endDate) + + // 保存到缓存 + infoJSON, _ := json.Marshal(info) + redis.SetByExpire("", key, string(infoJSON), time.Duration(10)*time.Minute) + + return info +} diff --git a/src/modules/network_data/service/ue_event.go b/src/modules/network_data/service/ue_event.go new file mode 100644 index 0000000..e85c689 --- /dev/null +++ b/src/modules/network_data/service/ue_event.go @@ -0,0 +1,9 @@ +package service + +import "ems.agt/src/modules/network_data/model" + +// UE会话事件 服务层接口 +type IUEEvent interface { + // SelectPage 根据条件分页查询 + SelectPage(querys model.UEEventQuery) map[string]any +} diff --git a/src/modules/network_data/service/ue_event.impl.go b/src/modules/network_data/service/ue_event.impl.go new file mode 100644 index 0000000..46e7354 --- /dev/null +++ b/src/modules/network_data/service/ue_event.impl.go @@ -0,0 +1,22 @@ +package service + +import ( + "ems.agt/src/modules/network_data/model" + "ems.agt/src/modules/network_data/repository" +) + +// 实例化数据层 UEEventImpl 结构体 +var NewUEEventImpl = &UEEventImpl{ + ueEventRepository: repository.NewUEEventImpl, +} + +// UEEventImpl UE会话事件 服务层处理 +type UEEventImpl struct { + // UE会话事件数据信息 + ueEventRepository repository.IUEEvent +} + +// SelectPage 根据条件分页查询 +func (r *UEEventImpl) SelectPage(querys model.UEEventQuery) map[string]any { + return r.ueEventRepository.SelectPage(querys) +} diff --git a/src/modules/network_element/controller/ne_info.go b/src/modules/network_element/controller/ne_info.go index 7e21666..975f7d1 100644 --- a/src/modules/network_element/controller/ne_info.go +++ b/src/modules/network_element/controller/ne_info.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "sync" "ems.agt/src/framework/i18n" "ems.agt/src/framework/utils/ctx" @@ -26,7 +27,8 @@ type NeInfoController struct { } // neStateCacheMap 网元状态缓存最后一次成功的信息 -var neStateCacheMap map[string]map[string]any = make(map[string]map[string]any) +var neStateCacheMap sync.Map +var mutex sync.Mutex // 网元状态 // @@ -52,12 +54,13 @@ func (s *NeInfoController) NeState(c *gin.Context) { // 网元直连 resData, err := neService.NeState(neInfo) if err != nil { + mutex.Lock() // 异常取上次缓存 - if v, ok := neStateCacheMap[neKey]; ok && v != nil { - v["online"] = false - neStateCacheMap[neKey] = v + resDataCache, ok := neStateCacheMap.Load(neKey) + if ok && resDataCache != nil { + resDataCache.(map[string]any)["online"] = false } else { - neStateCacheMap[neKey] = map[string]any{ + resDataCache = map[string]any{ "online": false, "neId": neInfo.NeId, "neName": neInfo.NeName, @@ -65,13 +68,17 @@ func (s *NeInfoController) NeState(c *gin.Context) { "neIP": neInfo.IP, } } - c.JSON(200, result.OkData(neStateCacheMap[neKey])) + neStateCacheMap.Store(neKey, resDataCache) + mutex.Unlock() + c.JSON(200, result.OkData(resDataCache)) return } // 存入缓存 resData["online"] = true - neStateCacheMap[neKey] = resData + mutex.Lock() + neStateCacheMap.Store(neKey, resData) + mutex.Unlock() c.JSON(200, result.OkData(resData)) } diff --git a/src/modules/network_element/network_element.go b/src/modules/network_element/network_element.go index 6ae71f7..7f7f7e3 100644 --- a/src/modules/network_element/network_element.go +++ b/src/modules/network_element/network_element.go @@ -6,6 +6,7 @@ import ( "ems.agt/src/framework/middleware/collectlogs" "ems.agt/src/framework/middleware/repeat" "ems.agt/src/modules/network_element/controller" + "ems.agt/src/modules/network_element/service" "github.com/gin-gonic/gin" ) @@ -14,7 +15,12 @@ import ( func Setup(router *gin.Engine) { logger.Infof("开始加载 ====> network_element 模块路由") + // 启动时需要的初始参数 + InitLoad() + neGroup := router.Group("/ne") + + // 网元信息 { neGroup.GET("/info", middleware.PreAuthorize(nil), @@ -155,18 +161,10 @@ func Setup(router *gin.Engine) { controller.NewUDMSub.Import, ) } - - // 性能统计信息 - kpiGroup := neGroup.Group("/kpi") - { - kpiGroup.GET("/title", - middleware.PreAuthorize(nil), - controller.NewPerfKPI.Title, - ) - kpiGroup.GET("/data", - middleware.PreAuthorize(nil), - controller.NewPerfKPI.GoldKPI, - ) - } - +} + +// InitLoad 初始参数 +func InitLoad() { + // 启动时,清除缓存-网元类型 + service.NewNeInfoImpl.ClearNeCacheByNeType("*") } diff --git a/src/modules/network_element/service/ne_direct_link.go b/src/modules/network_element/service/ne_direct_link.go index 28025a8..596fd9a 100644 --- a/src/modules/network_element/service/ne_direct_link.go +++ b/src/modules/network_element/service/ne_direct_link.go @@ -41,5 +41,6 @@ func NeState(neInfo model.NeInfo) (map[string]any, error) { "expire": resData["expiryDate"], "cpu": resData["cpuUsage"], "mem": resData["memUsage"], + "disk": resData["diskSpace"], }, nil } diff --git a/src/modules/network_element/service/ne_info.go b/src/modules/network_element/service/ne_info.go index e6e611b..6122504 100644 --- a/src/modules/network_element/service/ne_info.go +++ b/src/modules/network_element/service/ne_info.go @@ -7,6 +7,12 @@ type INeInfo interface { // SelectNeInfoByNeTypeAndNeID 通过ne_type和ne_id查询网元信息 SelectNeInfoByNeTypeAndNeID(neType, neID string) model.NeInfo + // RefreshByNeTypeAndNeID 通过ne_type和ne_id刷新redis中的缓存 + RefreshByNeTypeAndNeID(neType, neID string) model.NeInfo + + // ClearNeCacheByNeType 清除网元类型缓存 + ClearNeCacheByNeType(neType string) bool + // SelectNeList 查询网元列表 SelectNeList(ne model.NeInfo, bandStatus bool) []model.NeInfo } diff --git a/src/modules/network_element/service/ne_info.impl.go b/src/modules/network_element/service/ne_info.impl.go index 8ef7cc1..d2ae5c1 100644 --- a/src/modules/network_element/service/ne_info.impl.go +++ b/src/modules/network_element/service/ne_info.impl.go @@ -1,6 +1,11 @@ package service import ( + "encoding/json" + "fmt" + + "ems.agt/src/framework/constants/cachekey" + "ems.agt/src/framework/redis" "ems.agt/src/modules/network_element/model" "ems.agt/src/modules/network_element/repository" ) @@ -18,7 +23,50 @@ type NeInfoImpl struct { // SelectNeInfoByNeTypeAndNeID 通过ne_type和ne_id查询网元信息 func (r *NeInfoImpl) SelectNeInfoByNeTypeAndNeID(neType, neID string) model.NeInfo { - return r.neInfoRepository.SelectNeInfoByNeTypeAndNeID(neType, neID) + var neInfo model.NeInfo + key := fmt.Sprintf("%s%s.%s", cachekey.NE_KEY, neType, neID) + jsonStr, _ := redis.Get("", key) + if len(jsonStr) > 7 { + err := json.Unmarshal([]byte(jsonStr), &neInfo) + if err != nil { + neInfo = model.NeInfo{} + } + } else { + neInfo = r.neInfoRepository.SelectNeInfoByNeTypeAndNeID(neType, neID) + if neInfo.NeId == neID { + redis.Del("", key) + values, _ := json.Marshal(neInfo) + redis.Set("", key, string(values)) + } + } + return neInfo +} + +// RefreshByNeTypeAndNeID 通过ne_type和ne_id刷新redis中的缓存 +func (r *NeInfoImpl) RefreshByNeTypeAndNeID(neType, neID string) model.NeInfo { + var neInfo model.NeInfo + key := fmt.Sprintf("%s%s.%s", cachekey.NE_KEY, neType, neID) + redis.Del("", key) + neInfo = r.neInfoRepository.SelectNeInfoByNeTypeAndNeID(neType, neID) + if neInfo.NeId == neID { + values, _ := json.Marshal(neInfo) + redis.Set("", key, string(values)) + } + return neInfo +} + +// ClearNeCacheByNeType 清除网元类型缓存 +func (r *NeInfoImpl) ClearNeCacheByNeType(neType string) bool { + key := fmt.Sprintf("%s*", cachekey.NE_KEY) + if neType != "*" { + key = fmt.Sprintf("%s%s*", cachekey.NE_KEY, neType) + } + keys, err := redis.GetKeys("", key) + if err != nil { + return false + } + delOk, _ := redis.DelKeys("", keys) + return delOk } // SelectNeList 查询网元列表 diff --git a/src/modules/system/controller/sys_menu.go b/src/modules/system/controller/sys_menu.go index 256718d..3cb023b 100644 --- a/src/modules/system/controller/sys_menu.go +++ b/src/modules/system/controller/sys_menu.go @@ -34,10 +34,10 @@ type SysMenuController struct { // GET /list func (s *SysMenuController) List(c *gin.Context) { query := model.SysMenu{} - if v, ok := c.GetQuery("menuName"); ok { + if v, ok := c.GetQuery("menuName"); ok && v != "" { query.MenuName = v } - if v, ok := c.GetQuery("status"); ok { + if v, ok := c.GetQuery("status"); ok && v != "" { query.Status = v } @@ -289,10 +289,10 @@ func (s *SysMenuController) Remove(c *gin.Context) { // GET /treeSelect func (s *SysMenuController) TreeSelect(c *gin.Context) { query := model.SysMenu{} - if v, ok := c.GetQuery("menuName"); ok { + if v, ok := c.GetQuery("menuName"); ok && v != "" { query.MenuName = v } - if v, ok := c.GetQuery("status"); ok { + if v, ok := c.GetQuery("status"); ok && v != "" { query.Status = v } @@ -331,10 +331,10 @@ func (s *SysMenuController) RoleMenuTreeSelect(c *gin.Context) { } query := model.SysMenu{} - if v, ok := c.GetQuery("menuName"); ok { + if v, ok := c.GetQuery("menuName"); ok && v != "" { query.MenuName = v } - if v, ok := c.GetQuery("status"); ok { + if v, ok := c.GetQuery("status"); ok && v != "" { query.Status = v } diff --git a/src/modules/system/service/sys_menu.go b/src/modules/system/service/sys_menu.go index e4fbd5e..f7648b5 100644 --- a/src/modules/system/service/sys_menu.go +++ b/src/modules/system/service/sys_menu.go @@ -13,7 +13,7 @@ type ISysMenu interface { // SelectMenuPermsByUserId 根据用户ID查询权限 SelectMenuPermsByUserId(userId string) []string - // SelectMenuPermsByUserId 根据用户ID查询权限 + // SelectMenuTreeByUserId 根据用户ID查询权限 SelectMenuTreeByUserId(userId string) []model.SysMenu // SelectMenuTreeSelectByUserId 查询菜单树结构信息 diff --git a/src/modules/ws/controller/ws.go b/src/modules/ws/controller/ws.go new file mode 100644 index 0000000..1257f59 --- /dev/null +++ b/src/modules/ws/controller/ws.go @@ -0,0 +1,104 @@ +package controller + +import ( + "strings" + + "ems.agt/src/framework/i18n" + "ems.agt/src/framework/logger" + "ems.agt/src/framework/utils/ctx" + "ems.agt/src/framework/utils/parse" + "ems.agt/src/framework/vo/result" + "ems.agt/src/modules/ws/service" + "github.com/gin-gonic/gin" +) + +// 实例化控制层 WSController 结构体 +var NewWSController = &WSController{ + wsService: service.NewWSImpl, + wsSendService: service.NewWSSendImpl, +} + +// WebSocket通信 +// +// PATH /ws +type WSController struct { + // WebSocket 服务 + wsService service.IWS + // WebSocket消息发送 服务 + wsSendService service.IWSSend +} + +// 通用 +// +// GET /?subGroupIDs=0 +func (s *WSController) WS(c *gin.Context) { + language := ctx.AcceptLanguage(c) + + // 登录用户信息 + loginUser, err := ctx.LoginUser(c) + if err != nil { + c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) + return + } + + // 订阅消息组 + var subGroupIDs []string + subGroupIDStr := c.Query("subGroupID") + if subGroupIDStr != "" { + // 处理字符转id数组后去重 + ids := strings.Split(subGroupIDStr, ",") + uniqueIDs := parse.RemoveDuplicates(ids) + if len(uniqueIDs) > 0 { + subGroupIDs = uniqueIDs + } + } + + // 将 HTTP 连接升级为 WebSocket 连接 + conn := s.wsService.UpgraderWs(c.Writer, c.Request) + if conn == nil { + return + } + defer conn.Close() + + wsClient := s.wsService.NewClient(loginUser.UserID, subGroupIDs, conn) + + // 等待停止信号 + for value := range wsClient.StopChan { + logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value) + return + } +} + +// 测试 +// +// GET /test?clientId=&groupID= +func (s *WSController) Test(c *gin.Context) { + language := ctx.AcceptLanguage(c) + + // 登录用户信息 + loginUser, err := ctx.LoginUser(c) + if err != nil { + c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) + return + } + + errMsgArr := []string{} + + clientId := c.Query("clientId") + if clientId != "" { + err := s.wsSendService.ByClientID(c.Query("clientId"), loginUser) + if err != nil { + errMsgArr = append(errMsgArr, "clientId: "+err.Error()) + } + } + + groupID := c.Query("groupID") + if groupID != "" { + err := s.wsSendService.ByGroupID(c.Query("groupID"), loginUser) + if err != nil { + errMsgArr = append(errMsgArr, "groupID: "+err.Error()) + } + } + + c.JSON(200, result.OkData(errMsgArr)) +} diff --git a/src/modules/ws/model/net_connect.go b/src/modules/ws/model/net_connect.go new file mode 100644 index 0000000..8116c28 --- /dev/null +++ b/src/modules/ws/model/net_connect.go @@ -0,0 +1,20 @@ +package model + +import "github.com/shirou/gopsutil/v3/net" + +// NetConnectData 网络连接进程数据 +type NetConnectData struct { + Type string `json:"type"` + Status string `json:"status"` + Laddr net.Addr `json:"localaddr"` + Raddr net.Addr `json:"remoteaddr"` + PID int32 `json:"PID"` + Name string `json:"name"` +} + +// NetConnectQuery 网络连接进程查询 +type NetConnectQuery struct { + Port int32 `json:"port"` + ProcessName string `json:"processName"` + ProcessID int32 `json:"processID"` +} diff --git a/src/modules/ws/model/ps_process.go b/src/modules/ws/model/ps_process.go new file mode 100644 index 0000000..e93247a --- /dev/null +++ b/src/modules/ws/model/ps_process.go @@ -0,0 +1,38 @@ +package model + +// PsProcessData 进程数据 +type PsProcessData struct { + PID int32 `json:"PID"` + Name string `json:"name"` + PPID int32 `json:"PPID"` + Username string `json:"username"` + Status string `json:"status"` + StartTime string `json:"startTime"` + NumThreads int32 `json:"numThreads"` + NumConnections int `json:"numConnections"` + CpuPercent string `json:"cpuPercent"` + + DiskRead string `json:"diskRead"` + DiskWrite string `json:"diskWrite"` + CmdLine string `json:"cmdLine"` + + Rss string `json:"rss"` + VMS string `json:"vms"` + HWM string `json:"hwm"` + Data string `json:"data"` + Stack string `json:"stack"` + Locked string `json:"locked"` + Swap string `json:"swap"` + + CpuValue float64 `json:"cpuValue"` + RssValue uint64 `json:"rssValue"` + + Envs []string `json:"envs"` +} + +// PsProcessQuery 进程查询 +type PsProcessQuery struct { + Pid int32 `json:"pid"` + Name string `json:"name"` + Username string `json:"username"` +} diff --git a/src/modules/ws/model/ws.go b/src/modules/ws/model/ws.go new file mode 100644 index 0000000..130419c --- /dev/null +++ b/src/modules/ws/model/ws.go @@ -0,0 +1,21 @@ +package model + +import "github.com/gorilla/websocket" + +// WSClient ws客户端 +type WSClient struct { + ID string // 连接ID-随机字符串16位 + Conn *websocket.Conn // 连接实例 + LastHeartbeat int64 // 最近一次心跳消息(毫秒) + BindUid string // 绑定登录用户ID + SubGroup []string // 订阅组ID + MsgChan chan []byte // 消息通道 + StopChan chan struct{} // 停止信号-退出协程 +} + +// WSRequest ws消息接收 +type WSRequest struct { + RequestID string `json:"requestId"` // 请求ID + Type string `json:"type"` // 业务类型 + Data any `json:"data"` // 查询结构 +} diff --git a/src/modules/ws/processor/cdr_connect.go b/src/modules/ws/processor/cdr_connect.go new file mode 100644 index 0000000..607e26e --- /dev/null +++ b/src/modules/ws/processor/cdr_connect.go @@ -0,0 +1,29 @@ +package processor + +import ( + "encoding/json" + "fmt" + + "ems.agt/src/framework/logger" + "ems.agt/src/framework/vo/result" + neDataModel "ems.agt/src/modules/network_data/model" + neDataService "ems.agt/src/modules/network_data/service" +) + +// GetCDRConnect 获取CDR会话事件-IMS +func GetCDRConnect(requestID string, data any) ([]byte, error) { + msgByte, _ := json.Marshal(data) + var query neDataModel.CDREventQuery + err := json.Unmarshal(msgByte, &query) + if err != nil { + logger.Warnf("ws processor GetCDRConnect err: %s", err.Error()) + return nil, fmt.Errorf("query data structure error") + } + + dataMap := neDataService.NewCDREventImpl.SelectPage(query) + resultByte, err := json.Marshal(result.Ok(map[string]any{ + "requestId": requestID, + "data": dataMap, + })) + return resultByte, err +} diff --git a/src/modules/ws/processor/ne_state.go b/src/modules/ws/processor/ne_state.go new file mode 100644 index 0000000..38d9baf --- /dev/null +++ b/src/modules/ws/processor/ne_state.go @@ -0,0 +1,57 @@ +package processor + +import ( + "encoding/json" + "fmt" + + "ems.agt/src/framework/logger" + "ems.agt/src/framework/vo/result" + neService "ems.agt/src/modules/network_element/service" +) + +// GetNeState 获取网元服务状态 +func GetNeState(requestID string, data any) ([]byte, error) { + msgByte, _ := json.Marshal(data) + var querys struct { + NeType string `json:"neType" form:"neType" binding:"required"` + NeID string `json:"neId" form:"neId" binding:"required"` + } + err := json.Unmarshal(msgByte, &querys) + if err != nil { + logger.Warnf("ws processor GetUPFTotalFlow err: %s", err.Error()) + return nil, fmt.Errorf("query data structure error") + } + + if querys.NeType == "" || querys.NeID == "" { + return nil, fmt.Errorf("query neType any neId empty") + } + + // 查询网元获取IP + neInfo := neService.NewNeInfoImpl.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID) + if neInfo.NeId != querys.NeID || neInfo.IP == "" { + return nil, fmt.Errorf("no matching network element information found") + } + + // 网元直连 + resData, err := neService.NeState(neInfo) + if err != nil { + resultByte, err := json.Marshal(result.Ok(map[string]any{ + "requestId": requestID, + "data": map[string]any{ + "online": false, + "neId": neInfo.NeId, + "neName": neInfo.NeName, + "neType": neInfo.NeType, + "neIP": neInfo.IP, + }, + })) + return resultByte, err + } + + resData["online"] = true + resultByte, err := json.Marshal(result.Ok(map[string]any{ + "requestId": requestID, + "data": resData, + })) + return resultByte, err +} diff --git a/src/modules/ws/processor/net_connect.go b/src/modules/ws/processor/net_connect.go new file mode 100644 index 0000000..d38c3ca --- /dev/null +++ b/src/modules/ws/processor/net_connect.go @@ -0,0 +1,61 @@ +package processor + +import ( + "encoding/json" + "fmt" + "strings" + + "ems.agt/src/framework/logger" + "ems.agt/src/framework/vo/result" + "ems.agt/src/modules/ws/model" + "github.com/shirou/gopsutil/v3/net" + "github.com/shirou/gopsutil/v3/process" +) + +// GetNetConnections 获取网络连接进程 +func GetNetConnections(requestID string, data any) ([]byte, error) { + msgByte, _ := json.Marshal(data) + var query model.NetConnectQuery + err := json.Unmarshal(msgByte, &query) + if err != nil { + logger.Warnf("ws processor GetNetConnections err: %s", err.Error()) + return nil, fmt.Errorf("query data structure error") + } + + dataArr := []model.NetConnectData{} + for _, netType := range [...]string{"tcp", "udp"} { + connections, err := net.Connections(netType) + if err != nil { + continue + } + for _, conn := range connections { + if query.ProcessID > 0 && query.ProcessID != conn.Pid { + continue + } + proc, err := process.NewProcess(conn.Pid) + if err == nil { + name, _ := proc.Name() + if name != "" && query.ProcessName != "" && !strings.Contains(name, query.ProcessName) { + continue + } + if query.Port > 0 && query.Port != int32(conn.Laddr.Port) && query.Port != int32(conn.Raddr.Port) { + continue + } + dataArr = append(dataArr, model.NetConnectData{ + Type: netType, + Status: conn.Status, + Laddr: conn.Laddr, + Raddr: conn.Raddr, + PID: conn.Pid, + Name: name, + }) + } + } + } + + resultByte, err := json.Marshal(result.Ok(map[string]any{ + "requestId": requestID, + "data": dataArr, + })) + return resultByte, err +} diff --git a/src/modules/ws/processor/ps_process.go b/src/modules/ws/processor/ps_process.go new file mode 100644 index 0000000..03238c0 --- /dev/null +++ b/src/modules/ws/processor/ps_process.go @@ -0,0 +1,142 @@ +package processor + +import ( + "encoding/json" + "fmt" + "sort" + "strings" + "sync" + + "ems.agt/src/framework/logger" + "ems.agt/src/framework/utils/date" + "ems.agt/src/framework/utils/parse" + "ems.agt/src/framework/vo/result" + "ems.agt/src/modules/ws/model" + "github.com/shirou/gopsutil/v3/process" +) + +// GetProcessData 获取进程数据 +func GetProcessData(requestID string, data any) ([]byte, error) { + msgByte, _ := json.Marshal(data) + var query model.PsProcessQuery + err := json.Unmarshal(msgByte, &query) + if err != nil { + logger.Warnf("ws processor GetNetConnections err: %s", err.Error()) + return nil, fmt.Errorf("query data structure error") + } + + var processes []*process.Process + processes, err = process.Processes() + if err != nil { + return nil, err + } + + var ( + dataArr = []model.PsProcessData{} + resultMutex sync.Mutex + wg sync.WaitGroup + numWorkers = 4 + ) + + handleData := func(proc *process.Process) { + procData := model.PsProcessData{ + PID: proc.Pid, + } + if query.Pid > 0 && query.Pid != proc.Pid { + return + } + procName, err := proc.Name() + if procName == "" || err != nil { + return + } else { + procData.Name = procName + } + if query.Name != "" && !strings.Contains(procData.Name, query.Name) { + return + } + if username, err := proc.Username(); err == nil { + procData.Username = username + } + if query.Username != "" && !strings.Contains(procData.Username, query.Username) { + return + } + + procData.PPID, _ = proc.Ppid() + statusArray, _ := proc.Status() + if len(statusArray) > 0 { + procData.Status = strings.Join(statusArray, ",") + } + createTime, procErr := proc.CreateTime() + if procErr == nil { + procData.StartTime = date.ParseDateToStr(createTime, date.YYYY_MM_DD_HH_MM_SS) + } + procData.NumThreads, _ = proc.NumThreads() + procData.CpuValue, _ = proc.CPUPercent() + procData.CpuPercent = fmt.Sprintf("%.2f", procData.CpuValue) + "%" + menInfo, procErr := proc.MemoryInfo() + if procErr == nil { + procData.Rss = parse.Bit(float64(menInfo.RSS)) + procData.Data = parse.Bit(float64(menInfo.Data)) + procData.VMS = parse.Bit(float64(menInfo.VMS)) + procData.HWM = parse.Bit(float64(menInfo.HWM)) + procData.Stack = parse.Bit(float64(menInfo.Stack)) + procData.Locked = parse.Bit(float64(menInfo.Locked)) + procData.Swap = parse.Bit(float64(menInfo.Swap)) + + procData.RssValue = menInfo.RSS + } else { + procData.Rss = "--" + procData.Data = "--" + procData.VMS = "--" + procData.HWM = "--" + procData.Stack = "--" + procData.Locked = "--" + procData.Swap = "--" + + procData.RssValue = 0 + } + ioStat, procErr := proc.IOCounters() + if procErr == nil { + procData.DiskWrite = parse.Bit(float64(ioStat.WriteBytes)) + procData.DiskRead = parse.Bit(float64(ioStat.ReadBytes)) + } else { + procData.DiskWrite = "--" + procData.DiskRead = "--" + } + procData.CmdLine, _ = proc.Cmdline() + procData.Envs, _ = proc.Environ() + + resultMutex.Lock() + dataArr = append(dataArr, procData) + resultMutex.Unlock() + } + + chunkSize := (len(processes) + numWorkers - 1) / numWorkers + for i := 0; i < numWorkers; i++ { + wg.Add(1) + start := i * chunkSize + end := (i + 1) * chunkSize + if end > len(processes) { + end = len(processes) + } + + go func(start, end int) { + defer wg.Done() + for j := start; j < end; j++ { + handleData(processes[j]) + } + }(start, end) + } + + wg.Wait() + + sort.Slice(dataArr, func(i, j int) bool { + return dataArr[i].PID < dataArr[j].PID + }) + + resultByte, err := json.Marshal(result.Ok(map[string]any{ + "requestId": requestID, + "data": dataArr, + })) + return resultByte, err +} diff --git a/src/modules/ws/processor/ue_connect.go b/src/modules/ws/processor/ue_connect.go new file mode 100644 index 0000000..e43ecc9 --- /dev/null +++ b/src/modules/ws/processor/ue_connect.go @@ -0,0 +1,29 @@ +package processor + +import ( + "encoding/json" + "fmt" + + "ems.agt/src/framework/logger" + "ems.agt/src/framework/vo/result" + neDataModel "ems.agt/src/modules/network_data/model" + neDataService "ems.agt/src/modules/network_data/service" +) + +// GetUEConnect 获取UE会话事件-AMF +func GetUEConnect(requestID string, data any) ([]byte, error) { + msgByte, _ := json.Marshal(data) + var query neDataModel.UEEventQuery + err := json.Unmarshal(msgByte, &query) + if err != nil { + logger.Warnf("ws processor GetUEConnect err: %s", err.Error()) + return nil, fmt.Errorf("query data structure error") + } + + dataMap := neDataService.NewUEEventImpl.SelectPage(query) + resultByte, err := json.Marshal(result.Ok(map[string]any{ + "requestId": requestID, + "data": dataMap, + })) + return resultByte, err +} diff --git a/src/modules/ws/processor/upf_total_flow.go b/src/modules/ws/processor/upf_total_flow.go new file mode 100644 index 0000000..5c1feb7 --- /dev/null +++ b/src/modules/ws/processor/upf_total_flow.go @@ -0,0 +1,44 @@ +package processor + +import ( + "encoding/json" + "fmt" + + "ems.agt/src/framework/logger" + "ems.agt/src/framework/vo/result" + neDataService "ems.agt/src/modules/network_data/service" + neService "ems.agt/src/modules/network_element/service" +) + +// GetUPFTotalFlow 获取UPF-总流量数 N3上行 N6下行 单位比特(bit) +func GetUPFTotalFlow(requestID string, data any) ([]byte, error) { + msgByte, _ := json.Marshal(data) + var querys struct { + NeType string `json:"neType" form:"neType" binding:"required"` + NeID string `json:"neId" form:"neId" binding:"required"` + Day int `json:"day" binding:"required"` + } + err := json.Unmarshal(msgByte, &querys) + if err != nil { + logger.Warnf("ws processor GetUPFTotalFlow err: %s", err.Error()) + return nil, fmt.Errorf("query data structure error") + } + + if querys.NeType == "" || querys.NeID == "" || querys.Day < 0 { + return nil, fmt.Errorf("query neType any neId empty or day less 0 ") + } + + // 查询网元获取IP + neInfo := neService.NewNeInfoImpl.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID) + if neInfo.NeId != querys.NeID || neInfo.IP == "" { + return nil, fmt.Errorf("no matching network element information found") + } + + dataMap := neDataService.NewPerfKPIImpl.SelectUPFTotalFlow(neInfo.NeType, neInfo.RmUID, querys.Day) + + resultByte, err := json.Marshal(result.Ok(map[string]any{ + "requestId": requestID, + "data": dataMap, + })) + return resultByte, err +} diff --git a/src/modules/ws/service/ws.go b/src/modules/ws/service/ws.go new file mode 100644 index 0000000..f404c14 --- /dev/null +++ b/src/modules/ws/service/ws.go @@ -0,0 +1,20 @@ +package service + +import ( + "net/http" + + "ems.agt/src/modules/ws/model" + "github.com/gorilla/websocket" +) + +// IWS WebSocket通信 服务层接口 +type IWS interface { + // UpgraderWs http升级ws请求 + UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn + + // NewClient 新建客户端 uid 登录用户ID + NewClient(uid string, gids []string, conn *websocket.Conn) *model.WSClient + + // CloseClient 客户端关闭 + CloseClient(clientID string) +} diff --git a/src/modules/ws/service/ws.impl.go b/src/modules/ws/service/ws.impl.go new file mode 100644 index 0000000..350979d --- /dev/null +++ b/src/modules/ws/service/ws.impl.go @@ -0,0 +1,216 @@ +package service + +import ( + "encoding/json" + "net/http" + "sync" + "time" + + "ems.agt/src/framework/logger" + "ems.agt/src/framework/utils/generate" + "ems.agt/src/framework/vo/result" + "ems.agt/src/modules/ws/model" + "github.com/gorilla/websocket" +) + +var ( + // ws客户端 [clientId: client] + WsClients sync.Map + // ws用户对应的多个客户端id [uid:clientIds] + WsUsers sync.Map + // ws组对应的多个用户id [groupID:uids] + WsGroup sync.Map +) + +// 实例化服务层 WSImpl 结构体 +var NewWSImpl = &WSImpl{} + +// WSImpl WebSocket通信 服务层处理 +type WSImpl struct{} + +// UpgraderWs http升级ws请求 +func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn { + wsUpgrader := websocket.Upgrader{ + Subprotocols: []string{"omc-ws"}, + // 设置消息发送缓冲区大小(byte),如果这个值设置得太小,可能会导致服务端在发送大型消息时遇到问题 + WriteBufferSize: 1024, + // 消息包启用压缩 + EnableCompression: true, + // ws握手超时时间 + HandshakeTimeout: 5 * time.Second, + // ws握手过程中允许跨域 + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + conn, err := wsUpgrader.Upgrade(w, r, nil) + if err != nil { + logger.Errorf("ws Upgrade err: %s", err.Error()) + } + return conn +} + +// NewClient 新建客户端 uid 登录用户ID +func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn) *model.WSClient { + // clientID也可以用其他方式生成,只要能保证在所有服务端中都能保证唯一即可 + clientID := generate.Code(16) + + wsClient := &model.WSClient{ + ID: clientID, + Conn: conn, + LastHeartbeat: time.Now().UnixMilli(), + BindUid: uid, + SubGroup: groupIDs, + MsgChan: make(chan []byte, 100), + StopChan: make(chan struct{}, 1), // 卡死循环标记 + } + + // 存入客户端 + WsClients.Store(clientID, wsClient) + + // 存入用户持有客户端 + if uid != "" { + if v, ok := WsUsers.Load(uid); ok { + uidClientIds := v.(*[]string) + *uidClientIds = append(*uidClientIds, clientID) + } else { + WsUsers.Store(uid, &[]string{clientID}) + } + } + + // 存入用户订阅组 + if uid != "" && len(groupIDs) > 0 { + for _, groupID := range groupIDs { + if v, ok := WsGroup.Load(groupID); ok { + groupUIDs := v.(*[]string) + // 避免同组内相同用户 + hasUid := false + for _, uidv := range *groupUIDs { + if uidv == uid { + hasUid = true + break + } + } + if !hasUid { + *groupUIDs = append(*groupUIDs, uid) + } + } else { + WsGroup.Store(groupID, &[]string{uid}) + } + } + } + + go s.clientRead(wsClient) + go s.clientWrite(wsClient) + + // 发客户端id确认是否连接 + msgByte, _ := json.Marshal(result.OkData(map[string]string{ + "clientId": clientID, + })) + wsClient.MsgChan <- msgByte + + return wsClient +} + +// clientRead 客户端读取消息 +func (s *WSImpl) clientRead(wsClient *model.WSClient) { + defer func() { + if err := recover(); err != nil { + logger.Errorf("ws ReadMessage Panic Error: %v", err) + } + }() + for { + // 读取消息 + messageType, msg, err := wsClient.Conn.ReadMessage() + if err != nil { + logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error()) + s.CloseClient(wsClient.ID) + return + } + + // 文本和二进制类型,只处理文本json + if messageType == websocket.TextMessage { + var reqMsg model.WSRequest + err := json.Unmarshal(msg, &reqMsg) + // fmt.Println(messageType, string(msg)) + if err != nil { + msgByte, _ := json.Marshal(result.ErrMsg("message format not supported")) + wsClient.MsgChan <- msgByte + } else { + go NewWSReceiveImpl.Receive(wsClient, reqMsg) + } + } + } +} + +// clientWrite 客户端写入消息 +func (s *WSImpl) clientWrite(wsClient *model.WSClient) { + defer func() { + if err := recover(); err != nil { + logger.Errorf("ws WriteMessage Panic Error: %v", err) + } + }() + for msg := range wsClient.MsgChan { + // 发送消息 + err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg) + if err != nil { + logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error()) + s.CloseClient(wsClient.ID) + return + } + wsClient.LastHeartbeat = time.Now().UnixMilli() + } +} + +// CloseClient 客户端关闭 +func (s *WSImpl) CloseClient(clientID string) { + v, ok := WsClients.Load(clientID) + if !ok { + return + } + + client := v.(*model.WSClient) + defer func() { + client.Conn.WriteMessage(websocket.CloseMessage, []byte{}) + client.Conn.Close() + client.StopChan <- struct{}{} + WsClients.Delete(clientID) + }() + + // 客户端断线时自动踢出Uid绑定列表 + if client.BindUid != "" { + if clientIds, ok := WsUsers.Load(client.BindUid); ok { + uidClientIds := clientIds.(*[]string) + if len(*uidClientIds) > 0 { + tempClientIds := make([]string, 0, len(*uidClientIds)) + for _, v := range *uidClientIds { + if v != client.ID { + tempClientIds = append(tempClientIds, v) + } + } + *uidClientIds = tempClientIds + } + } + } + + // 客户端断线时自动踢出已加入的组 + if client.BindUid != "" && len(client.SubGroup) > 0 { + for _, groupID := range client.SubGroup { + uids, ok := WsGroup.Load(groupID) + if !ok { + continue + } + + groupUIDs := uids.(*[]string) + if len(*groupUIDs) > 0 { + tempUIDs := make([]string, 0, len(*groupUIDs)) + for _, v := range *groupUIDs { + if v != client.BindUid { + tempUIDs = append(tempUIDs, v) + } + } + *groupUIDs = tempUIDs + } + } + } +} diff --git a/src/modules/ws/service/ws_receive.go b/src/modules/ws/service/ws_receive.go new file mode 100644 index 0000000..0a1d8e0 --- /dev/null +++ b/src/modules/ws/service/ws_receive.go @@ -0,0 +1,9 @@ +package service + +import "ems.agt/src/modules/ws/model" + +// IWSReceive WebSocket消息接收处理 服务层接口 +type IWSReceive interface { + // Receive 接收处理 + Receive(client *model.WSClient, reqMsg model.WSRequest) error +} diff --git a/src/modules/ws/service/ws_receive.impl.go b/src/modules/ws/service/ws_receive.impl.go new file mode 100644 index 0000000..454878d --- /dev/null +++ b/src/modules/ws/service/ws_receive.impl.go @@ -0,0 +1,56 @@ +package service + +import ( + "encoding/json" + "fmt" + + "ems.agt/src/framework/logger" + "ems.agt/src/framework/vo/result" + "ems.agt/src/modules/ws/model" + "ems.agt/src/modules/ws/processor" +) + +// 实例化服务层 WSReceiveImpl 结构体 +var NewWSReceiveImpl = &WSReceiveImpl{} + +// WSReceiveImpl WebSocket消息接收处理 服务层处理 +type WSReceiveImpl struct{} + +// Receive 接收处理 +func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) { + if reqMsg.RequestID == "" { + msg := "message requestId is required" + logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, msg) + msgByte, _ := json.Marshal(result.ErrMsg(msg)) + client.MsgChan <- msgByte + return + } + + var resByte []byte + var err error + + switch reqMsg.Type { + case "ps": + resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data) + case "net": + resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data) + case "cdr": + resByte, err = processor.GetCDRConnect(reqMsg.RequestID, reqMsg.Data) + case "ue": + resByte, err = processor.GetUEConnect(reqMsg.RequestID, reqMsg.Data) + case "upf_tf": + resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data) + case "ne_state": + resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data) + default: + err = fmt.Errorf("message type not supported") + } + + if err != nil { + logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, err.Error()) + msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) + client.MsgChan <- msgByte + return + } + client.MsgChan <- resByte +} diff --git a/src/modules/ws/service/ws_send.go b/src/modules/ws/service/ws_send.go new file mode 100644 index 0000000..020d022 --- /dev/null +++ b/src/modules/ws/service/ws_send.go @@ -0,0 +1,10 @@ +package service + +// IWSSend WebSocket消息发送处理 服务层接口 +type IWSSend interface { + // ByClientID 给已知客户端发消息 + ByClientID(clientID string, data any) error + + // ByGroupID 给订阅组的用户发送消息 + ByGroupID(gid string, data any) error +} diff --git a/src/modules/ws/service/ws_send.impl.go b/src/modules/ws/service/ws_send.impl.go new file mode 100644 index 0000000..8db8fcd --- /dev/null +++ b/src/modules/ws/service/ws_send.impl.go @@ -0,0 +1,85 @@ +package service + +import ( + "encoding/json" + "fmt" + + "ems.agt/src/framework/vo/result" + "ems.agt/src/modules/ws/model" +) + +// 订阅组指定编号为支持服务器向客户端主动推送数据 +const ( + // 组号-其他 + GROUP_OTHER = "0" + // 组号-指标 + GROUP_KPI = "10" + // 组号-指标UPF + GROUP_KPI_UPF = "12" + // 组号-IMS_CDR会话事件 + GROUP_IMS_CDR = "1005" + // 组号-AMF_UE会话事件 + GROUP_AMF_UE = "1010" +) + +// 实例化服务层 WSSendImpl 结构体 +var NewWSSendImpl = &WSSendImpl{} + +// IWSSend WebSocket消息发送处理 服务层处理 +type WSSendImpl struct{} + +// ByClientID 给已知客户端发消息 +func (s *WSSendImpl) ByClientID(clientID string, data any) error { + v, ok := WsClients.Load(clientID) + if !ok { + return fmt.Errorf("no fount client ID: %s", clientID) + } + + dataByte, err := json.Marshal(result.OkData(data)) + if err != nil { + return err + } + + client := v.(*model.WSClient) + if len(client.MsgChan) > 90 { + NewWSImpl.CloseClient(client.ID) + return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID) + } + client.MsgChan <- dataByte + return nil +} + +// ByGroupID 给订阅组的用户发送消息 +func (s *WSSendImpl) ByGroupID(groupID string, data any) error { + uids, ok := WsGroup.Load(groupID) + if !ok { + return fmt.Errorf("no fount Group ID: %s", groupID) + } + + groupUids := uids.(*[]string) + // 群组中没有成员 + if len(*groupUids) == 0 { + return fmt.Errorf("no members in the group") + } + + // 在群组中找到对应的 uid + for _, uid := range *groupUids { + clientIds, ok := WsUsers.Load(uid) + if !ok { + continue + } + // 在用户中找到客户端并发送 + uidClientIds := clientIds.(*[]string) + for _, clientId := range *uidClientIds { + err := s.ByClientID(clientId, map[string]any{ + "groupId": groupID, + "data": data, + }) + if err != nil { + continue + } + } + } + + return nil +} diff --git a/src/modules/ws/ws.go b/src/modules/ws/ws.go new file mode 100644 index 0000000..ee83ea3 --- /dev/null +++ b/src/modules/ws/ws.go @@ -0,0 +1,30 @@ +package ws + +import ( + "ems.agt/src/framework/logger" + "ems.agt/src/framework/middleware" + "ems.agt/src/framework/middleware/collectlogs" + "ems.agt/src/modules/ws/controller" + + "github.com/gin-gonic/gin" +) + +// 模块路由注册 +func Setup(router *gin.Engine) { + logger.Infof("开始加载 ====> ws 模块路由") + + // WebSocket 协议 + wsGroup := router.Group("/ws") + { + wsGroup.GET("", + middleware.PreAuthorize(nil), + collectlogs.OperateLog(collectlogs.OptionNew("WS Subscription", collectlogs.BUSINESS_TYPE_OTHER)), + controller.NewWSController.WS, + ) + + wsGroup.GET("/test", + middleware.PreAuthorize(nil), + controller.NewWSController.Test, + ) + } +}