diff --git a/src/modules/ws/service/ws.impl.go b/src/modules/ws/service/ws.impl.go index d210d4bf..350979d3 100644 --- a/src/modules/ws/service/ws.impl.go +++ b/src/modules/ws/service/ws.impl.go @@ -100,12 +100,6 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn) } } - defer func() { - // 在这里处理 Panic 协程异常 - if err := recover(); err != nil { - logger.Errorf("ws Panic Error: %s => %v", clientID, err) - } - }() go s.clientRead(wsClient) go s.clientWrite(wsClient) @@ -120,6 +114,11 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn) // clientRead 客户端读取消息 func (s *WSImpl) clientRead(wsClient *model.WSClient) { + defer func() { + if err := recover(); err != nil { + logger.Errorf("ws ReadMessage Panic Error: %v", err) + } + }() for { // 读取消息 messageType, msg, err := wsClient.Conn.ReadMessage() @@ -138,12 +137,7 @@ func (s *WSImpl) clientRead(wsClient *model.WSClient) { msgByte, _ := json.Marshal(result.ErrMsg("message format not supported")) wsClient.MsgChan <- msgByte } else { - err := NewWSReceiveImpl.Receive(wsClient, reqMsg) - if err != nil { - logger.Warnf("ws ReceiveMessage UID %s err: %s", wsClient.BindUid, err.Error()) - msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) - wsClient.MsgChan <- msgByte - } + go NewWSReceiveImpl.Receive(wsClient, reqMsg) } } } @@ -151,6 +145,11 @@ func (s *WSImpl) clientRead(wsClient *model.WSClient) { // clientWrite 客户端写入消息 func (s *WSImpl) clientWrite(wsClient *model.WSClient) { + defer func() { + if err := recover(); err != nil { + logger.Errorf("ws WriteMessage Panic Error: %v", err) + } + }() for msg := range wsClient.MsgChan { // 发送消息 err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg) diff --git a/src/modules/ws/service/ws_receive.impl.go b/src/modules/ws/service/ws_receive.impl.go index 697d9880..454878dd 100644 --- a/src/modules/ws/service/ws_receive.impl.go +++ b/src/modules/ws/service/ws_receive.impl.go @@ -1,8 +1,11 @@ package service import ( + "encoding/json" "fmt" + "ems.agt/src/framework/logger" + "ems.agt/src/framework/vo/result" "ems.agt/src/modules/ws/model" "ems.agt/src/modules/ws/processor" ) @@ -14,9 +17,13 @@ var NewWSReceiveImpl = &WSReceiveImpl{} type WSReceiveImpl struct{} // Receive 接收处理 -func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) error { +func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) { if reqMsg.RequestID == "" { - return fmt.Errorf("message requestId is required") + msg := "message requestId is required" + logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, msg) + msgByte, _ := json.Marshal(result.ErrMsg(msg)) + client.MsgChan <- msgByte + return } var resByte []byte @@ -36,12 +43,14 @@ func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) case "ne_state": resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data) default: - return fmt.Errorf("message type not supported") + err = fmt.Errorf("message type not supported") } if err != nil { - return err + logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, err.Error()) + msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) + client.MsgChan <- msgByte + return } client.MsgChan <- resByte - return nil }