perf: 优化ws模块协程资源消耗

This commit is contained in:
TsMask
2024-08-07 15:27:36 +08:00
parent 712a9fee0b
commit 0f98508169
7 changed files with 376 additions and 145 deletions

View File

@@ -20,14 +20,15 @@ import (
"github.com/gin-gonic/gin"
)
// 实例化控制层 WSController 结构体
// NewWSController 实例化控制层 WSController 结构体
var NewWSController = &WSController{
wsService: service.NewWSImpl,
wsSendService: service.NewWSSendImpl,
neHostService: neService.NewNeHostImpl,
neInfoService: neService.NewNeInfoImpl,
}
// WebSocket通信
// WSController WebSocket通信
//
// PATH /ws
type WSController struct {
@@ -37,9 +38,11 @@ type WSController struct {
wsSendService service.IWSSend
// 网元主机连接服务
neHostService neService.INeHost
// 网元信息服务
neInfoService neService.INeInfo
}
// 通用
// WS 通用
//
// GET /?subGroupIDs=0
func (s *WSController) WS(c *gin.Context) {
@@ -71,17 +74,19 @@ func (s *WSController) WS(c *gin.Context) {
}
defer conn.Close()
wsClient := s.wsService.NewClient(loginUser.UserID, subGroupIDs, conn, nil)
wsClient := s.wsService.ClientCreate(loginUser.UserID, subGroupIDs, conn, nil)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, service.ReceiveCommont)
// 等待停止信号
for value := range wsClient.StopChan {
s.wsService.CloseClient(wsClient.ID)
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value)
return
}
}
// 测试
// Test 测试
//
// GET /test?clientId=&groupID=
func (s *WSController) Test(c *gin.Context) {
@@ -115,7 +120,7 @@ func (s *WSController) Test(c *gin.Context) {
c.JSON(200, result.OkData(errMsgArr))
}
// SSH终端
// SSH 终端
//
// GET /ssh?hostId=1&cols=80&rows=40
func (s *WSController) SSH(c *gin.Context) {
@@ -185,13 +190,16 @@ func (s *WSController) SSH(c *gin.Context) {
}
defer wsConn.Close()
wsClient := s.wsService.NewClient(loginUser.UserID, nil, wsConn, clientSession)
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, service.ReceiveShell)
// 实时读取SSH消息直接输出
msTicker := time.NewTicker(100 * time.Millisecond)
defer msTicker.Stop()
go func() {
for ms := range msTicker.C {
for {
select {
case ms := <-msTicker.C:
outputByte := clientSession.Read()
if len(outputByte) > 0 {
outputStr := string(outputByte)
@@ -208,18 +216,15 @@ func (s *WSController) SSH(c *gin.Context) {
// return
// }
}
case <-wsClient.StopChan: // 等待停止信号
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
return
}
}()
// 等待停止信号
for value := range wsClient.StopChan {
s.wsService.CloseClient(wsClient.ID)
logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value)
return
}
}
// Telnet终端
// Telnet 终端
//
// GET /telnet?hostId=1
func (s *WSController) Telnet(c *gin.Context) {
@@ -283,13 +288,16 @@ func (s *WSController) Telnet(c *gin.Context) {
}
defer wsConn.Close()
wsClient := s.wsService.NewClient(loginUser.UserID, nil, wsConn, clientSession)
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, service.ReceiveShell)
// 实时读取Telnet消息直接输出
msTicker := time.NewTicker(100 * time.Millisecond)
defer msTicker.Stop()
go func() {
for ms := range msTicker.C {
for {
select {
case ms := <-msTicker.C:
outputByte := clientSession.Read()
if len(outputByte) > 0 {
outputStr := strings.TrimRight(string(outputByte), "\x00")
@@ -306,13 +314,88 @@ func (s *WSController) Telnet(c *gin.Context) {
// return
// }
}
case <-wsClient.StopChan: // 等待停止信号
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
return
}
}
}
// ShellView 终端交互式文件内容查看
//
// GET /view
func (s *WSController) ShellView(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var query struct {
NeType string `form:"neType" binding:"required"`
NeId string `form:"neId" binding:"required"`
Cols int `form:"cols"` // 终端单行字符数
Rows int `form:"rows"` // 终端显示行数
}
if err := c.ShouldBindQuery(&query); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
if query.Cols < 120 || query.Cols > 400 {
query.Cols = 120
}
if query.Rows < 40 || query.Rows > 1200 {
query.Rows = 40
}
// 网元主机的SSH客户端
sshClient, err := s.neInfoService.NeRunSSHClient(query.NeType, query.NeId)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
return
}
defer sshClient.Close()
// ssh连接会话
clientSession, err := sshClient.NewClientSession(query.Cols, query.Rows)
if err != nil {
c.JSON(200, result.ErrMsg("neinfo ssh client session new err"))
return
}
defer clientSession.Close()
// 将 HTTP 连接升级为 WebSocket 连接
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
if wsConn == nil {
return
}
defer wsConn.Close()
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, service.ReceiveShellView)
// 实时读取SSH消息直接输出
msTicker := time.NewTicker(100 * time.Millisecond)
defer msTicker.Stop()
for {
select {
case ms := <-msTicker.C:
outputByte := clientSession.Read()
if len(outputByte) > 0 {
outputStr := string(outputByte)
msgByte, _ := json.Marshal(result.Ok(map[string]any{
"requestId": fmt.Sprintf("view_%d", ms.UnixMilli()),
"data": outputStr,
}))
wsClient.MsgChan <- msgByte
}
case <-wsClient.StopChan: // 等待停止信号
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
return
}
}()
// 等待停止信号
for value := range wsClient.StopChan {
s.wsService.CloseClient(wsClient.ID)
logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value)
return
}
}

View File

@@ -12,14 +12,21 @@ type IWS interface {
// UpgraderWs http升级ws请求
UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn
// NewClient 新建客户端
// ClientCreate 客户端新建
//
// uid 登录用户ID
// groupIDs 用户订阅组
// conn ws连接实例
// childConn 子连接实例
NewClient(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient
ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient
// CloseClient 关闭客户端
CloseClient(clientID string)
// ClientClose 客户端关闭
ClientClose(clientID string)
// ClientReadListen 客户端读取消息监听
// receiveType 根据接收类型进行消息处理
ClientReadListen(wsClient *model.WSClient, receiveType int)
// ClientWriteListen 客户端写入消息监听
ClientWriteListen(wsClient *model.WSClient)
}

View File

@@ -14,15 +14,12 @@ import (
)
var (
// ws客户端 [clientId: client]
wsClients sync.Map
// ws用户对应的多个客户端id [uid:clientIds]
wsUsers sync.Map
// ws组对应的多个用户id [groupID:uids]
wsGroup sync.Map
wsClients sync.Map // ws客户端 [clientId: client]
wsUsers sync.Map // ws用户对应的多个客户端id [uid:clientIds]
wsGroup sync.Map // ws对应的多个用户id [groupID:uids]
)
// 实例化服务层 WSImpl 结构体
// NewWSImpl 实例化服务层 WSImpl 结构体
var NewWSImpl = &WSImpl{}
// WSImpl WebSocket通信 服务层处理
@@ -50,13 +47,13 @@ func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.C
return conn
}
// NewClient 新建客户端
// ClientCreate 客户端新建
//
// uid 登录用户ID
// groupIDs 用户订阅组
// conn ws连接实例
// childConn 子连接实例
func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient {
func (s *WSImpl) ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient {
// clientID也可以用其他方式生成只要能保证在所有服务端中都能保证唯一即可
clientID := generate.Code(16)
@@ -106,76 +103,11 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn,
}
}
go s.clientRead(wsClient)
go s.clientWrite(wsClient)
// 发客户端id确认是否连接
msgByte, _ := json.Marshal(result.OkData(map[string]string{
"clientId": clientID,
}))
wsClient.MsgChan <- msgByte
return wsClient
}
// clientRead 客户端读取消息
func (s *WSImpl) clientRead(wsClient *model.WSClient) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws ReadMessage Panic Error: %v", err)
}
}()
for {
// 读取消息
messageType, msg, err := wsClient.Conn.ReadMessage()
if err != nil {
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.CloseClient(wsClient.ID)
return
}
// fmt.Println(messageType, string(msg))
// 文本和二进制类型只处理文本json
if messageType == websocket.TextMessage {
var reqMsg model.WSRequest
err := json.Unmarshal(msg, &reqMsg)
if err != nil {
msgByte, _ := json.Marshal(result.ErrMsg("message format not supported"))
wsClient.MsgChan <- msgByte
} else {
// 协程异步处理
go NewWSReceiveImpl.AsyncReceive(wsClient, reqMsg)
}
}
}
}
// clientWrite 客户端写入消息
func (s *WSImpl) clientWrite(wsClient *model.WSClient) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws WriteMessage Panic Error: %v", err)
}
}()
for msg := range wsClient.MsgChan {
// 关闭句柄
if string(msg) == "ws:close" {
wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 发送消息
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.CloseClient(wsClient.ID)
return
}
wsClient.LastHeartbeat = time.Now().UnixMilli()
}
}
// CloseClient 客户端关闭
func (s *WSImpl) CloseClient(clientID string) {
// ClientClose 客户端关闭
func (s *WSImpl) ClientClose(clientID string) {
v, ok := wsClients.Load(clientID)
if !ok {
return
@@ -226,3 +158,72 @@ func (s *WSImpl) CloseClient(clientID string) {
}
}
}
// ClientReadListen 客户端读取消息监听
// receiveType 根据接收类型进行消息处理
func (s *WSImpl) ClientReadListen(wsClient *model.WSClient, receiveType int) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws ReadMessage Panic Error: %v", err)
}
}()
for {
// 读取消息
messageType, msg, err := wsClient.Conn.ReadMessage()
if err != nil {
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
// fmt.Println(messageType, string(msg))
// 文本 只处理文本json
if messageType == websocket.TextMessage {
var reqMsg model.WSRequest
if err := json.Unmarshal(msg, &reqMsg); err != nil {
msgByte, _ := json.Marshal(result.ErrMsg("message format not supported"))
wsClient.MsgChan <- msgByte
continue
}
// 接收器处理
switch receiveType {
case ReceiveCommont:
go NewWSReceiveImpl.Commont(wsClient, reqMsg)
case ReceiveShell:
go NewWSReceiveImpl.Shell(wsClient, reqMsg)
case ReceiveShellView:
go NewWSReceiveImpl.ShellView(wsClient, reqMsg)
}
}
}
}
// ClientWriteListen 客户端写入消息监听
func (s *WSImpl) ClientWriteListen(wsClient *model.WSClient) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws WriteMessage Panic Error: %v", err)
}
}()
// 发客户端id确认是否连接
msgByte, _ := json.Marshal(result.OkData(map[string]string{
"clientId": wsClient.ID,
}))
wsClient.MsgChan <- msgByte
// 消息发送监听
for msg := range wsClient.MsgChan {
// 关闭句柄
if string(msg) == "ws:close" {
wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 发送消息
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
wsClient.LastHeartbeat = time.Now().UnixMilli()
}
}

