From 299eb9d24ab02fb3a2ce619fbc99c37c569dcea4 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Mon, 26 Feb 2024 12:02:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20ws=E6=A8=A1=E5=9D=97=E6=96=B0=E5=A2=9Es?= =?UTF-8?q?sh=E5=92=8Ctelnet=E7=B1=BB=E5=9E=8B=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/modules/ws/controller/ws.go | 199 +++++++++++++++++++++- src/modules/ws/model/ws.go | 3 +- src/modules/ws/service/ws.go | 11 +- src/modules/ws/service/ws.impl.go | 17 +- src/modules/ws/service/ws_receive.go | 4 +- src/modules/ws/service/ws_receive.impl.go | 45 ++++- src/modules/ws/ws.go | 13 +- 7 files changed, 273 insertions(+), 19 deletions(-) diff --git a/src/modules/ws/controller/ws.go b/src/modules/ws/controller/ws.go index 1257f594..45786f74 100644 --- a/src/modules/ws/controller/ws.go +++ b/src/modules/ws/controller/ws.go @@ -1,12 +1,20 @@ package controller import ( + "encoding/json" + "fmt" + "strconv" "strings" + "time" + + neService "ems.agt/src/modules/network_element/service" "ems.agt/src/framework/i18n" "ems.agt/src/framework/logger" "ems.agt/src/framework/utils/ctx" "ems.agt/src/framework/utils/parse" + "ems.agt/src/framework/utils/ssh" + "ems.agt/src/framework/utils/telnet" "ems.agt/src/framework/vo/result" "ems.agt/src/modules/ws/service" "github.com/gin-gonic/gin" @@ -16,6 +24,7 @@ import ( var NewWSController = &WSController{ wsService: service.NewWSImpl, wsSendService: service.NewWSSendImpl, + neHostService: neService.NewNeHostImpl, } // WebSocket通信 @@ -26,6 +35,8 @@ type WSController struct { wsService service.IWS // WebSocket消息发送 服务 wsSendService service.IWSSend + // 网元主机连接服务 + neHostService neService.INeHost } // 通用 @@ -60,10 +71,11 @@ func (s *WSController) WS(c *gin.Context) { } defer conn.Close() - wsClient := s.wsService.NewClient(loginUser.UserID, subGroupIDs, conn) + wsClient := s.wsService.NewClient(loginUser.UserID, subGroupIDs, conn, nil) // 等待停止信号 for value := range wsClient.StopChan { + s.wsService.CloseClient(wsClient.ID) logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value) return } @@ -102,3 +114,188 @@ func (s *WSController) Test(c *gin.Context) { c.JSON(200, result.OkData(errMsgArr)) } + +// SSH终端 +// +// GET /ssh?hostId=1&cols=80&rows=40 +func (s *WSController) SSH(c *gin.Context) { + language := ctx.AcceptLanguage(c) + + // 登录用户信息 + loginUser, err := ctx.LoginUser(c) + if err != nil { + c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) + return + } + + // 连接主机ID + hostId := c.Query("hostId") + if hostId == "" { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + neHost := s.neHostService.SelectById(hostId) + if neHost.HostID != hostId || neHost.HostType != "ssh" { + // 没有可访问主机信息数据! + c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData"))) + return + } + + // 创建链接SSH客户端 + var connSSH ssh.ConnSSH + neHost.CopyTo(&connSSH) + client, err := connSSH.NewClient() + if err != nil { + // 连接主机失败,请检查连接参数后重试 + c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo"))) + return + } + defer client.Close() + + // 终端单行字符数 + cols, err := strconv.Atoi(c.Query("cols")) + if err != nil { + cols = 80 + } + // 终端显示行数 + rows, err := strconv.Atoi(c.Query("rows")) + if err != nil { + rows = 40 + } + + // 创建SSH客户端会话 + clientSession, err := client.NewClientSession(cols, rows) + if err != nil { + // 连接主机失败,请检查连接参数后重试 + c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo"))) + return + } + defer clientSession.Close() + + // 将 HTTP 连接升级为 WebSocket 连接 + wsConn := s.wsService.UpgraderWs(c.Writer, c.Request) + if wsConn == nil { + return + } + defer wsConn.Close() + + wsClient := s.wsService.NewClient(loginUser.UserID, nil, wsConn, clientSession) + + // 实时读取SSH消息直接输出 + msTicker := time.NewTicker(100 * time.Millisecond) + defer msTicker.Stop() + go func() { + for ms := range msTicker.C { + outputByte := clientSession.Read() + if len(outputByte) > 0 { + outputStr := string(outputByte) + msgByte, _ := json.Marshal(result.Ok(map[string]any{ + "requestId": fmt.Sprintf("ssh_%s_%d", hostId, ms.UnixMilli()), + "data": outputStr, + })) + wsClient.MsgChan <- msgByte + + // 退出ssh登录 + if strings.LastIndex(outputStr, "logout\r\n") != -1 { + time.Sleep(1 * time.Second) + s.wsService.CloseClient(wsClient.ID) + return + } + } + } + }() + + // 等待停止信号 + for value := range wsClient.StopChan { + s.wsService.CloseClient(wsClient.ID) + logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value) + return + } +} + +// Telnet终端 +// +// GET /telnet?hostId=1 +func (s *WSController) Telnet(c *gin.Context) { + language := ctx.AcceptLanguage(c) + + // 登录用户信息 + loginUser, err := ctx.LoginUser(c) + if err != nil { + c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) + return + } + + // 连接主机ID + hostId := c.Query("hostId") + if hostId == "" { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + neHost := s.neHostService.SelectById(hostId) + if neHost.HostID != hostId || neHost.HostType != "telnet" { + // 没有可访问主机信息数据! + c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData"))) + return + } + + // 创建链接Telnet客户端 + var connTelnet telnet.ConnTelnet + neHost.CopyTo(&connTelnet) + client, err := connTelnet.NewClient() + if err != nil { + // 连接主机失败,请检查连接参数后重试 + c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo"))) + return + } + defer client.Close() + + // 创建Telnet客户端会话 + clientSession, err := client.NewClientSession() + if err != nil { + // 连接主机失败,请检查连接参数后重试 + c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo"))) + return + } + defer clientSession.Close() + + // 将 HTTP 连接升级为 WebSocket 连接 + wsConn := s.wsService.UpgraderWs(c.Writer, c.Request) + if wsConn == nil { + return + } + defer wsConn.Close() + + wsClient := s.wsService.NewClient(loginUser.UserID, nil, wsConn, clientSession) + + // 实时读取Telnet消息直接输出 + msTicker := time.NewTicker(100 * time.Millisecond) + defer msTicker.Stop() + go func() { + for ms := range msTicker.C { + outputByte := clientSession.Read() + if len(outputByte) > 0 { + outputStr := strings.TrimRight(string(outputByte), "\x00") + msgByte, _ := json.Marshal(result.Ok(map[string]any{ + "requestId": fmt.Sprintf("telnet_%s_%d", hostId, ms.UnixMilli()), + "data": outputStr, + })) + wsClient.MsgChan <- msgByte + + // 退出telnet登录 + if strings.LastIndex(outputStr, "logout\r\n") != -1 { + time.Sleep(1 * time.Second) + s.wsService.CloseClient(wsClient.ID) + return + } + } + } + }() + + // 等待停止信号 + for value := range wsClient.StopChan { + s.wsService.CloseClient(wsClient.ID) + logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value) + return + } +} diff --git a/src/modules/ws/model/ws.go b/src/modules/ws/model/ws.go index 130419c5..759195ff 100644 --- a/src/modules/ws/model/ws.go +++ b/src/modules/ws/model/ws.go @@ -4,13 +4,14 @@ import "github.com/gorilla/websocket" // WSClient ws客户端 type WSClient struct { - ID string // 连接ID-随机字符串16位 + ID string // 客户端连接ID-随机字符串16位 Conn *websocket.Conn // 连接实例 LastHeartbeat int64 // 最近一次心跳消息(毫秒) BindUid string // 绑定登录用户ID SubGroup []string // 订阅组ID MsgChan chan []byte // 消息通道 StopChan chan struct{} // 停止信号-退出协程 + ChildConn any // 子连接实例-携带某种连接会话 } // WSRequest ws消息接收 diff --git a/src/modules/ws/service/ws.go b/src/modules/ws/service/ws.go index f404c145..92aea1bf 100644 --- a/src/modules/ws/service/ws.go +++ b/src/modules/ws/service/ws.go @@ -12,9 +12,14 @@ type IWS interface { // UpgraderWs http升级ws请求 UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn - // NewClient 新建客户端 uid 登录用户ID - NewClient(uid string, gids []string, conn *websocket.Conn) *model.WSClient + // NewClient 新建客户端 + // + // uid 登录用户ID + // groupIDs 用户订阅组 + // conn ws连接实例 + // childConn 子连接实例 + NewClient(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient - // CloseClient 客户端关闭 + // CloseClient 关闭客户端 CloseClient(clientID string) } diff --git a/src/modules/ws/service/ws.impl.go b/src/modules/ws/service/ws.impl.go index 350979d3..7d6d4b7d 100644 --- a/src/modules/ws/service/ws.impl.go +++ b/src/modules/ws/service/ws.impl.go @@ -50,8 +50,13 @@ func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.C return conn } -// NewClient 新建客户端 uid 登录用户ID -func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn) *model.WSClient { +// NewClient 新建客户端 +// +// uid 登录用户ID +// groupIDs 用户订阅组 +// conn ws连接实例 +// childConn 子连接实例 +func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient { // clientID也可以用其他方式生成,只要能保证在所有服务端中都能保证唯一即可 clientID := generate.Code(16) @@ -63,6 +68,7 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn) SubGroup: groupIDs, MsgChan: make(chan []byte, 100), StopChan: make(chan struct{}, 1), // 卡死循环标记 + ChildConn: childConn, } // 存入客户端 @@ -127,17 +133,18 @@ func (s *WSImpl) clientRead(wsClient *model.WSClient) { s.CloseClient(wsClient.ID) return } + // fmt.Println(messageType, string(msg)) // 文本和二进制类型,只处理文本json if messageType == websocket.TextMessage { var reqMsg model.WSRequest err := json.Unmarshal(msg, &reqMsg) - // fmt.Println(messageType, string(msg)) if err != nil { msgByte, _ := json.Marshal(result.ErrMsg("message format not supported")) wsClient.MsgChan <- msgByte } else { - go NewWSReceiveImpl.Receive(wsClient, reqMsg) + // 协程异步处理 + go NewWSReceiveImpl.AsyncReceive(wsClient, reqMsg) } } } @@ -173,8 +180,8 @@ func (s *WSImpl) CloseClient(clientID string) { defer func() { client.Conn.WriteMessage(websocket.CloseMessage, []byte{}) client.Conn.Close() - client.StopChan <- struct{}{} WsClients.Delete(clientID) + client.StopChan <- struct{}{} }() // 客户端断线时自动踢出Uid绑定列表 diff --git a/src/modules/ws/service/ws_receive.go b/src/modules/ws/service/ws_receive.go index 0a1d8e02..a6aaf2b7 100644 --- a/src/modules/ws/service/ws_receive.go +++ b/src/modules/ws/service/ws_receive.go @@ -4,6 +4,6 @@ import "ems.agt/src/modules/ws/model" // IWSReceive WebSocket消息接收处理 服务层接口 type IWSReceive interface { - // Receive 接收处理 - Receive(client *model.WSClient, reqMsg model.WSRequest) error + // AsyncReceive 接收业务异步处理 + AsyncReceive(client *model.WSClient, reqMsg model.WSRequest) } diff --git a/src/modules/ws/service/ws_receive.impl.go b/src/modules/ws/service/ws_receive.impl.go index 454878dd..cceb680b 100644 --- a/src/modules/ws/service/ws_receive.impl.go +++ b/src/modules/ws/service/ws_receive.impl.go @@ -3,8 +3,11 @@ package service import ( "encoding/json" "fmt" + "time" "ems.agt/src/framework/logger" + "ems.agt/src/framework/utils/ssh" + "ems.agt/src/framework/utils/telnet" "ems.agt/src/framework/vo/result" "ems.agt/src/modules/ws/model" "ems.agt/src/modules/ws/processor" @@ -16,11 +19,12 @@ var NewWSReceiveImpl = &WSReceiveImpl{} // WSReceiveImpl WebSocket消息接收处理 服务层处理 type WSReceiveImpl struct{} -// Receive 接收处理 -func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) { +// AsyncReceive 接收业务异步处理 +func (s *WSReceiveImpl) AsyncReceive(client *model.WSClient, reqMsg model.WSRequest) { + // 必传requestId确认消息 if reqMsg.RequestID == "" { msg := "message requestId is required" - logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, msg) + logger.Infof("ws AsyncReceive UID %s err: %s", client.BindUid, msg) msgByte, _ := json.Marshal(result.ErrMsg(msg)) client.MsgChan <- msgByte return @@ -30,6 +34,35 @@ func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) var err error switch reqMsg.Type { + case "close": + // 主动关闭 + resultByte, _ := json.Marshal(result.OkMsg("user initiated closure")) + client.MsgChan <- resultByte + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + case "ssh": + // SSH会话消息接收直接写入会话 + command := reqMsg.Data.(string) + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + _, err = sshClientSession.Write(command) + case "ssh_resize": + // SSH会话窗口重置 + msgByte, _ := json.Marshal(reqMsg.Data) + var data struct { + Cols int `json:"cols"` + Rows int `json:"rows"` + } + err = json.Unmarshal(msgByte, &data) + if err == nil { + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + err = sshClientSession.Session.WindowChange(data.Rows, data.Cols) + } + case "telnet": + // Telnet会话消息接收直接写入会话 + command := reqMsg.Data.(string) + telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession) + _, err = telnetClientSession.Write(command) case "ps": resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data) case "net": @@ -47,10 +80,12 @@ func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) } if err != nil { - logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, err.Error()) + logger.Warnf("ws AsyncReceive UID %s err: %s", client.BindUid, err.Error()) msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) client.MsgChan <- msgByte return } - client.MsgChan <- resByte + if len(resByte) > 0 { + client.MsgChan <- resByte + } } diff --git a/src/modules/ws/ws.go b/src/modules/ws/ws.go index ee83ea3f..26850d60 100644 --- a/src/modules/ws/ws.go +++ b/src/modules/ws/ws.go @@ -18,10 +18,19 @@ func Setup(router *gin.Engine) { { wsGroup.GET("", middleware.PreAuthorize(nil), - collectlogs.OperateLog(collectlogs.OptionNew("WS Subscription", collectlogs.BUSINESS_TYPE_OTHER)), + collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)), controller.NewWSController.WS, ) - + wsGroup.GET("/ssh", + middleware.PreAuthorize(nil), + collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)), + controller.NewWSController.SSH, + ) + wsGroup.GET("/telnet", + middleware.PreAuthorize(nil), + collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)), + controller.NewWSController.Telnet, + ) wsGroup.GET("/test", middleware.PreAuthorize(nil), controller.NewWSController.Test,