fix: ws 处理 Panic 协程异常/协程处理消息结果
This commit is contained in:
@@ -100,12 +100,6 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn)
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// 在这里处理 Panic 协程异常
|
||||
if err := recover(); err != nil {
|
||||
logger.Errorf("ws Panic Error: %s => %v", clientID, err)
|
||||
}
|
||||
}()
|
||||
go s.clientRead(wsClient)
|
||||
go s.clientWrite(wsClient)
|
||||
|
||||
@@ -120,6 +114,11 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn)
|
||||
|
||||
// clientRead 客户端读取消息
|
||||
func (s *WSImpl) clientRead(wsClient *model.WSClient) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logger.Errorf("ws ReadMessage Panic Error: %v", err)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
// 读取消息
|
||||
messageType, msg, err := wsClient.Conn.ReadMessage()
|
||||
@@ -138,12 +137,7 @@ func (s *WSImpl) clientRead(wsClient *model.WSClient) {
|
||||
msgByte, _ := json.Marshal(result.ErrMsg("message format not supported"))
|
||||
wsClient.MsgChan <- msgByte
|
||||
} else {
|
||||
err := NewWSReceiveImpl.Receive(wsClient, reqMsg)
|
||||
if err != nil {
|
||||
logger.Warnf("ws ReceiveMessage UID %s err: %s", wsClient.BindUid, err.Error())
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
|
||||
wsClient.MsgChan <- msgByte
|
||||
}
|
||||
go NewWSReceiveImpl.Receive(wsClient, reqMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -151,6 +145,11 @@ func (s *WSImpl) clientRead(wsClient *model.WSClient) {
|
||||
|
||||
// clientWrite 客户端写入消息
|
||||
func (s *WSImpl) clientWrite(wsClient *model.WSClient) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logger.Errorf("ws WriteMessage Panic Error: %v", err)
|
||||
}
|
||||
}()
|
||||
for msg := range wsClient.MsgChan {
|
||||
// 发送消息
|
||||
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"ems.agt/src/framework/logger"
|
||||
"ems.agt/src/framework/vo/result"
|
||||
"ems.agt/src/modules/ws/model"
|
||||
"ems.agt/src/modules/ws/processor"
|
||||
)
|
||||
@@ -14,9 +17,13 @@ var NewWSReceiveImpl = &WSReceiveImpl{}
|
||||
type WSReceiveImpl struct{}
|
||||
|
||||
// Receive 接收处理
|
||||
func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) error {
|
||||
func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) {
|
||||
if reqMsg.RequestID == "" {
|
||||
return fmt.Errorf("message requestId is required")
|
||||
msg := "message requestId is required"
|
||||
logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, msg)
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(msg))
|
||||
client.MsgChan <- msgByte
|
||||
return
|
||||
}
|
||||
|
||||
var resByte []byte
|
||||
@@ -36,12 +43,14 @@ func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest)
|
||||
case "ne_state":
|
||||
resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data)
|
||||
default:
|
||||
return fmt.Errorf("message type not supported")
|
||||
err = fmt.Errorf("message type not supported")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, err.Error())
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
|
||||
client.MsgChan <- msgByte
|
||||
return
|
||||
}
|
||||
client.MsgChan <- resByte
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user