View File

@@ -2,8 +2,20 @@ package service
import "be.ems/src/modules/ws/model"
const (
ReceiveCommont = iota // Commont 接收通用业务处理
ReceiveShell // Shell 接收终端交互业务处理
ReceiveShellView // ShellView 接收查看文件终端交互业务处理
)
// IWSReceive WebSocket消息接收处理 服务层接口
type IWSReceive interface {
// AsyncReceive 接收业务异步处理
AsyncReceive(client *model.WSClient, reqMsg model.WSRequest)
// Commont 接收通用业务处理
Commont(client *model.WSClient, reqMsg model.WSRequest)
// Shell 接收终端交互业务处理
Shell(client *model.WSClient, reqMsg model.WSRequest)
// ShellView 接收查看文件终端交互业务处理
ShellView(client *model.WSClient, reqMsg model.WSRequest)
}

View File

@@ -20,12 +20,22 @@ var NewWSReceiveImpl = &WSReceiveImpl{}
// WSReceiveImpl WebSocket消息接收处理 服务层处理
type WSReceiveImpl struct{}
// AsyncReceive 接收业务异步处理
func (s *WSReceiveImpl) AsyncReceive(client *model.WSClient, reqMsg model.WSRequest) {
// Commont 接收通用业务处理
func (s *WSReceiveImpl) close(client *model.WSClient) {
// 主动关闭
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
client.MsgChan <- resultByte
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
NewWSImpl.ClientClose(client.ID)
}
// Commont 接收通用业务处理
func (s *WSReceiveImpl) Commont(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws AsyncReceive UID %s err: %s", client.BindUid, msg)
logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
@@ -36,34 +46,8 @@ func (s *WSReceiveImpl) AsyncReceive(client *model.WSClient, reqMsg model.WSRequ
switch reqMsg.Type {
case "close":
// 主动关闭
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
client.MsgChan <- resultByte
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
case "ssh":
// SSH会话消息接收直接写入会话
command := reqMsg.Data.(string)
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
case "ssh_resize":
// SSH会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
case "telnet":
// Telnet会话消息接收直接写入会话
command := reqMsg.Data.(string)
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
_, err = telnetClientSession.Write(command)
s.close(client)
return
case "ps":
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
case "net":
@@ -85,7 +69,146 @@ func (s *WSReceiveImpl) AsyncReceive(client *model.WSClient, reqMsg model.WSRequ
}
if err != nil {
logger.Warnf("ws AsyncReceive UID %s err: %s", client.BindUid, err.Error())
logger.Warnf("ws Commont UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// Shell 接收终端交互业务处理
func (s *WSReceiveImpl) Shell(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ssh":
// SSH会话消息接收写入会话
if client.ChildConn == nil {
err = fmt.Errorf("message type ssh not connected")
} else {
command := reqMsg.Data.(string)
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
}
case "ssh_resize":
// SSH会话窗口重置
if client.ChildConn == nil {
err = fmt.Errorf("message type ssh not connected")
} else {
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
}
case "telnet":
// Telnet会话消息接收写入会话
if client.ChildConn == nil {
err = fmt.Errorf("message type telnet not connected")
} else {
command := reqMsg.Data.(string)
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
_, err = telnetClientSession.Write(command)
}
default:
err = fmt.Errorf("message type not supported")
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// ShellView 接收查看文件终端交互业务处理
func (s *WSReceiveImpl) ShellView(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ssh":
// SSH会话消息接收写入会话
if client.ChildConn == nil {
err = fmt.Errorf("message type ssh not connected")
} else {
command := reqMsg.Data.(string)
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
}
case "ssh_resize":
// SSH会话窗口重置
if client.ChildConn == nil {
err = fmt.Errorf("message type ssh not connected")
} else {
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
}
case "telnet":
// Telnet会话消息接收写入会话
if client.ChildConn == nil {
err = fmt.Errorf("message type telnet not connected")
} else {
command := reqMsg.Data.(string)
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
_, err = telnetClientSession.Write(command)
}
default:
err = fmt.Errorf("message type not supported")
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {

View File

@@ -46,7 +46,7 @@ func (s *WSSendImpl) ByClientID(clientID string, data any) error {
client := v.(*model.WSClient)
if len(client.MsgChan) > 90 {
NewWSImpl.CloseClient(client.ID)
NewWSImpl.ClientClose(client.ID)
return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID)
}
client.MsgChan <- dataByte

View File

@@ -21,6 +21,10 @@ func Setup(router *gin.Engine) {
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewWSController.WS,
)
wsGroup.GET("/test",
middleware.PreAuthorize(nil),
controller.NewWSController.Test,
)
wsGroup.GET("/ssh",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)),
@@ -31,9 +35,10 @@ func Setup(router *gin.Engine) {
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewWSController.Telnet,
)
wsGroup.GET("/test",
wsGroup.GET("/view",
middleware.PreAuthorize(nil),
controller.NewWSController.Test,
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewWSController.ShellView,
)
}
}