fix: ws接收消息处理改为函数回调易于扩展
This commit is contained in:
@@ -1,44 +1,32 @@
|
|||||||
package controller
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
neService "be.ems/src/modules/network_element/service"
|
|
||||||
|
|
||||||
"be.ems/src/framework/i18n"
|
"be.ems/src/framework/i18n"
|
||||||
"be.ems/src/framework/logger"
|
"be.ems/src/framework/logger"
|
||||||
"be.ems/src/framework/telnet"
|
|
||||||
"be.ems/src/framework/utils/ctx"
|
"be.ems/src/framework/utils/ctx"
|
||||||
"be.ems/src/framework/utils/parse"
|
"be.ems/src/framework/utils/parse"
|
||||||
"be.ems/src/framework/utils/ssh"
|
|
||||||
"be.ems/src/framework/vo/result"
|
"be.ems/src/framework/vo/result"
|
||||||
"be.ems/src/modules/ws/service"
|
"be.ems/src/modules/ws/service"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewWSController 实例化控制层 WSController 结构体
|
// NewWSController 实例化控制层 WSController 结构体
|
||||||
var NewWSController = &WSController{
|
var NewWSController = &WSController{
|
||||||
wsService: service.NewWS,
|
wsService: service.NewWS,
|
||||||
wsSendService: service.NewWSSend,
|
wsSendService: service.NewWSSend,
|
||||||
neHostService: neService.NewNeHostImpl,
|
wsReceiveService: service.NewWSReceive,
|
||||||
neInfoService: neService.NewNeInfoImpl,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WSController WebSocket通信
|
// WSController WebSocket通信
|
||||||
//
|
//
|
||||||
// PATH /ws
|
// PATH /ws
|
||||||
type WSController struct {
|
type WSController struct {
|
||||||
// WebSocket 服务
|
wsService *service.WS // WebSocket 服务
|
||||||
wsService *service.WS
|
wsSendService *service.WSSend // WebSocket消息发送 服务
|
||||||
// WebSocket消息发送 服务
|
wsReceiveService *service.WSReceive // WebSocket消息接收 服务
|
||||||
wsSendService *service.WSSend
|
|
||||||
// 网元主机连接服务
|
|
||||||
neHostService neService.INeHost
|
|
||||||
// 网元信息服务
|
|
||||||
neInfoService neService.INeInfo
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WS 通用
|
// WS 通用
|
||||||
@@ -75,7 +63,7 @@ func (s *WSController) WS(c *gin.Context) {
|
|||||||
|
|
||||||
wsClient := s.wsService.ClientCreate(loginUser.UserID, subGroupIDs, conn, nil)
|
wsClient := s.wsService.ClientCreate(loginUser.UserID, subGroupIDs, conn, nil)
|
||||||
go s.wsService.ClientWriteListen(wsClient)
|
go s.wsService.ClientWriteListen(wsClient)
|
||||||
go s.wsService.ClientReadListen(wsClient, service.ReceiveCommont)
|
go s.wsService.ClientReadListen(wsClient, s.wsReceiveService.Commont)
|
||||||
|
|
||||||
// 等待停止信号
|
// 等待停止信号
|
||||||
for value := range wsClient.StopChan {
|
for value := range wsClient.StopChan {
|
||||||
@@ -118,282 +106,3 @@ func (s *WSController) Test(c *gin.Context) {
|
|||||||
|
|
||||||
c.JSON(200, result.OkData(errMsgArr))
|
c.JSON(200, result.OkData(errMsgArr))
|
||||||
}
|
}
|
||||||
|
|
||||||
// SSH 终端
|
|
||||||
//
|
|
||||||
// GET /ssh?hostId=1&cols=80&rows=40
|
|
||||||
func (s *WSController) SSH(c *gin.Context) {
|
|
||||||
language := ctx.AcceptLanguage(c)
|
|
||||||
var query struct {
|
|
||||||
HostId string `form:"hostId" binding:"required"` // 连接主机ID
|
|
||||||
Cols int `form:"cols"` // 终端单行字符数
|
|
||||||
Rows int `form:"rows"` // 终端显示行数
|
|
||||||
}
|
|
||||||
if err := c.ShouldBindQuery(&query); err != nil {
|
|
||||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if query.Cols < 80 || query.Cols > 400 {
|
|
||||||
query.Cols = 80
|
|
||||||
}
|
|
||||||
if query.Rows < 40 || query.Rows > 1200 {
|
|
||||||
query.Rows = 40
|
|
||||||
}
|
|
||||||
|
|
||||||
// 登录用户信息
|
|
||||||
loginUser, err := ctx.LoginUser(c)
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
neHost := s.neHostService.SelectById(query.HostId)
|
|
||||||
if neHost.HostID != query.HostId || neHost.HostType != "ssh" {
|
|
||||||
// 没有可访问主机信息数据!
|
|
||||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData")))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 创建链接SSH客户端
|
|
||||||
var connSSH ssh.ConnSSH
|
|
||||||
neHost.CopyTo(&connSSH)
|
|
||||||
var client *ssh.ConnSSH
|
|
||||||
var clientErr error
|
|
||||||
if neHost.AuthMode == "2" {
|
|
||||||
client, clientErr = connSSH.NewClientByLocalPrivate()
|
|
||||||
} else {
|
|
||||||
client, clientErr = connSSH.NewClient()
|
|
||||||
}
|
|
||||||
if clientErr != nil {
|
|
||||||
// 连接主机失败,请检查连接参数后重试
|
|
||||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// 创建SSH客户端会话
|
|
||||||
clientSession, err := client.NewClientSession(query.Cols, query.Rows)
|
|
||||||
if err != nil {
|
|
||||||
// 连接主机失败,请检查连接参数后重试
|
|
||||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer clientSession.Close()
|
|
||||||
|
|
||||||
// 将 HTTP 连接升级为 WebSocket 连接
|
|
||||||
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
|
|
||||||
if wsConn == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer wsConn.Close()
|
|
||||||
|
|
||||||
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
|
|
||||||
go s.wsService.ClientWriteListen(wsClient)
|
|
||||||
go s.wsService.ClientReadListen(wsClient, service.ReceiveShell)
|
|
||||||
|
|
||||||
// 实时读取SSH消息直接输出
|
|
||||||
msTicker := time.NewTicker(100 * time.Millisecond)
|
|
||||||
defer msTicker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case ms := <-msTicker.C:
|
|
||||||
outputByte := clientSession.Read()
|
|
||||||
if len(outputByte) > 0 {
|
|
||||||
outputStr := string(outputByte)
|
|
||||||
msgByte, _ := json.Marshal(result.Ok(map[string]any{
|
|
||||||
"requestId": fmt.Sprintf("ssh_%s_%d", neHost.HostID, ms.UnixMilli()),
|
|
||||||
"data": outputStr,
|
|
||||||
}))
|
|
||||||
wsClient.MsgChan <- msgByte
|
|
||||||
|
|
||||||
// 退出ssh登录
|
|
||||||
// if strings.LastIndex(outputStr, "logout\r\n") != -1 {
|
|
||||||
// time.Sleep(1 * time.Second)
|
|
||||||
// s.wsService.CloseClient(wsClient.ID)
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
case <-wsClient.StopChan: // 等待停止信号
|
|
||||||
s.wsService.ClientClose(wsClient.ID)
|
|
||||||
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Telnet 终端
|
|
||||||
//
|
|
||||||
// GET /telnet?hostId=1
|
|
||||||
func (s *WSController) Telnet(c *gin.Context) {
|
|
||||||
language := ctx.AcceptLanguage(c)
|
|
||||||
var query struct {
|
|
||||||
HostId string `form:"hostId" binding:"required"` // 连接主机ID
|
|
||||||
Cols int `form:"cols"` // 终端单行字符数
|
|
||||||
Rows int `form:"rows"` // 终端显示行数
|
|
||||||
}
|
|
||||||
if err := c.ShouldBindQuery(&query); err != nil {
|
|
||||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if query.Cols < 120 || query.Cols > 400 {
|
|
||||||
query.Cols = 120
|
|
||||||
}
|
|
||||||
if query.Rows < 128 || query.Rows > 1200 {
|
|
||||||
query.Rows = 128
|
|
||||||
}
|
|
||||||
|
|
||||||
// 登录用户信息
|
|
||||||
loginUser, err := ctx.LoginUser(c)
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
neHost := s.neHostService.SelectById(query.HostId)
|
|
||||||
if neHost.HostID != query.HostId || neHost.HostType != "telnet" {
|
|
||||||
// 没有可访问主机信息数据!
|
|
||||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData")))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 创建链接Telnet客户端
|
|
||||||
var connTelnet telnet.ConnTelnet
|
|
||||||
neHost.CopyTo(&connTelnet)
|
|
||||||
client, err := connTelnet.NewClient()
|
|
||||||
if err != nil {
|
|
||||||
// 连接主机失败,请检查连接参数后重试
|
|
||||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer client.Close()
|
|
||||||
// 创建Telnet客户端会话
|
|
||||||
clientSession, err := client.NewClientSession(query.Cols, query.Rows)
|
|
||||||
if err != nil {
|
|
||||||
// 连接主机失败,请检查连接参数后重试
|
|
||||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer clientSession.Close()
|
|
||||||
|
|
||||||
// 将 HTTP 连接升级为 WebSocket 连接
|
|
||||||
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
|
|
||||||
if wsConn == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer wsConn.Close()
|
|
||||||
|
|
||||||
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
|
|
||||||
go s.wsService.ClientWriteListen(wsClient)
|
|
||||||
go s.wsService.ClientReadListen(wsClient, service.ReceiveTelnet)
|
|
||||||
|
|
||||||
// 实时读取Telnet消息直接输出
|
|
||||||
msTicker := time.NewTicker(100 * time.Millisecond)
|
|
||||||
defer msTicker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case ms := <-msTicker.C:
|
|
||||||
outputByte := clientSession.Read()
|
|
||||||
if len(outputByte) > 0 {
|
|
||||||
outputStr := string(outputByte)
|
|
||||||
msgByte, _ := json.Marshal(result.Ok(map[string]any{
|
|
||||||
"requestId": fmt.Sprintf("telnet_%s_%d", neHost.HostID, ms.UnixMilli()),
|
|
||||||
"data": outputStr,
|
|
||||||
}))
|
|
||||||
wsClient.MsgChan <- msgByte
|
|
||||||
|
|
||||||
// 退出telnet登录
|
|
||||||
// if strings.LastIndex(outputStr, "logout\r\n") != -1 {
|
|
||||||
// time.Sleep(1 * time.Second)
|
|
||||||
// s.wsService.CloseClient(wsClient.ID)
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
case <-wsClient.StopChan: // 等待停止信号
|
|
||||||
s.wsService.ClientClose(wsClient.ID)
|
|
||||||
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ShellView 终端交互式文件内容查看
|
|
||||||
//
|
|
||||||
// GET /view
|
|
||||||
func (s *WSController) ShellView(c *gin.Context) {
|
|
||||||
language := ctx.AcceptLanguage(c)
|
|
||||||
var query struct {
|
|
||||||
NeType string `form:"neType" binding:"required"`
|
|
||||||
NeId string `form:"neId" binding:"required"`
|
|
||||||
Cols int `form:"cols"` // 终端单行字符数
|
|
||||||
Rows int `form:"rows"` // 终端显示行数
|
|
||||||
}
|
|
||||||
if err := c.ShouldBindQuery(&query); err != nil {
|
|
||||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if query.Cols < 120 || query.Cols > 400 {
|
|
||||||
query.Cols = 120
|
|
||||||
}
|
|
||||||
if query.Rows < 40 || query.Rows > 1200 {
|
|
||||||
query.Rows = 40
|
|
||||||
}
|
|
||||||
|
|
||||||
// 登录用户信息
|
|
||||||
loginUser, err := ctx.LoginUser(c)
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 网元主机的SSH客户端
|
|
||||||
sshClient, err := s.neInfoService.NeRunSSHClient(query.NeType, query.NeId)
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(200, result.ErrMsg(err.Error()))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer sshClient.Close()
|
|
||||||
// ssh连接会话
|
|
||||||
clientSession, err := sshClient.NewClientSession(query.Cols, query.Rows)
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(200, result.ErrMsg("neinfo ssh client session new err"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer clientSession.Close()
|
|
||||||
|
|
||||||
// 将 HTTP 连接升级为 WebSocket 连接
|
|
||||||
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
|
|
||||||
if wsConn == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer wsConn.Close()
|
|
||||||
|
|
||||||
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
|
|
||||||
go s.wsService.ClientWriteListen(wsClient)
|
|
||||||
go s.wsService.ClientReadListen(wsClient, service.ReceiveShellView)
|
|
||||||
|
|
||||||
// 等待1秒,排空首次消息
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
_ = clientSession.Read()
|
|
||||||
|
|
||||||
// 实时读取SSH消息直接输出
|
|
||||||
msTicker := time.NewTicker(100 * time.Millisecond)
|
|
||||||
defer msTicker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case ms := <-msTicker.C:
|
|
||||||
outputByte := clientSession.Read()
|
|
||||||
if len(outputByte) > 0 {
|
|
||||||
outputStr := string(outputByte)
|
|
||||||
msgByte, _ := json.Marshal(result.Ok(map[string]any{
|
|
||||||
"requestId": fmt.Sprintf("view_%d", ms.UnixMilli()),
|
|
||||||
"data": outputStr,
|
|
||||||
}))
|
|
||||||
wsClient.MsgChan <- msgByte
|
|
||||||
}
|
|
||||||
case <-wsClient.StopChan: // 等待停止信号
|
|
||||||
s.wsService.ClientClose(wsClient.ID)
|
|
||||||
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
118
src/modules/ws/controller/ws_ssh.go
Normal file
118
src/modules/ws/controller/ws_ssh.go
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"be.ems/src/framework/i18n"
|
||||||
|
"be.ems/src/framework/logger"
|
||||||
|
"be.ems/src/framework/utils/ctx"
|
||||||
|
"be.ems/src/framework/utils/ssh"
|
||||||
|
"be.ems/src/framework/vo/result"
|
||||||
|
neService "be.ems/src/modules/network_element/service"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SSH 终端
|
||||||
|
//
|
||||||
|
// GET /ssh?hostId=1&cols=80&rows=40
|
||||||
|
func (s *WSController) SSH(c *gin.Context) {
|
||||||
|
language := ctx.AcceptLanguage(c)
|
||||||
|
var query struct {
|
||||||
|
HostId string `form:"hostId" binding:"required"` // 连接主机ID
|
||||||
|
Cols int `form:"cols"` // 终端单行字符数
|
||||||
|
Rows int `form:"rows"` // 终端显示行数
|
||||||
|
}
|
||||||
|
if err := c.ShouldBindQuery(&query); err != nil {
|
||||||
|
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if query.Cols < 80 || query.Cols > 400 {
|
||||||
|
query.Cols = 80
|
||||||
|
}
|
||||||
|
if query.Rows < 40 || query.Rows > 1200 {
|
||||||
|
query.Rows = 40
|
||||||
|
}
|
||||||
|
|
||||||
|
// 登录用户信息
|
||||||
|
loginUser, err := ctx.LoginUser(c)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
neHost := neService.NewNeHostImpl.SelectById(query.HostId)
|
||||||
|
if neHost.HostID != query.HostId || neHost.HostType != "ssh" {
|
||||||
|
// 没有可访问主机信息数据!
|
||||||
|
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData")))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建链接SSH客户端
|
||||||
|
var connSSH ssh.ConnSSH
|
||||||
|
neHost.CopyTo(&connSSH)
|
||||||
|
var client *ssh.ConnSSH
|
||||||
|
var clientErr error
|
||||||
|
if neHost.AuthMode == "2" {
|
||||||
|
client, clientErr = connSSH.NewClientByLocalPrivate()
|
||||||
|
} else {
|
||||||
|
client, clientErr = connSSH.NewClient()
|
||||||
|
}
|
||||||
|
if clientErr != nil {
|
||||||
|
// 连接主机失败,请检查连接参数后重试
|
||||||
|
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// 创建SSH客户端会话
|
||||||
|
clientSession, err := client.NewClientSession(query.Cols, query.Rows)
|
||||||
|
if err != nil {
|
||||||
|
// 连接主机失败,请检查连接参数后重试
|
||||||
|
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer clientSession.Close()
|
||||||
|
|
||||||
|
// 将 HTTP 连接升级为 WebSocket 连接
|
||||||
|
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
|
||||||
|
if wsConn == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer wsConn.Close()
|
||||||
|
|
||||||
|
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
|
||||||
|
go s.wsService.ClientWriteListen(wsClient)
|
||||||
|
go s.wsService.ClientReadListen(wsClient, s.wsReceiveService.Shell)
|
||||||
|
|
||||||
|
// 实时读取SSH消息直接输出
|
||||||
|
msTicker := time.NewTicker(100 * time.Millisecond)
|
||||||
|
defer msTicker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ms := <-msTicker.C:
|
||||||
|
outputByte := clientSession.Read()
|
||||||
|
if len(outputByte) > 0 {
|
||||||
|
outputStr := string(outputByte)
|
||||||
|
msgByte, _ := json.Marshal(result.Ok(map[string]any{
|
||||||
|
"requestId": fmt.Sprintf("ssh_%s_%d", neHost.HostID, ms.UnixMilli()),
|
||||||
|
"data": outputStr,
|
||||||
|
}))
|
||||||
|
wsClient.MsgChan <- msgByte
|
||||||
|
|
||||||
|
// 退出ssh登录
|
||||||
|
// if strings.LastIndex(outputStr, "logout\r\n") != -1 {
|
||||||
|
// time.Sleep(1 * time.Second)
|
||||||
|
// s.wsService.CloseClient(wsClient.ID)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
case <-wsClient.StopChan: // 等待停止信号
|
||||||
|
s.wsService.ClientClose(wsClient.ID)
|
||||||
|
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
111
src/modules/ws/controller/ws_telnet.go
Normal file
111
src/modules/ws/controller/ws_telnet.go
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"be.ems/src/framework/i18n"
|
||||||
|
"be.ems/src/framework/logger"
|
||||||
|
"be.ems/src/framework/telnet"
|
||||||
|
"be.ems/src/framework/utils/ctx"
|
||||||
|
"be.ems/src/framework/vo/result"
|
||||||
|
neService "be.ems/src/modules/network_element/service"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Telnet 终端
|
||||||
|
//
|
||||||
|
// GET /telnet?hostId=1
|
||||||
|
func (s *WSController) Telnet(c *gin.Context) {
|
||||||
|
language := ctx.AcceptLanguage(c)
|
||||||
|
var query struct {
|
||||||
|
HostId string `form:"hostId" binding:"required"` // 连接主机ID
|
||||||
|
Cols int `form:"cols"` // 终端单行字符数
|
||||||
|
Rows int `form:"rows"` // 终端显示行数
|
||||||
|
}
|
||||||
|
if err := c.ShouldBindQuery(&query); err != nil {
|
||||||
|
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if query.Cols < 120 || query.Cols > 400 {
|
||||||
|
query.Cols = 120
|
||||||
|
}
|
||||||
|
if query.Rows < 128 || query.Rows > 1200 {
|
||||||
|
query.Rows = 128
|
||||||
|
}
|
||||||
|
|
||||||
|
// 登录用户信息
|
||||||
|
loginUser, err := ctx.LoginUser(c)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
neHost := neService.NewNeHostImpl.SelectById(query.HostId)
|
||||||
|
if neHost.HostID != query.HostId || neHost.HostType != "telnet" {
|
||||||
|
// 没有可访问主机信息数据!
|
||||||
|
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData")))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建链接Telnet客户端
|
||||||
|
var connTelnet telnet.ConnTelnet
|
||||||
|
neHost.CopyTo(&connTelnet)
|
||||||
|
client, err := connTelnet.NewClient()
|
||||||
|
if err != nil {
|
||||||
|
// 连接主机失败,请检查连接参数后重试
|
||||||
|
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
// 创建Telnet客户端会话
|
||||||
|
clientSession, err := client.NewClientSession(query.Cols, query.Rows)
|
||||||
|
if err != nil {
|
||||||
|
// 连接主机失败,请检查连接参数后重试
|
||||||
|
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer clientSession.Close()
|
||||||
|
|
||||||
|
// 将 HTTP 连接升级为 WebSocket 连接
|
||||||
|
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
|
||||||
|
if wsConn == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer wsConn.Close()
|
||||||
|
|
||||||
|
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
|
||||||
|
go s.wsService.ClientWriteListen(wsClient)
|
||||||
|
go s.wsService.ClientReadListen(wsClient, s.wsReceiveService.Telnet)
|
||||||
|
|
||||||
|
// 实时读取Telnet消息直接输出
|
||||||
|
msTicker := time.NewTicker(100 * time.Millisecond)
|
||||||
|
defer msTicker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ms := <-msTicker.C:
|
||||||
|
outputByte := clientSession.Read()
|
||||||
|
if len(outputByte) > 0 {
|
||||||
|
outputStr := string(outputByte)
|
||||||
|
msgByte, _ := json.Marshal(result.Ok(map[string]any{
|
||||||
|
"requestId": fmt.Sprintf("telnet_%s_%d", neHost.HostID, ms.UnixMilli()),
|
||||||
|
"data": outputStr,
|
||||||
|
}))
|
||||||
|
wsClient.MsgChan <- msgByte
|
||||||
|
|
||||||
|
// 退出telnet登录
|
||||||
|
// if strings.LastIndex(outputStr, "logout\r\n") != -1 {
|
||||||
|
// time.Sleep(1 * time.Second)
|
||||||
|
// s.wsService.CloseClient(wsClient.ID)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
case <-wsClient.StopChan: // 等待停止信号
|
||||||
|
s.wsService.ClientClose(wsClient.ID)
|
||||||
|
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
97
src/modules/ws/controller/ws_view.go
Normal file
97
src/modules/ws/controller/ws_view.go
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"be.ems/src/framework/i18n"
|
||||||
|
"be.ems/src/framework/logger"
|
||||||
|
"be.ems/src/framework/utils/ctx"
|
||||||
|
"be.ems/src/framework/vo/result"
|
||||||
|
neService "be.ems/src/modules/network_element/service"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ShellView 终端交互式文件内容查看
|
||||||
|
//
|
||||||
|
// GET /view
|
||||||
|
func (s *WSController) ShellView(c *gin.Context) {
|
||||||
|
language := ctx.AcceptLanguage(c)
|
||||||
|
var query struct {
|
||||||
|
NeType string `form:"neType" binding:"required"` // 网元类型
|
||||||
|
NeId string `form:"neId" binding:"required"` // 网元标识id
|
||||||
|
Cols int `form:"cols"` // 终端单行字符数
|
||||||
|
Rows int `form:"rows"` // 终端显示行数
|
||||||
|
}
|
||||||
|
if err := c.ShouldBindQuery(&query); err != nil {
|
||||||
|
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if query.Cols < 120 || query.Cols > 400 {
|
||||||
|
query.Cols = 120
|
||||||
|
}
|
||||||
|
if query.Rows < 40 || query.Rows > 1200 {
|
||||||
|
query.Rows = 40
|
||||||
|
}
|
||||||
|
|
||||||
|
// 登录用户信息
|
||||||
|
loginUser, err := ctx.LoginUser(c)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 网元主机的SSH客户端
|
||||||
|
sshClient, err := neService.NewNeInfoImpl.NeRunSSHClient(query.NeType, query.NeId)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(200, result.ErrMsg(err.Error()))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer sshClient.Close()
|
||||||
|
// ssh连接会话
|
||||||
|
clientSession, err := sshClient.NewClientSession(query.Cols, query.Rows)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(200, result.ErrMsg("neinfo ssh client session new err"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer clientSession.Close()
|
||||||
|
|
||||||
|
// 将 HTTP 连接升级为 WebSocket 连接
|
||||||
|
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
|
||||||
|
if wsConn == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer wsConn.Close()
|
||||||
|
|
||||||
|
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
|
||||||
|
go s.wsService.ClientWriteListen(wsClient)
|
||||||
|
go s.wsService.ClientReadListen(wsClient, s.wsReceiveService.ShellView)
|
||||||
|
|
||||||
|
// 等待1秒,排空首次消息
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
_ = clientSession.Read()
|
||||||
|
|
||||||
|
// 实时读取SSH消息直接输出
|
||||||
|
msTicker := time.NewTicker(100 * time.Millisecond)
|
||||||
|
defer msTicker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ms := <-msTicker.C:
|
||||||
|
outputByte := clientSession.Read()
|
||||||
|
if len(outputByte) > 0 {
|
||||||
|
outputStr := string(outputByte)
|
||||||
|
msgByte, _ := json.Marshal(result.Ok(map[string]any{
|
||||||
|
"requestId": fmt.Sprintf("view_%d", ms.UnixMilli()),
|
||||||
|
"data": outputStr,
|
||||||
|
}))
|
||||||
|
wsClient.MsgChan <- msgByte
|
||||||
|
}
|
||||||
|
case <-wsClient.StopChan: // 等待停止信号
|
||||||
|
s.wsService.ClientClose(wsClient.ID)
|
||||||
|
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -149,8 +149,8 @@ func (s *WS) ClientClose(clientID string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ClientReadListen 客户端读取消息监听
|
// ClientReadListen 客户端读取消息监听
|
||||||
// receiveType 根据接收类型进行消息处理
|
// receiveFn 接收函数进行消息处理
|
||||||
func (s *WS) ClientReadListen(wsClient *model.WSClient, receiveType int) {
|
func (s *WS) ClientReadListen(wsClient *model.WSClient, receiveFn func(*model.WSClient, model.WSRequest)) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
logger.Errorf("ws ReadMessage Panic Error: %v", err)
|
logger.Errorf("ws ReadMessage Panic Error: %v", err)
|
||||||
@@ -175,21 +175,13 @@ func (s *WS) ClientReadListen(wsClient *model.WSClient, receiveType int) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// 接收器处理
|
// 接收器处理
|
||||||
switch receiveType {
|
go receiveFn(wsClient, reqMsg)
|
||||||
case ReceiveCommont:
|
|
||||||
go NewWSReceive.Commont(wsClient, reqMsg)
|
|
||||||
case ReceiveShell:
|
|
||||||
go NewWSReceive.Shell(wsClient, reqMsg)
|
|
||||||
case ReceiveShellView:
|
|
||||||
go NewWSReceive.ShellView(wsClient, reqMsg)
|
|
||||||
case ReceiveTelnet:
|
|
||||||
go NewWSReceive.Telnet(wsClient, reqMsg)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClientWriteListen 客户端写入消息监听
|
// ClientWriteListen 客户端写入消息监听
|
||||||
|
// wsClient.MsgChan <- msgByte 写入消息
|
||||||
func (s *WS) ClientWriteListen(wsClient *model.WSClient) {
|
func (s *WS) ClientWriteListen(wsClient *model.WSClient) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
|
|||||||
@@ -14,13 +14,6 @@ import (
|
|||||||
"be.ems/src/modules/ws/processor"
|
"be.ems/src/modules/ws/processor"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
ReceiveCommont = iota // Commont 接收通用业务处理
|
|
||||||
ReceiveShell // Shell 接收终端交互业务处理
|
|
||||||
ReceiveShellView // ShellView 接收查看文件终端交互业务处理
|
|
||||||
ReceiveTelnet // Telnet 接收终端交互业务处理
|
|
||||||
)
|
|
||||||
|
|
||||||
// 实例化服务层 WSReceive 结构体
|
// 实例化服务层 WSReceive 结构体
|
||||||
var NewWSReceive = &WSReceive{}
|
var NewWSReceive = &WSReceive{}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user