From 9df27f349bb1634b663cb63a3f48ea6b2841fb9b Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Tue, 22 Apr 2025 11:39:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20ws=E6=A8=A1=E5=9D=97=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E7=A0=81=E5=8F=98=E6=9B=B4=E5=8F=8A=E4=BF=AE=E5=A4=8D=E6=96=AD?= =?UTF-8?q?=E9=93=BE=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- features/cdr/cdrevent.go | 8 ++++---- features/pm/performance.go | 8 ++++---- src/modules/ws/controller/ws.go | 5 +++-- src/modules/ws/controller/ws_redis.go | 2 +- src/modules/ws/controller/ws_view.go | 4 ++-- src/modules/ws/service/ws.go | 11 +++++++++-- src/modules/ws/service/ws_receive.go | 10 +++++++++- src/modules/ws/service/ws_send.go | 20 ++++++++++++-------- 8 files changed, 44 insertions(+), 24 deletions(-) diff --git a/features/cdr/cdrevent.go b/features/cdr/cdrevent.go index 6fd05585..b0c3880c 100644 --- a/features/cdr/cdrevent.go +++ b/features/cdr/cdrevent.go @@ -89,14 +89,14 @@ func PostCDREventFrom(w http.ResponseWriter, r *http.Request) { switch neInfo.NeType { case "IMS": if v, ok := cdrEvent.CDR["recordType"]; ok && (v == "MOC" || v == "MTSM") { - wsService.NewWSSend.ByGroupID(wsService.GROUP_IMS_CDR+neInfo.NeId, cdrEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_IMS_CDR+"_"+neInfo.NeId, cdrEvent) } case "SMF": - wsService.NewWSSend.ByGroupID(wsService.GROUP_SMF_CDR+neInfo.NeId, cdrEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_SMF_CDR+"_"+neInfo.NeId, cdrEvent) case "SMSC": - wsService.NewWSSend.ByGroupID(wsService.GROUP_SMSC_CDR+neInfo.NeId, cdrEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_SMSC_CDR+"_"+neInfo.NeId, cdrEvent) case "SGWC": - wsService.NewWSSend.ByGroupID(wsService.GROUP_SGWC_CDR+neInfo.NeId, cdrEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_SGWC_CDR+"_"+neInfo.NeId, cdrEvent) } } diff --git a/features/pm/performance.go b/features/pm/performance.go index 3b597213..f45ec95f 100644 --- a/features/pm/performance.go +++ b/features/pm/performance.go @@ -335,13 +335,13 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { if neInfo.RmUID == kpiData.RmUid { // 推送到ws订阅组 - wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent) + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent) // 推送自定义KPI到ws订阅组 - wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent) + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent) if neInfo.NeType == "UPF" { - wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+neInfo.NeId, kpiEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+"_"+neInfo.NeId, kpiEvent) // 推送标识为:12_RMUID, exp: 12_4400HXUPF001, for multi-tenancy - wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+kpiReport.Task.NE.RmUID, kpiEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+"_"+kpiReport.Task.NE.RmUID, kpiEvent) // 更新UPF总流量 upValue := parse.Number(kpiEvent["UPF.03"]) downValue := parse.Number(kpiEvent["UPF.06"]) diff --git a/src/modules/ws/controller/ws.go b/src/modules/ws/controller/ws.go index ece900d7..6eca1883 100644 --- a/src/modules/ws/controller/ws.go +++ b/src/modules/ws/controller/ws.go @@ -36,8 +36,9 @@ type WSController struct { // @Tags ws // @Accept json // @Produce json -// @Param subGroupID query string false "Subscribe to message groups, multiple separated by commas" -// @Success 200 {object} object "Response Results" +// @Param subGroupID query string false "Subscribe to message groups, multiple separated by commas" +// @Param access_token query string false "Authorization tokens are used when it is inconvenient to pass parameters through the header." +// @Success 200 {object} object "Response Results" // @Summary (ws://) Generic // @Description (ws://) Generic // @Router /ws [get] diff --git a/src/modules/ws/controller/ws_redis.go b/src/modules/ws/controller/ws_redis.go index febcd3c7..0d24a188 100644 --- a/src/modules/ws/controller/ws_redis.go +++ b/src/modules/ws/controller/ws_redis.go @@ -1,9 +1,9 @@ package controller import ( + "be.ems/src/framework/database/redis" "be.ems/src/framework/i18n" "be.ems/src/framework/logger" - "be.ems/src/framework/redis" "be.ems/src/framework/utils/ctx" "be.ems/src/framework/vo/result" neService "be.ems/src/modules/network_element/service" diff --git a/src/modules/ws/controller/ws_view.go b/src/modules/ws/controller/ws_view.go index e910b130..7a1e872d 100644 --- a/src/modules/ws/controller/ws_view.go +++ b/src/modules/ws/controller/ws_view.go @@ -21,11 +21,11 @@ import ( // @Tags ws // @Accept json // @Produce json -// @Param neType query string true "NE Type" Enums(IMS,AMF,AUSF,UDM,SMF,PCF,NSSF,NRF,UPF,MME,CBC,OMC,SGWC) +// @Param neType query string true "NE Type" Enums(IMS,AMF,AUSF,UDM,SMF,PCF,NSSF,NRF,UPF,MME,CBC,OMC,SGWC,SMSC) // @Param neId query string true "NE ID" default(001) // @Param cols query number false "Terminal line characters" default(120) // @Param rows query number false "Terminal display lines" default(40) -// @Param access_token query string true "Authorization" +// @Param access_token query string true "Authorization tokens are used when it is inconvenient to pass parameters through the header." // @Success 200 {object} object "Response Results" // @Summary (ws://) Terminal Interactive File Content Viewing // @Description (ws://) Terminal Interactive File Content Viewing diff --git a/src/modules/ws/service/ws.go b/src/modules/ws/service/ws.go index 728dd609..72e72321 100644 --- a/src/modules/ws/service/ws.go +++ b/src/modules/ws/service/ws.go @@ -10,6 +10,7 @@ import ( "be.ems/src/framework/utils/generate" "be.ems/src/framework/vo/result" "be.ems/src/modules/ws/model" + "github.com/gorilla/websocket" ) @@ -30,7 +31,7 @@ func (s *WS) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn wsUpgrader := websocket.Upgrader{ Subprotocols: []string{"omc-ws"}, // 设置消息发送缓冲区大小(byte),如果这个值设置得太小,可能会导致服务端在发送大型消息时遇到问题 - WriteBufferSize: 1024, + WriteBufferSize: 4096, // 消息包启用压缩 EnableCompression: true, // ws握手超时时间 @@ -195,11 +196,18 @@ func (s *WS) ClientWriteListen(wsClient *model.WSClient) { wsClient.MsgChan <- msgByte // 消息发送监听 for msg := range wsClient.MsgChan { + // PONG句柄 + if string(msg) == "ws:pong" { + wsClient.LastHeartbeat = time.Now().UnixMilli() + wsClient.Conn.WriteMessage(websocket.PongMessage, []byte{}) + continue + } // 关闭句柄 if string(msg) == "ws:close" { wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{}) return } + // 发送消息 err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg) if err != nil { @@ -207,6 +215,5 @@ func (s *WS) ClientWriteListen(wsClient *model.WSClient) { s.ClientClose(wsClient.ID) return } - wsClient.LastHeartbeat = time.Now().UnixMilli() } } diff --git a/src/modules/ws/service/ws_receive.go b/src/modules/ws/service/ws_receive.go index 1b3b516d..c47ec446 100644 --- a/src/modules/ws/service/ws_receive.go +++ b/src/modules/ws/service/ws_receive.go @@ -8,8 +8,8 @@ import ( "strings" "time" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" - "be.ems/src/framework/redis" "be.ems/src/framework/telnet" "be.ems/src/framework/utils/ssh" "be.ems/src/framework/vo/result" @@ -51,6 +51,14 @@ func (s *WSReceive) Commont(client *model.WSClient, reqMsg model.WSRequest) { case "close": s.close(client) return + case "ping", "PING": + resByte, _ := json.Marshal(result.Ok(map[string]any{ + "requestId": reqMsg.RequestID, + "data": "PONG", + })) + client.MsgChan <- resByte + client.MsgChan <- []byte("ws:pong") + return case "ps": resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data) case "net": diff --git a/src/modules/ws/service/ws_send.go b/src/modules/ws/service/ws_send.go index eba535b4..c32a7e04 100644 --- a/src/modules/ws/service/ws_send.go +++ b/src/modules/ws/service/ws_send.go @@ -13,27 +13,31 @@ const ( // 组号-其他 GROUP_OTHER = "0" // 组号-跟踪任务网元数据变更 2_traceId - GROUP_TRACE_NE = "2_" + GROUP_TRACE_NE = "2" // 组号-信令跟踪Packet 4_taskNo - GROUP_TRACE_PACKET = "4_" + GROUP_TRACE_PACKET = "4" // 组号-指标通用 10_neType_neId - GROUP_KPI = "10_" + GROUP_KPI = "10" // 组号-指标UPF 12_neId GROUP_KPI_UPF = "12_" // 组号-自定义KPI指标 20_neType_neId - GROUP_KPI_C = "20_" + GROUP_KPI_C = "20" // 组号-IMS_CDR会话事件 1005_neId - GROUP_IMS_CDR = "1005_" + GROUP_IMS_CDR = "1005" // 组号-SMF_CDR会话事件 1006_neId - GROUP_SMF_CDR = "1006_" + GROUP_SMF_CDR = "1006" // 组号-SMSC_CDR会话事件 1007_neId - GROUP_SMSC_CDR = "1007_" + GROUP_SMSC_CDR = "1007" // 组号-SGWC_CDR会话事件 1008_neId - GROUP_SGWC_CDR = "1008_" + GROUP_SGWC_CDR = "1008" // 组号-AMF_UE会话事件 1010_neId GROUP_AMF_UE = "1010" // 组号-MME_UE会话事件 1011_neId GROUP_MME_UE = "1011" + // 组号-告警 2000_neType_neId + GROUP_ALARM = "2000" + // 组号-告警事件 2000_neType_neId + GROUP_ALARM_EVENT = "2002" ) // 实例化服务层 WSSend 结构体