package controller import ( "encoding/json" "fmt" "strconv" "strings" "time" neService "be.ems/src/modules/network_element/service" "be.ems/src/framework/i18n" "be.ems/src/framework/logger" "be.ems/src/framework/utils/ctx" "be.ems/src/framework/utils/parse" "be.ems/src/framework/utils/ssh" "be.ems/src/framework/utils/telnet" "be.ems/src/framework/vo/result" "be.ems/src/modules/ws/service" "github.com/gin-gonic/gin" ) // 实例化控制层 WSController 结构体 var NewWSController = &WSController{ wsService: service.NewWSImpl, wsSendService: service.NewWSSendImpl, neHostService: neService.NewNeHostImpl, } // WebSocket通信 // // PATH /ws type WSController struct { // WebSocket 服务 wsService service.IWS // WebSocket消息发送 服务 wsSendService service.IWSSend // 网元主机连接服务 neHostService neService.INeHost } // 通用 // // GET /?subGroupIDs=0 func (s *WSController) WS(c *gin.Context) { language := ctx.AcceptLanguage(c) // 登录用户信息 loginUser, err := ctx.LoginUser(c) if err != nil { c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) return } // 订阅消息组 var subGroupIDs []string subGroupIDStr := c.Query("subGroupID") if subGroupIDStr != "" { // 处理字符转id数组后去重 ids := strings.Split(subGroupIDStr, ",") uniqueIDs := parse.RemoveDuplicates(ids) if len(uniqueIDs) > 0 { subGroupIDs = uniqueIDs } } // 将 HTTP 连接升级为 WebSocket 连接 conn := s.wsService.UpgraderWs(c.Writer, c.Request) if conn == nil { return } defer conn.Close() wsClient := s.wsService.NewClient(loginUser.UserID, subGroupIDs, conn, nil) // 等待停止信号 for value := range wsClient.StopChan { s.wsService.CloseClient(wsClient.ID) logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value) return } } // 测试 // // GET /test?clientId=&groupID= func (s *WSController) Test(c *gin.Context) { language := ctx.AcceptLanguage(c) // 登录用户信息 loginUser, err := ctx.LoginUser(c) if err != nil { c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) return } errMsgArr := []string{} clientId := c.Query("clientId") if clientId != "" { err := s.wsSendService.ByClientID(c.Query("clientId"), loginUser) if err != nil { errMsgArr = append(errMsgArr, "clientId: "+err.Error()) } } groupID := c.Query("groupID") if groupID != "" { err := s.wsSendService.ByGroupID(c.Query("groupID"), loginUser) if err != nil { errMsgArr = append(errMsgArr, "groupID: "+err.Error()) } } c.JSON(200, result.OkData(errMsgArr)) } // SSH终端 // // GET /ssh?hostId=1&cols=80&rows=40 func (s *WSController) SSH(c *gin.Context) { language := ctx.AcceptLanguage(c) // 登录用户信息 loginUser, err := ctx.LoginUser(c) if err != nil { c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) return } // 连接主机ID hostId := c.Query("hostId") if hostId == "" { c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) return } neHost := s.neHostService.SelectById(hostId) if neHost.HostID != hostId || neHost.HostType != "ssh" { // 没有可访问主机信息数据! c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData"))) return } // 创建链接SSH客户端 var connSSH ssh.ConnSSH neHost.CopyTo(&connSSH) var client *ssh.ConnSSH var clientErr error if neHost.AuthMode == "2" { client, clientErr = connSSH.NewClientByLocalPrivate() } else { client, clientErr = connSSH.NewClient() } if clientErr != nil { // 连接主机失败,请检查连接参数后重试 c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo"))) return } defer client.Close() // 终端单行字符数 cols, err := strconv.Atoi(c.Query("cols")) if err != nil { cols = 80 } // 终端显示行数 rows, err := strconv.Atoi(c.Query("rows")) if err != nil { rows = 40 } // 创建SSH客户端会话 clientSession, err := client.NewClientSession(cols, rows) if err != nil { // 连接主机失败,请检查连接参数后重试 c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo"))) return } defer clientSession.Close() // 将 HTTP 连接升级为 WebSocket 连接 wsConn := s.wsService.UpgraderWs(c.Writer, c.Request) if wsConn == nil { return } defer wsConn.Close() wsClient := s.wsService.NewClient(loginUser.UserID, nil, wsConn, clientSession) // 实时读取SSH消息直接输出 msTicker := time.NewTicker(100 * time.Millisecond) defer msTicker.Stop() go func() { for ms := range msTicker.C { outputByte := clientSession.Read() if len(outputByte) > 0 { outputStr := string(outputByte) msgByte, _ := json.Marshal(result.Ok(map[string]any{ "requestId": fmt.Sprintf("ssh_%s_%d", hostId, ms.UnixMilli()), "data": outputStr, })) wsClient.MsgChan <- msgByte // 退出ssh登录 // if strings.LastIndex(outputStr, "logout\r\n") != -1 { // time.Sleep(1 * time.Second) // s.wsService.CloseClient(wsClient.ID) // return // } } } }() // 等待停止信号 for value := range wsClient.StopChan { s.wsService.CloseClient(wsClient.ID) logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value) return } } // Telnet终端 // // GET /telnet?hostId=1 func (s *WSController) Telnet(c *gin.Context) { language := ctx.AcceptLanguage(c) // 登录用户信息 loginUser, err := ctx.LoginUser(c) if err != nil { c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) return } // 连接主机ID hostId := c.Query("hostId") if hostId == "" { c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) return } neHost := s.neHostService.SelectById(hostId) if neHost.HostID != hostId || neHost.HostType != "telnet" { // 没有可访问主机信息数据! c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData"))) return } // 创建链接Telnet客户端 var connTelnet telnet.ConnTelnet neHost.CopyTo(&connTelnet) client, err := connTelnet.NewClient() if err != nil { // 连接主机失败,请检查连接参数后重试 c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo"))) return } defer client.Close() // 终端单行字符数 cols, err := strconv.Atoi(c.Query("cols")) if err != nil || cols > 254 { cols = 80 } // 终端显示行数 rows, err := strconv.Atoi(c.Query("rows")) if err != nil || cols > rows { rows = 40 } // 创建Telnet客户端会话 clientSession, err := client.NewClientSession(uint8(cols), uint8(rows)) if err != nil { // 连接主机失败,请检查连接参数后重试 c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo"))) return } defer clientSession.Close() // 将 HTTP 连接升级为 WebSocket 连接 wsConn := s.wsService.UpgraderWs(c.Writer, c.Request) if wsConn == nil { return } defer wsConn.Close() wsClient := s.wsService.NewClient(loginUser.UserID, nil, wsConn, clientSession) // 实时读取Telnet消息直接输出 msTicker := time.NewTicker(100 * time.Millisecond) defer msTicker.Stop() go func() { for ms := range msTicker.C { outputByte := clientSession.Read() if len(outputByte) > 0 { outputStr := strings.TrimRight(string(outputByte), "\x00") msgByte, _ := json.Marshal(result.Ok(map[string]any{ "requestId": fmt.Sprintf("telnet_%s_%d", hostId, ms.UnixMilli()), "data": outputStr, })) wsClient.MsgChan <- msgByte // 退出telnet登录 // if strings.LastIndex(outputStr, "logout\r\n") != -1 { // time.Sleep(1 * time.Second) // s.wsService.CloseClient(wsClient.ID) // return // } } } }() // 等待停止信号 for value := range wsClient.StopChan { s.wsService.CloseClient(wsClient.ID) logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value) return } }