From 67caba43799fb4550e7fda658751bde2fa1646f0 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Tue, 24 Sep 2024 11:51:46 +0800 Subject: [PATCH] =?UTF-8?q?style:=20=E5=8F=98=E6=9B=B4ws=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E5=AE=9E=E4=BE=8B=E5=91=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- features/cdr/cdrevent.go | 6 +- features/event/event.go | 4 +- features/pm/performance.go | 10 +- src/modules/trace/service/trace_task.go | 4 +- src/modules/ws/controller/ws.go | 8 +- src/modules/ws/service/ws.go | 226 ++++++++++++++++-- src/modules/ws/service/ws.impl.go | 220 ------------------ src/modules/ws/service/ws_receive.go | 264 +++++++++++++++++++++- src/modules/ws/service/ws_receive.impl.go | 258 --------------------- src/modules/ws/service/ws_send.go | 89 +++++++- src/modules/ws/service/ws_send.impl.go | 87 ------- 11 files changed, 558 insertions(+), 618 deletions(-) delete mode 100644 src/modules/ws/service/ws.impl.go delete mode 100644 src/modules/ws/service/ws_receive.impl.go delete mode 100644 src/modules/ws/service/ws_send.impl.go diff --git a/features/cdr/cdrevent.go b/features/cdr/cdrevent.go index 9d73a265..6361fa05 100644 --- a/features/cdr/cdrevent.go +++ b/features/cdr/cdrevent.go @@ -62,12 +62,12 @@ func PostCDREventFrom(w http.ResponseWriter, r *http.Request) { switch neInfo.NeType { case "IMS": if v, ok := cdrEvent.CDR["recordType"]; ok && (v == "MOC" || v == "MTSM") { - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_IMS_CDR+neInfo.NeId, cdrEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_IMS_CDR+neInfo.NeId, cdrEvent) } case "SMF": - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_SMF_CDR+neInfo.NeId, cdrEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_SMF_CDR+neInfo.NeId, cdrEvent) case "SMSC": - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_SMSC_CDR+neInfo.NeId, cdrEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_SMSC_CDR+neInfo.NeId, cdrEvent) } } diff --git a/features/event/event.go b/features/event/event.go index c04cd44b..faad0e11 100644 --- a/features/event/event.go +++ b/features/event/event.go @@ -74,7 +74,7 @@ func PostUEEventFromAMF(c *gin.Context) { // AMF没有RmUID,直接推送 // 推送到ws订阅组 - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_AMF_UE, ueEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_AMF_UE, ueEvent) services.ResponseStatusOK204NoContent(c.Writer) } @@ -103,7 +103,7 @@ func PostUEEvent(w http.ResponseWriter, r *http.Request) { if neInfo.RmUID == ueEvent.RmUID { // 推送到ws订阅组 if ueEvent.NeType == "MME" { - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_MME_UE+neInfo.NeId, ueEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_MME_UE+neInfo.NeId, ueEvent) } } diff --git a/features/pm/performance.go b/features/pm/performance.go index ad99cc26..72fea0ea 100644 --- a/features/pm/performance.go +++ b/features/pm/performance.go @@ -324,11 +324,11 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) { if neInfo.RmUID == kpiData.RmUid { // 推送到ws订阅组 - wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent) + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), kpiEvent) // 推送自定义KPI到ws订阅组 - wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent) + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), kpiCEvent) if neInfo.NeType == "UPF" { - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI_UPF+neInfo.NeId, kpiEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF+neInfo.NeId, kpiEvent) } } @@ -456,9 +456,9 @@ func PostGoldKPIFromNF(w http.ResponseWriter, r *http.Request) { } // 推送到ws订阅组 - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI, kpiEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI, kpiEvent) if goldKpi.NEType == "UPF" { - wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI_UPF, kpiEvent) + wsService.NewWSSend.ByGroupID(wsService.GROUP_KPI_UPF, kpiEvent) } services.ResponseStatusOK204NoContent(w) diff --git a/src/modules/trace/service/trace_task.go b/src/modules/trace/service/trace_task.go index 071f8b28..fbfb1643 100644 --- a/src/modules/trace/service/trace_task.go +++ b/src/modules/trace/service/trace_task.go @@ -96,7 +96,7 @@ func (r *TraceTask) CreateUDP() error { // 推送文件 if v, ok := mData["pcapFile"]; ok && v != "" { logger.Infof("pcapFile: %s", v) - wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId) + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId) } }) @@ -145,7 +145,7 @@ func (r *TraceTask) CreateUDP() error { // 推送文件 if v, ok := mData["pcapFile"]; ok && v != "" { logger.Infof("pcapFile: %s", v) - wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId) + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId) } }) return nil diff --git a/src/modules/ws/controller/ws.go b/src/modules/ws/controller/ws.go index 1b78ed73..c3da955a 100644 --- a/src/modules/ws/controller/ws.go +++ b/src/modules/ws/controller/ws.go @@ -21,8 +21,8 @@ import ( // NewWSController 实例化控制层 WSController 结构体 var NewWSController = &WSController{ - wsService: service.NewWSImpl, - wsSendService: service.NewWSSendImpl, + wsService: service.NewWS, + wsSendService: service.NewWSSend, neHostService: neService.NewNeHostImpl, neInfoService: neService.NewNeInfoImpl, } @@ -32,9 +32,9 @@ var NewWSController = &WSController{ // PATH /ws type WSController struct { // WebSocket 服务 - wsService service.IWS + wsService *service.WS // WebSocket消息发送 服务 - wsSendService service.IWSSend + wsSendService *service.WSSend // 网元主机连接服务 neHostService neService.INeHost // 网元信息服务 diff --git a/src/modules/ws/service/ws.go b/src/modules/ws/service/ws.go index 0026b589..58f2f870 100644 --- a/src/modules/ws/service/ws.go +++ b/src/modules/ws/service/ws.go @@ -1,32 +1,220 @@ package service import ( + "encoding/json" "net/http" + "sync" + "time" + "be.ems/src/framework/logger" + "be.ems/src/framework/utils/generate" + "be.ems/src/framework/vo/result" "be.ems/src/modules/ws/model" "github.com/gorilla/websocket" ) -// IWS WebSocket通信 服务层接口 -type IWS interface { - // UpgraderWs http升级ws请求 - UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn +var ( + wsClients sync.Map // ws客户端 [clientId: client] + wsUsers sync.Map // ws用户对应的多个客户端id [uid:clientIds] + wsGroup sync.Map // ws组对应的多个客户端id [groupId:clientIds] +) - // ClientCreate 客户端新建 - // - // uid 登录用户ID - // groupIDs 用户订阅组 - // conn ws连接实例 - // childConn 子连接实例 - ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient +// NewWS 实例化服务层 WS 结构体 +var NewWS = &WS{} - // ClientClose 客户端关闭 - ClientClose(clientID string) +// WS WebSocket通信 服务层处理 +type WS struct{} - // ClientReadListen 客户端读取消息监听 - // receiveType 根据接收类型进行消息处理 - ClientReadListen(wsClient *model.WSClient, receiveType int) - - // ClientWriteListen 客户端写入消息监听 - ClientWriteListen(wsClient *model.WSClient) +// UpgraderWs http升级ws请求 +func (s *WS) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn { + wsUpgrader := websocket.Upgrader{ + Subprotocols: []string{"omc-ws"}, + // 设置消息发送缓冲区大小(byte),如果这个值设置得太小,可能会导致服务端在发送大型消息时遇到问题 + WriteBufferSize: 1024, + // 消息包启用压缩 + EnableCompression: true, + // ws握手超时时间 + HandshakeTimeout: 5 * time.Second, + // ws握手过程中允许跨域 + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + conn, err := wsUpgrader.Upgrade(w, r, nil) + if err != nil { + logger.Errorf("ws Upgrade err: %s", err.Error()) + } + return conn +} + +// ClientCreate 客户端新建 +// +// uid 登录用户ID +// groupIDs 用户订阅组 +// conn ws连接实例 +// childConn 子连接实例 +func (s *WS) ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient { + // clientID也可以用其他方式生成,只要能保证在所有服务端中都能保证唯一即可 + clientID := generate.Code(16) + + wsClient := &model.WSClient{ + ID: clientID, + Conn: conn, + LastHeartbeat: time.Now().UnixMilli(), + BindUid: uid, + SubGroup: groupIDs, + MsgChan: make(chan []byte, 100), + StopChan: make(chan struct{}, 1), // 卡死循环标记 + ChildConn: childConn, + } + + // 存入客户端 + wsClients.Store(clientID, wsClient) + + // 存入用户持有客户端 + if uid != "" { + if v, ok := wsUsers.Load(uid); ok { + uidClientIds := v.(*[]string) + *uidClientIds = append(*uidClientIds, clientID) + } else { + wsUsers.Store(uid, &[]string{clientID}) + } + } + + // 存入用户订阅组 + if uid != "" && len(groupIDs) > 0 { + for _, groupID := range groupIDs { + if v, ok := wsGroup.Load(groupID); ok { + groupClientIds := v.(*[]string) + *groupClientIds = append(*groupClientIds, clientID) + } else { + wsGroup.Store(groupID, &[]string{clientID}) + } + } + } + + return wsClient +} + +// ClientClose 客户端关闭 +func (s *WS) ClientClose(clientID string) { + v, ok := wsClients.Load(clientID) + if !ok { + return + } + + client := v.(*model.WSClient) + defer func() { + client.MsgChan <- []byte("ws:close") + client.StopChan <- struct{}{} + client.Conn.Close() + wsClients.Delete(clientID) + }() + + // 客户端断线时自动踢出Uid绑定列表 + if client.BindUid != "" { + if v, ok := wsUsers.Load(client.BindUid); ok { + uidClientIds := v.(*[]string) + if len(*uidClientIds) > 0 { + tempClientIds := make([]string, 0, len(*uidClientIds)) + for _, v := range *uidClientIds { + if v != client.ID { + tempClientIds = append(tempClientIds, v) + } + } + *uidClientIds = tempClientIds + } + } + } + + // 客户端断线时自动踢出已加入的组 + if len(client.SubGroup) > 0 { + for _, groupID := range client.SubGroup { + v, ok := wsGroup.Load(groupID) + if !ok { + continue + } + groupClientIds := v.(*[]string) + if len(*groupClientIds) > 0 { + tempClientIds := make([]string, 0, len(*groupClientIds)) + for _, v := range *groupClientIds { + if v != client.ID { + tempClientIds = append(tempClientIds, v) + } + } + *groupClientIds = tempClientIds + } + } + } +} + +// ClientReadListen 客户端读取消息监听 +// receiveType 根据接收类型进行消息处理 +func (s *WS) ClientReadListen(wsClient *model.WSClient, receiveType int) { + defer func() { + if err := recover(); err != nil { + logger.Errorf("ws ReadMessage Panic Error: %v", err) + } + }() + for { + // 读取消息 + messageType, msg, err := wsClient.Conn.ReadMessage() + if err != nil { + logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error()) + s.ClientClose(wsClient.ID) + return + } + // fmt.Println(messageType, string(msg)) + + // 文本 只处理文本json + if messageType == websocket.TextMessage { + var reqMsg model.WSRequest + if err := json.Unmarshal(msg, &reqMsg); err != nil { + msgByte, _ := json.Marshal(result.ErrMsg("message format json error")) + wsClient.MsgChan <- msgByte + continue + } + // 接收器处理 + switch receiveType { + case ReceiveCommont: + go NewWSReceive.Commont(wsClient, reqMsg) + case ReceiveShell: + go NewWSReceive.Shell(wsClient, reqMsg) + case ReceiveShellView: + go NewWSReceive.ShellView(wsClient, reqMsg) + case ReceiveTelnet: + go NewWSReceive.Telnet(wsClient, reqMsg) + } + } + } +} + +// ClientWriteListen 客户端写入消息监听 +func (s *WS) ClientWriteListen(wsClient *model.WSClient) { + defer func() { + if err := recover(); err != nil { + logger.Errorf("ws WriteMessage Panic Error: %v", err) + } + }() + // 发客户端id确认是否连接 + msgByte, _ := json.Marshal(result.OkData(map[string]string{ + "clientId": wsClient.ID, + })) + wsClient.MsgChan <- msgByte + // 消息发送监听 + for msg := range wsClient.MsgChan { + // 关闭句柄 + if string(msg) == "ws:close" { + wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + // 发送消息 + err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg) + if err != nil { + logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error()) + s.ClientClose(wsClient.ID) + return + } + wsClient.LastHeartbeat = time.Now().UnixMilli() + } } diff --git a/src/modules/ws/service/ws.impl.go b/src/modules/ws/service/ws.impl.go deleted file mode 100644 index 33307ba2..00000000 --- a/src/modules/ws/service/ws.impl.go +++ /dev/null @@ -1,220 +0,0 @@ -package service - -import ( - "encoding/json" - "net/http" - "sync" - "time" - - "be.ems/src/framework/logger" - "be.ems/src/framework/utils/generate" - "be.ems/src/framework/vo/result" - "be.ems/src/modules/ws/model" - "github.com/gorilla/websocket" -) - -var ( - wsClients sync.Map // ws客户端 [clientId: client] - wsUsers sync.Map // ws用户对应的多个客户端id [uid:clientIds] - wsGroup sync.Map // ws组对应的多个客户端id [groupId:clientIds] -) - -// NewWSImpl 实例化服务层 WSImpl 结构体 -var NewWSImpl = &WSImpl{} - -// WSImpl WebSocket通信 服务层处理 -type WSImpl struct{} - -// UpgraderWs http升级ws请求 -func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn { - wsUpgrader := websocket.Upgrader{ - Subprotocols: []string{"omc-ws"}, - // 设置消息发送缓冲区大小(byte),如果这个值设置得太小,可能会导致服务端在发送大型消息时遇到问题 - WriteBufferSize: 1024, - // 消息包启用压缩 - EnableCompression: true, - // ws握手超时时间 - HandshakeTimeout: 5 * time.Second, - // ws握手过程中允许跨域 - CheckOrigin: func(r *http.Request) bool { - return true - }, - } - conn, err := wsUpgrader.Upgrade(w, r, nil) - if err != nil { - logger.Errorf("ws Upgrade err: %s", err.Error()) - } - return conn -} - -// ClientCreate 客户端新建 -// -// uid 登录用户ID -// groupIDs 用户订阅组 -// conn ws连接实例 -// childConn 子连接实例 -func (s *WSImpl) ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient { - // clientID也可以用其他方式生成,只要能保证在所有服务端中都能保证唯一即可 - clientID := generate.Code(16) - - wsClient := &model.WSClient{ - ID: clientID, - Conn: conn, - LastHeartbeat: time.Now().UnixMilli(), - BindUid: uid, - SubGroup: groupIDs, - MsgChan: make(chan []byte, 100), - StopChan: make(chan struct{}, 1), // 卡死循环标记 - ChildConn: childConn, - } - - // 存入客户端 - wsClients.Store(clientID, wsClient) - - // 存入用户持有客户端 - if uid != "" { - if v, ok := wsUsers.Load(uid); ok { - uidClientIds := v.(*[]string) - *uidClientIds = append(*uidClientIds, clientID) - } else { - wsUsers.Store(uid, &[]string{clientID}) - } - } - - // 存入用户订阅组 - if uid != "" && len(groupIDs) > 0 { - for _, groupID := range groupIDs { - if v, ok := wsGroup.Load(groupID); ok { - groupClientIds := v.(*[]string) - *groupClientIds = append(*groupClientIds, clientID) - } else { - wsGroup.Store(groupID, &[]string{clientID}) - } - } - } - - return wsClient -} - -// ClientClose 客户端关闭 -func (s *WSImpl) ClientClose(clientID string) { - v, ok := wsClients.Load(clientID) - if !ok { - return - } - - client := v.(*model.WSClient) - defer func() { - client.MsgChan <- []byte("ws:close") - client.StopChan <- struct{}{} - client.Conn.Close() - wsClients.Delete(clientID) - }() - - // 客户端断线时自动踢出Uid绑定列表 - if client.BindUid != "" { - if v, ok := wsUsers.Load(client.BindUid); ok { - uidClientIds := v.(*[]string) - if len(*uidClientIds) > 0 { - tempClientIds := make([]string, 0, len(*uidClientIds)) - for _, v := range *uidClientIds { - if v != client.ID { - tempClientIds = append(tempClientIds, v) - } - } - *uidClientIds = tempClientIds - } - } - } - - // 客户端断线时自动踢出已加入的组 - if len(client.SubGroup) > 0 { - for _, groupID := range client.SubGroup { - v, ok := wsGroup.Load(groupID) - if !ok { - continue - } - groupClientIds := v.(*[]string) - if len(*groupClientIds) > 0 { - tempClientIds := make([]string, 0, len(*groupClientIds)) - for _, v := range *groupClientIds { - if v != client.ID { - tempClientIds = append(tempClientIds, v) - } - } - *groupClientIds = tempClientIds - } - } - } -} - -// ClientReadListen 客户端读取消息监听 -// receiveType 根据接收类型进行消息处理 -func (s *WSImpl) ClientReadListen(wsClient *model.WSClient, receiveType int) { - defer func() { - if err := recover(); err != nil { - logger.Errorf("ws ReadMessage Panic Error: %v", err) - } - }() - for { - // 读取消息 - messageType, msg, err := wsClient.Conn.ReadMessage() - if err != nil { - logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error()) - s.ClientClose(wsClient.ID) - return - } - // fmt.Println(messageType, string(msg)) - - // 文本 只处理文本json - if messageType == websocket.TextMessage { - var reqMsg model.WSRequest - if err := json.Unmarshal(msg, &reqMsg); err != nil { - msgByte, _ := json.Marshal(result.ErrMsg("message format json error")) - wsClient.MsgChan <- msgByte - continue - } - // 接收器处理 - switch receiveType { - case ReceiveCommont: - go NewWSReceiveImpl.Commont(wsClient, reqMsg) - case ReceiveShell: - go NewWSReceiveImpl.Shell(wsClient, reqMsg) - case ReceiveShellView: - go NewWSReceiveImpl.ShellView(wsClient, reqMsg) - case ReceiveTelnet: - go NewWSReceiveImpl.Telnet(wsClient, reqMsg) - } - } - } -} - -// ClientWriteListen 客户端写入消息监听 -func (s *WSImpl) ClientWriteListen(wsClient *model.WSClient) { - defer func() { - if err := recover(); err != nil { - logger.Errorf("ws WriteMessage Panic Error: %v", err) - } - }() - // 发客户端id确认是否连接 - msgByte, _ := json.Marshal(result.OkData(map[string]string{ - "clientId": wsClient.ID, - })) - wsClient.MsgChan <- msgByte - // 消息发送监听 - for msg := range wsClient.MsgChan { - // 关闭句柄 - if string(msg) == "ws:close" { - wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{}) - return - } - // 发送消息 - err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg) - if err != nil { - logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error()) - s.ClientClose(wsClient.ID) - return - } - wsClient.LastHeartbeat = time.Now().UnixMilli() - } -} diff --git a/src/modules/ws/service/ws_receive.go b/src/modules/ws/service/ws_receive.go index 0372c39c..68ec9bdd 100644 --- a/src/modules/ws/service/ws_receive.go +++ b/src/modules/ws/service/ws_receive.go @@ -1,6 +1,18 @@ package service -import "be.ems/src/modules/ws/model" +import ( + "encoding/json" + "fmt" + "io" + "time" + + "be.ems/src/framework/logger" + "be.ems/src/framework/telnet" + "be.ems/src/framework/utils/ssh" + "be.ems/src/framework/vo/result" + "be.ems/src/modules/ws/model" + "be.ems/src/modules/ws/processor" +) const ( ReceiveCommont = iota // Commont 接收通用业务处理 @@ -9,17 +21,245 @@ const ( ReceiveTelnet // Telnet 接收终端交互业务处理 ) -// IWSReceive WebSocket消息接收处理 服务层接口 -type IWSReceive interface { - // Commont 接收通用业务处理 - Commont(client *model.WSClient, reqMsg model.WSRequest) +// 实例化服务层 WSReceive 结构体 +var NewWSReceive = &WSReceive{} - // Shell 接收终端交互业务处理 - Shell(client *model.WSClient, reqMsg model.WSRequest) +// WSReceive WebSocket消息接收处理 服务层处理 +type WSReceive struct{} - // ShellView 接收查看文件终端交互业务处理 - ShellView(client *model.WSClient, reqMsg model.WSRequest) - - // Telnet 接收终端交互业务处理 - Telnet(client *model.WSClient, reqMsg model.WSRequest) +// close 关闭服务连接 +func (s *WSReceive) close(client *model.WSClient) { + // 主动关闭 + resultByte, _ := json.Marshal(result.OkMsg("user initiated closure")) + client.MsgChan <- resultByte + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + NewWS.ClientClose(client.ID) +} + +// Commont 接收通用业务处理 +func (s *WSReceive) Commont(client *model.WSClient, reqMsg model.WSRequest) { + // 必传requestId确认消息 + if reqMsg.RequestID == "" { + msg := "message requestId is required" + logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg) + msgByte, _ := json.Marshal(result.ErrMsg(msg)) + client.MsgChan <- msgByte + return + } + + var resByte []byte + var err error + + switch reqMsg.Type { + case "close": + s.close(client) + return + case "ps": + resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data) + case "net": + resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data) + case "ims_cdr": + resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data) + case "smf_cdr": + resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data) + case "smsc_cdr": + resByte, err = processor.GetCDRConnectBySMSC(reqMsg.RequestID, reqMsg.Data) + case "amf_ue": + resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data) + case "mme_ue": + resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data) + case "upf_tf": + resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data) + case "ne_state": + resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data) + default: + err = fmt.Errorf("message type %s not supported", reqMsg.Type) + } + + if err != nil { + logger.Warnf("ws Commont UID %s err: %s", client.BindUid, err.Error()) + msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) + client.MsgChan <- msgByte + return + } + if len(resByte) > 0 { + client.MsgChan <- resByte + } +} + +// Shell 接收终端交互业务处理 +func (s *WSReceive) Shell(client *model.WSClient, reqMsg model.WSRequest) { + // 必传requestId确认消息 + if reqMsg.RequestID == "" { + msg := "message requestId is required" + logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg) + msgByte, _ := json.Marshal(result.ErrMsg(msg)) + client.MsgChan <- msgByte + return + } + + var resByte []byte + var err error + + switch reqMsg.Type { + case "close": + s.close(client) + return + case "ssh": + // SSH会话消息接收写入会话 + command := reqMsg.Data.(string) + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + _, err = sshClientSession.Write(command) + case "ssh_resize": + // SSH会话窗口重置 + msgByte, _ := json.Marshal(reqMsg.Data) + var data struct { + Cols int `json:"cols"` + Rows int `json:"rows"` + } + err = json.Unmarshal(msgByte, &data) + if err == nil { + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + err = sshClientSession.Session.WindowChange(data.Rows, data.Cols) + } + default: + err = fmt.Errorf("message type %s not supported", reqMsg.Type) + } + + if err != nil { + logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error()) + msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) + client.MsgChan <- msgByte + if err == io.EOF { + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + } + return + } + if len(resByte) > 0 { + client.MsgChan <- resByte + } +} + +// ShellView 接收查看文件终端交互业务处理 +func (s *WSReceive) ShellView(client *model.WSClient, reqMsg model.WSRequest) { + // 必传requestId确认消息 + if reqMsg.RequestID == "" { + msg := "message requestId is required" + logger.Infof("ws ShellView UID %s err: %s", client.BindUid, msg) + msgByte, _ := json.Marshal(result.ErrMsg(msg)) + client.MsgChan <- msgByte + return + } + + var resByte []byte + var err error + + switch reqMsg.Type { + case "close": + s.close(client) + return + case "cat", "tail": + var command string + if reqMsg.Type == "cat" { + command, err = processor.ParseCat(reqMsg.Data) + } + if reqMsg.Type == "tail" { + command, err = processor.ParseTail(reqMsg.Data) + } + if command != "" && err == nil { + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + _, err = sshClientSession.Write(command) + } + case "ctrl-c": + // 模拟按下 Ctrl+C + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + _, err = sshClientSession.Write("\u0003\n") + case "resize": + // 会话窗口重置 + msgByte, _ := json.Marshal(reqMsg.Data) + var data struct { + Cols int `json:"cols"` + Rows int `json:"rows"` + } + err = json.Unmarshal(msgByte, &data) + if err == nil { + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + err = sshClientSession.Session.WindowChange(data.Rows, data.Cols) + } + default: + err = fmt.Errorf("message type %s not supported", reqMsg.Type) + } + + if err != nil { + logger.Warnf("ws ShellView UID %s err: %s", client.BindUid, err.Error()) + msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) + client.MsgChan <- msgByte + if err == io.EOF { + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + } + return + } + if len(resByte) > 0 { + client.MsgChan <- resByte + } +} + +// Telnet 接收终端交互业务处理 +func (s *WSReceive) Telnet(client *model.WSClient, reqMsg model.WSRequest) { + // 必传requestId确认消息 + if reqMsg.RequestID == "" { + msg := "message requestId is required" + logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg) + msgByte, _ := json.Marshal(result.ErrMsg(msg)) + client.MsgChan <- msgByte + return + } + + var resByte []byte + var err error + + switch reqMsg.Type { + case "close": + s.close(client) + return + case "telnet": + // Telnet会话消息接收写入会话 + command := reqMsg.Data.(string) + telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession) + _, err = telnetClientSession.Write(command) + case "telnet_resize": + // Telnet会话窗口重置 + msgByte, _ := json.Marshal(reqMsg.Data) + var data struct { + Cols int `json:"cols"` + Rows int `json:"rows"` + } + err = json.Unmarshal(msgByte, &data) + if err == nil { + // telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession) + // _ = telnetClientSession.WindowChange(data.Rows, data.Cols) + } + default: + err = fmt.Errorf("message type %s not supported", reqMsg.Type) + } + + if err != nil { + logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error()) + msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) + client.MsgChan <- msgByte + if err == io.EOF { + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + } + return + } + if len(resByte) > 0 { + client.MsgChan <- resByte + } } diff --git a/src/modules/ws/service/ws_receive.impl.go b/src/modules/ws/service/ws_receive.impl.go deleted file mode 100644 index 918efcc5..00000000 --- a/src/modules/ws/service/ws_receive.impl.go +++ /dev/null @@ -1,258 +0,0 @@ -package service - -import ( - "encoding/json" - "fmt" - "io" - "time" - - "be.ems/src/framework/logger" - "be.ems/src/framework/telnet" - "be.ems/src/framework/utils/ssh" - "be.ems/src/framework/vo/result" - "be.ems/src/modules/ws/model" - "be.ems/src/modules/ws/processor" -) - -// 实例化服务层 WSReceiveImpl 结构体 -var NewWSReceiveImpl = &WSReceiveImpl{} - -// WSReceiveImpl WebSocket消息接收处理 服务层处理 -type WSReceiveImpl struct{} - -// Commont 接收通用业务处理 -func (s *WSReceiveImpl) close(client *model.WSClient) { - // 主动关闭 - resultByte, _ := json.Marshal(result.OkMsg("user initiated closure")) - client.MsgChan <- resultByte - // 等待1s后关闭连接 - time.Sleep(1 * time.Second) - NewWSImpl.ClientClose(client.ID) -} - -// Commont 接收通用业务处理 -func (s *WSReceiveImpl) Commont(client *model.WSClient, reqMsg model.WSRequest) { - // 必传requestId确认消息 - if reqMsg.RequestID == "" { - msg := "message requestId is required" - logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg) - msgByte, _ := json.Marshal(result.ErrMsg(msg)) - client.MsgChan <- msgByte - return - } - - var resByte []byte - var err error - - switch reqMsg.Type { - case "close": - s.close(client) - return - case "ps": - resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data) - case "net": - resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data) - case "ims_cdr": - resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data) - case "smf_cdr": - resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data) - case "smsc_cdr": - resByte, err = processor.GetCDRConnectBySMSC(reqMsg.RequestID, reqMsg.Data) - case "amf_ue": - resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data) - case "mme_ue": - resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data) - case "upf_tf": - resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data) - case "ne_state": - resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data) - default: - err = fmt.Errorf("message type %s not supported", reqMsg.Type) - } - - if err != nil { - logger.Warnf("ws Commont UID %s err: %s", client.BindUid, err.Error()) - msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) - client.MsgChan <- msgByte - return - } - if len(resByte) > 0 { - client.MsgChan <- resByte - } -} - -// Shell 接收终端交互业务处理 -func (s *WSReceiveImpl) Shell(client *model.WSClient, reqMsg model.WSRequest) { - // 必传requestId确认消息 - if reqMsg.RequestID == "" { - msg := "message requestId is required" - logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg) - msgByte, _ := json.Marshal(result.ErrMsg(msg)) - client.MsgChan <- msgByte - return - } - - var resByte []byte - var err error - - switch reqMsg.Type { - case "close": - s.close(client) - return - case "ssh": - // SSH会话消息接收写入会话 - command := reqMsg.Data.(string) - sshClientSession := client.ChildConn.(*ssh.SSHClientSession) - _, err = sshClientSession.Write(command) - case "ssh_resize": - // SSH会话窗口重置 - msgByte, _ := json.Marshal(reqMsg.Data) - var data struct { - Cols int `json:"cols"` - Rows int `json:"rows"` - } - err = json.Unmarshal(msgByte, &data) - if err == nil { - sshClientSession := client.ChildConn.(*ssh.SSHClientSession) - err = sshClientSession.Session.WindowChange(data.Rows, data.Cols) - } - default: - err = fmt.Errorf("message type %s not supported", reqMsg.Type) - } - - if err != nil { - logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error()) - msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) - client.MsgChan <- msgByte - if err == io.EOF { - // 等待1s后关闭连接 - time.Sleep(1 * time.Second) - client.StopChan <- struct{}{} - } - return - } - if len(resByte) > 0 { - client.MsgChan <- resByte - } -} - -// ShellView 接收查看文件终端交互业务处理 -func (s *WSReceiveImpl) ShellView(client *model.WSClient, reqMsg model.WSRequest) { - // 必传requestId确认消息 - if reqMsg.RequestID == "" { - msg := "message requestId is required" - logger.Infof("ws ShellView UID %s err: %s", client.BindUid, msg) - msgByte, _ := json.Marshal(result.ErrMsg(msg)) - client.MsgChan <- msgByte - return - } - - var resByte []byte - var err error - - switch reqMsg.Type { - case "close": - s.close(client) - return - case "cat", "tail": - var command string - if reqMsg.Type == "cat" { - command, err = processor.ParseCat(reqMsg.Data) - } - if reqMsg.Type == "tail" { - command, err = processor.ParseTail(reqMsg.Data) - } - if command != "" && err == nil { - sshClientSession := client.ChildConn.(*ssh.SSHClientSession) - _, err = sshClientSession.Write(command) - } - case "ctrl-c": - // 模拟按下 Ctrl+C - sshClientSession := client.ChildConn.(*ssh.SSHClientSession) - _, err = sshClientSession.Write("\u0003\n") - case "resize": - // 会话窗口重置 - msgByte, _ := json.Marshal(reqMsg.Data) - var data struct { - Cols int `json:"cols"` - Rows int `json:"rows"` - } - err = json.Unmarshal(msgByte, &data) - if err == nil { - sshClientSession := client.ChildConn.(*ssh.SSHClientSession) - err = sshClientSession.Session.WindowChange(data.Rows, data.Cols) - } - default: - err = fmt.Errorf("message type %s not supported", reqMsg.Type) - } - - if err != nil { - logger.Warnf("ws ShellView UID %s err: %s", client.BindUid, err.Error()) - msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) - client.MsgChan <- msgByte - if err == io.EOF { - // 等待1s后关闭连接 - time.Sleep(1 * time.Second) - client.StopChan <- struct{}{} - } - return - } - if len(resByte) > 0 { - client.MsgChan <- resByte - } -} - -// Telnet 接收终端交互业务处理 -func (s *WSReceiveImpl) Telnet(client *model.WSClient, reqMsg model.WSRequest) { - // 必传requestId确认消息 - if reqMsg.RequestID == "" { - msg := "message requestId is required" - logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg) - msgByte, _ := json.Marshal(result.ErrMsg(msg)) - client.MsgChan <- msgByte - return - } - - var resByte []byte - var err error - - switch reqMsg.Type { - case "close": - s.close(client) - return - case "telnet": - // Telnet会话消息接收写入会话 - command := reqMsg.Data.(string) - telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession) - _, err = telnetClientSession.Write(command) - case "telnet_resize": - // Telnet会话窗口重置 - msgByte, _ := json.Marshal(reqMsg.Data) - var data struct { - Cols int `json:"cols"` - Rows int `json:"rows"` - } - err = json.Unmarshal(msgByte, &data) - if err == nil { - // telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession) - // _ = telnetClientSession.WindowChange(data.Rows, data.Cols) - } - default: - err = fmt.Errorf("message type %s not supported", reqMsg.Type) - } - - if err != nil { - logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error()) - msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) - client.MsgChan <- msgByte - if err == io.EOF { - // 等待1s后关闭连接 - time.Sleep(1 * time.Second) - client.StopChan <- struct{}{} - } - return - } - if len(resByte) > 0 { - client.MsgChan <- resByte - } -} diff --git a/src/modules/ws/service/ws_send.go b/src/modules/ws/service/ws_send.go index 91e48b14..04fff1f9 100644 --- a/src/modules/ws/service/ws_send.go +++ b/src/modules/ws/service/ws_send.go @@ -1,10 +1,87 @@ package service -// IWSSend WebSocket消息发送处理 服务层接口 -type IWSSend interface { - // ByClientID 给已知客户端发消息 - ByClientID(clientID string, data any) error +import ( + "encoding/json" + "fmt" - // ByGroupID 给订阅组的客户端发送消息 - ByGroupID(gid string, data any) error + "be.ems/src/framework/vo/result" + "be.ems/src/modules/ws/model" +) + +// 订阅组指定编号为支持服务器向客户端主动推送数据 +const ( + // 组号-其他 + GROUP_OTHER = "0" + // 组号-跟踪任务数据变更 2_traceId + GROUP_TRACE = "2_" + // 组号-指标通用 10_neType_neId + GROUP_KPI = "10_" + // 组号-指标UPF 12_neId + GROUP_KPI_UPF = "12_" + // 组号-自定义KPI指标20_neType_neId + GROUP_KPI_C = "20_" + // 组号-IMS_CDR会话事件 1005_neId + GROUP_IMS_CDR = "1005_" + // 组号-SMF_CDR会话事件 1006_neId + GROUP_SMF_CDR = "1006_" + // 组号-SMSC_CDR会话事件 1007_neId + GROUP_SMSC_CDR = "1007_" + // 组号-AMF_UE会话事件 + GROUP_AMF_UE = "1010" + // 组号-MME_UE会话事件 1011_neId + GROUP_MME_UE = "1011_" +) + +// 实例化服务层 WSSend 结构体 +var NewWSSend = &WSSend{} + +// WSSend WebSocket消息发送处理 服务层处理 +type WSSend struct{} + +// ByClientID 给已知客户端发消息 +func (s *WSSend) ByClientID(clientID string, data any) error { + v, ok := wsClients.Load(clientID) + if !ok { + return fmt.Errorf("no fount client ID: %s", clientID) + } + + dataByte, err := json.Marshal(result.OkData(data)) + if err != nil { + return err + } + + client := v.(*model.WSClient) + if len(client.MsgChan) > 90 { + NewWS.ClientClose(client.ID) + return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID) + } + client.MsgChan <- dataByte + return nil +} + +// ByGroupID 给订阅组的客户端发送消息 +func (s *WSSend) ByGroupID(groupID string, data any) error { + clientIds, ok := wsGroup.Load(groupID) + if !ok { + return fmt.Errorf("no fount Group ID: %s", groupID) + } + + // 检查组内是否有客户端 + ids := clientIds.(*[]string) + if len(*ids) == 0 { + return fmt.Errorf("no members in the group") + } + + // 遍历给客户端发消息 + for _, clientId := range *ids { + err := s.ByClientID(clientId, map[string]any{ + "groupId": groupID, + "data": data, + }) + if err != nil { + continue + } + } + + return nil } diff --git a/src/modules/ws/service/ws_send.impl.go b/src/modules/ws/service/ws_send.impl.go deleted file mode 100644 index 969b2688..00000000 --- a/src/modules/ws/service/ws_send.impl.go +++ /dev/null @@ -1,87 +0,0 @@ -package service - -import ( - "encoding/json" - "fmt" - - "be.ems/src/framework/vo/result" - "be.ems/src/modules/ws/model" -) - -// 订阅组指定编号为支持服务器向客户端主动推送数据 -const ( - // 组号-其他 - GROUP_OTHER = "0" - // 组号-跟踪任务数据变更 2_traceId - GROUP_TRACE = "2_" - // 组号-指标通用 10_neType_neId - GROUP_KPI = "10_" - // 组号-指标UPF 12_neId - GROUP_KPI_UPF = "12_" - // 组号-自定义KPI指标20_neType_neId - GROUP_KPI_C = "20_" - // 组号-IMS_CDR会话事件 1005_neId - GROUP_IMS_CDR = "1005_" - // 组号-SMF_CDR会话事件 1006_neId - GROUP_SMF_CDR = "1006_" - // 组号-SMSC_CDR会话事件 1007_neId - GROUP_SMSC_CDR = "1007_" - // 组号-AMF_UE会话事件 - GROUP_AMF_UE = "1010" - // 组号-MME_UE会话事件 1011_neId - GROUP_MME_UE = "1011_" -) - -// 实例化服务层 WSSendImpl 结构体 -var NewWSSendImpl = &WSSendImpl{} - -// IWSSend WebSocket消息发送处理 服务层处理 -type WSSendImpl struct{} - -// ByClientID 给已知客户端发消息 -func (s *WSSendImpl) ByClientID(clientID string, data any) error { - v, ok := wsClients.Load(clientID) - if !ok { - return fmt.Errorf("no fount client ID: %s", clientID) - } - - dataByte, err := json.Marshal(result.OkData(data)) - if err != nil { - return err - } - - client := v.(*model.WSClient) - if len(client.MsgChan) > 90 { - NewWSImpl.ClientClose(client.ID) - return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID) - } - client.MsgChan <- dataByte - return nil -} - -// ByGroupID 给订阅组的客户端发送消息 -func (s *WSSendImpl) ByGroupID(groupID string, data any) error { - clientIds, ok := wsGroup.Load(groupID) - if !ok { - return fmt.Errorf("no fount Group ID: %s", groupID) - } - - // 检查组内是否有客户端 - ids := clientIds.(*[]string) - if len(*ids) == 0 { - return fmt.Errorf("no members in the group") - } - - // 遍历给客户端发消息 - for _, clientId := range *ids { - err := s.ByClientID(clientId, map[string]any{ - "groupId": groupID, - "data": data, - }) - if err != nil { - continue - } - } - - return nil -}