Files
be.ems/src/modules/ws/controller/ws.go

404 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package controller
import (
"encoding/json"
"fmt"
"strings"
"time"
neService "be.ems/src/modules/network_element/service"
"be.ems/src/framework/i18n"
"be.ems/src/framework/logger"
"be.ems/src/framework/utils/ctx"
"be.ems/src/framework/utils/parse"
"be.ems/src/framework/utils/ssh"
"be.ems/src/framework/utils/telnet"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/service"
"github.com/gin-gonic/gin"
)
// NewWSController 实例化控制层 WSController 结构体
var NewWSController = &WSController{
wsService: service.NewWSImpl,
wsSendService: service.NewWSSendImpl,
neHostService: neService.NewNeHostImpl,
neInfoService: neService.NewNeInfoImpl,
}
// WSController WebSocket通信
//
// PATH /ws
type WSController struct {
// WebSocket 服务
wsService service.IWS
// WebSocket消息发送 服务
wsSendService service.IWSSend
// 网元主机连接服务
neHostService neService.INeHost
// 网元信息服务
neInfoService neService.INeInfo
}
// WS 通用
//
// GET /?subGroupIDs=0
func (s *WSController) WS(c *gin.Context) {
language := ctx.AcceptLanguage(c)
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
// 订阅消息组
var subGroupIDs []string
subGroupIDStr := c.Query("subGroupID")
if subGroupIDStr != "" {
// 处理字符转id数组后去重
ids := strings.Split(subGroupIDStr, ",")
uniqueIDs := parse.RemoveDuplicates(ids)
if len(uniqueIDs) > 0 {
subGroupIDs = uniqueIDs
}
}
// 将 HTTP 连接升级为 WebSocket 连接
conn := s.wsService.UpgraderWs(c.Writer, c.Request)
if conn == nil {
return
}
defer conn.Close()
wsClient := s.wsService.ClientCreate(loginUser.UserID, subGroupIDs, conn, nil)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, service.ReceiveCommont)
// 等待停止信号
for value := range wsClient.StopChan {
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value)
return
}
}
// Test 测试
//
// GET /test?clientId=xxx&groupID=xxx
func (s *WSController) Test(c *gin.Context) {
language := ctx.AcceptLanguage(c)
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
errMsgArr := []string{}
clientId := c.Query("clientId")
if clientId != "" {
err := s.wsSendService.ByClientID(c.Query("clientId"), loginUser)
if err != nil {
errMsgArr = append(errMsgArr, "clientId: "+err.Error())
}
}
groupID := c.Query("groupID")
if groupID != "" {
err := s.wsSendService.ByGroupID(c.Query("groupID"), loginUser)
if err != nil {
errMsgArr = append(errMsgArr, "groupID: "+err.Error())
}
}
c.JSON(200, result.OkData(errMsgArr))
}
// SSH 终端
//
// GET /ssh?hostId=1&cols=80&rows=40
func (s *WSController) SSH(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var query struct {
HostId string `form:"hostId" binding:"required"` // 连接主机ID
Cols int `form:"cols"` // 终端单行字符数
Rows int `form:"rows"` // 终端显示行数
}
if err := c.ShouldBindQuery(&query); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if query.Cols < 80 || query.Cols > 400 {
query.Cols = 80
}
if query.Rows < 40 || query.Rows > 1200 {
query.Rows = 40
}
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
neHost := s.neHostService.SelectById(query.HostId)
if neHost.HostID != query.HostId || neHost.HostType != "ssh" {
// 没有可访问主机信息数据!
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData")))
return
}
// 创建链接SSH客户端
var connSSH ssh.ConnSSH
neHost.CopyTo(&connSSH)
var client *ssh.ConnSSH
var clientErr error
if neHost.AuthMode == "2" {
client, clientErr = connSSH.NewClientByLocalPrivate()
} else {
client, clientErr = connSSH.NewClient()
}
if clientErr != nil {
// 连接主机失败,请检查连接参数后重试
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
return
}
defer client.Close()
// 创建SSH客户端会话
clientSession, err := client.NewClientSession(query.Cols, query.Rows)
if err != nil {
// 连接主机失败,请检查连接参数后重试
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
return
}
defer clientSession.Close()
// 将 HTTP 连接升级为 WebSocket 连接
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
if wsConn == nil {
return
}
defer wsConn.Close()
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, service.ReceiveShell)
// 实时读取SSH消息直接输出
msTicker := time.NewTicker(100 * time.Millisecond)
defer msTicker.Stop()
for {
select {
case ms := <-msTicker.C:
outputByte := clientSession.Read()
if len(outputByte) > 0 {
outputStr := string(outputByte)
msgByte, _ := json.Marshal(result.Ok(map[string]any{
"requestId": fmt.Sprintf("ssh_%s_%d", neHost.HostID, ms.UnixMilli()),
"data": outputStr,
}))
wsClient.MsgChan <- msgByte
// 退出ssh登录
// if strings.LastIndex(outputStr, "logout\r\n") != -1 {
// time.Sleep(1 * time.Second)
// s.wsService.CloseClient(wsClient.ID)
// return
// }
}
case <-wsClient.StopChan: // 等待停止信号
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
return
}
}
}
// Telnet 终端
//
// GET /telnet?hostId=1
func (s *WSController) Telnet(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var query struct {
HostId string `form:"hostId" binding:"required"` // 连接主机ID
Cols int `form:"cols"` // 终端单行字符数
Rows int `form:"rows"` // 终端显示行数
}
if err := c.ShouldBindQuery(&query); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if query.Cols < 120 || query.Cols > 400 {
query.Cols = 120
}
if query.Rows < 128 || query.Rows > 1200 {
query.Rows = 128
}
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
neHost := s.neHostService.SelectById(query.HostId)
if neHost.HostID != query.HostId || neHost.HostType != "telnet" {
// 没有可访问主机信息数据!
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData")))
return
}
// 创建链接Telnet客户端
var connTelnet telnet.ConnTelnet
neHost.CopyTo(&connTelnet)
client, err := connTelnet.NewClient()
if err != nil {
// 连接主机失败,请检查连接参数后重试
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
return
}
defer client.Close()
// 创建Telnet客户端会话
clientSession, err := client.NewClientSession(query.Cols, query.Rows)
if err != nil {
// 连接主机失败,请检查连接参数后重试
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
return
}
defer clientSession.Close()
// 将 HTTP 连接升级为 WebSocket 连接
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
if wsConn == nil {
return
}
defer wsConn.Close()
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, service.ReceiveTelnet)
// 等待1秒排空首次消息
time.Sleep(1 * time.Second)
_ = clientSession.Read()
// 实时读取Telnet消息直接输出
msTicker := time.NewTicker(100 * time.Millisecond)
defer msTicker.Stop()
for {
select {
case ms := <-msTicker.C:
outputByte := clientSession.Read()
if len(outputByte) > 0 {
outputStr := strings.TrimRight(string(outputByte), "\x00")
msgByte, _ := json.Marshal(result.Ok(map[string]any{
"requestId": fmt.Sprintf("telnet_%s_%d", neHost.HostID, ms.UnixMilli()),
"data": outputStr,
}))
wsClient.MsgChan <- msgByte
// 退出telnet登录
// if strings.LastIndex(outputStr, "logout\r\n") != -1 {
// time.Sleep(1 * time.Second)
// s.wsService.CloseClient(wsClient.ID)
// return
// }
}
case <-wsClient.StopChan: // 等待停止信号
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
return
}
}
}
// ShellView 终端交互式文件内容查看
//
// GET /view
func (s *WSController) ShellView(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var query struct {
NeType string `form:"neType" binding:"required"`
NeId string `form:"neId" binding:"required"`
Cols int `form:"cols"` // 终端单行字符数
Rows int `form:"rows"` // 终端显示行数
}
if err := c.ShouldBindQuery(&query); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if query.Cols < 120 || query.Cols > 400 {
query.Cols = 120
}
if query.Rows < 40 || query.Rows > 1200 {
query.Rows = 40
}
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
// 网元主机的SSH客户端
sshClient, err := s.neInfoService.NeRunSSHClient(query.NeType, query.NeId)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
return
}
defer sshClient.Close()
// ssh连接会话
clientSession, err := sshClient.NewClientSession(query.Cols, query.Rows)
if err != nil {
c.JSON(200, result.ErrMsg("neinfo ssh client session new err"))
return
}
defer clientSession.Close()
// 将 HTTP 连接升级为 WebSocket 连接
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
if wsConn == nil {
return
}
defer wsConn.Close()
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, service.ReceiveShellView)
// 等待1秒排空首次消息
time.Sleep(1 * time.Second)
_ = clientSession.Read()
// 实时读取SSH消息直接输出
msTicker := time.NewTicker(100 * time.Millisecond)
defer msTicker.Stop()
for {
select {
case ms := <-msTicker.C:
outputByte := clientSession.Read()
if len(outputByte) > 0 {
outputStr := string(outputByte)
msgByte, _ := json.Marshal(result.Ok(map[string]any{
"requestId": fmt.Sprintf("view_%d", ms.UnixMilli()),
"data": outputStr,
}))
wsClient.MsgChan <- msgByte
}
case <-wsClient.StopChan: // 等待停止信号
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
return
}
}
}