feat: ws模块新增ssh和telnet类型连接
This commit is contained in:
@@ -12,9 +12,14 @@ type IWS interface {
|
||||
// UpgraderWs http升级ws请求
|
||||
UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn
|
||||
|
||||
// NewClient 新建客户端 uid 登录用户ID
|
||||
NewClient(uid string, gids []string, conn *websocket.Conn) *model.WSClient
|
||||
// NewClient 新建客户端
|
||||
//
|
||||
// uid 登录用户ID
|
||||
// groupIDs 用户订阅组
|
||||
// conn ws连接实例
|
||||
// childConn 子连接实例
|
||||
NewClient(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient
|
||||
|
||||
// CloseClient 客户端关闭
|
||||
// CloseClient 关闭客户端
|
||||
CloseClient(clientID string)
|
||||
}
|
||||
|
||||
@@ -50,8 +50,13 @@ func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.C
|
||||
return conn
|
||||
}
|
||||
|
||||
// NewClient 新建客户端 uid 登录用户ID
|
||||
func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn) *model.WSClient {
|
||||
// NewClient 新建客户端
|
||||
//
|
||||
// uid 登录用户ID
|
||||
// groupIDs 用户订阅组
|
||||
// conn ws连接实例
|
||||
// childConn 子连接实例
|
||||
func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient {
|
||||
// clientID也可以用其他方式生成,只要能保证在所有服务端中都能保证唯一即可
|
||||
clientID := generate.Code(16)
|
||||
|
||||
@@ -63,6 +68,7 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn)
|
||||
SubGroup: groupIDs,
|
||||
MsgChan: make(chan []byte, 100),
|
||||
StopChan: make(chan struct{}, 1), // 卡死循环标记
|
||||
ChildConn: childConn,
|
||||
}
|
||||
|
||||
// 存入客户端
|
||||
@@ -127,17 +133,18 @@ func (s *WSImpl) clientRead(wsClient *model.WSClient) {
|
||||
s.CloseClient(wsClient.ID)
|
||||
return
|
||||
}
|
||||
// fmt.Println(messageType, string(msg))
|
||||
|
||||
// 文本和二进制类型,只处理文本json
|
||||
if messageType == websocket.TextMessage {
|
||||
var reqMsg model.WSRequest
|
||||
err := json.Unmarshal(msg, &reqMsg)
|
||||
// fmt.Println(messageType, string(msg))
|
||||
if err != nil {
|
||||
msgByte, _ := json.Marshal(result.ErrMsg("message format not supported"))
|
||||
wsClient.MsgChan <- msgByte
|
||||
} else {
|
||||
go NewWSReceiveImpl.Receive(wsClient, reqMsg)
|
||||
// 协程异步处理
|
||||
go NewWSReceiveImpl.AsyncReceive(wsClient, reqMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -173,8 +180,8 @@ func (s *WSImpl) CloseClient(clientID string) {
|
||||
defer func() {
|
||||
client.Conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
client.Conn.Close()
|
||||
client.StopChan <- struct{}{}
|
||||
WsClients.Delete(clientID)
|
||||
client.StopChan <- struct{}{}
|
||||
}()
|
||||
|
||||
// 客户端断线时自动踢出Uid绑定列表
|
||||
|
||||
@@ -4,6 +4,6 @@ import "ems.agt/src/modules/ws/model"
|
||||
|
||||
// IWSReceive WebSocket消息接收处理 服务层接口
|
||||
type IWSReceive interface {
|
||||
// Receive 接收处理
|
||||
Receive(client *model.WSClient, reqMsg model.WSRequest) error
|
||||
// AsyncReceive 接收业务异步处理
|
||||
AsyncReceive(client *model.WSClient, reqMsg model.WSRequest)
|
||||
}
|
||||
|
||||
@@ -3,8 +3,11 @@ package service
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"ems.agt/src/framework/logger"
|
||||
"ems.agt/src/framework/utils/ssh"
|
||||
"ems.agt/src/framework/utils/telnet"
|
||||
"ems.agt/src/framework/vo/result"
|
||||
"ems.agt/src/modules/ws/model"
|
||||
"ems.agt/src/modules/ws/processor"
|
||||
@@ -16,11 +19,12 @@ var NewWSReceiveImpl = &WSReceiveImpl{}
|
||||
// WSReceiveImpl WebSocket消息接收处理 服务层处理
|
||||
type WSReceiveImpl struct{}
|
||||
|
||||
// Receive 接收处理
|
||||
func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) {
|
||||
// AsyncReceive 接收业务异步处理
|
||||
func (s *WSReceiveImpl) AsyncReceive(client *model.WSClient, reqMsg model.WSRequest) {
|
||||
// 必传requestId确认消息
|
||||
if reqMsg.RequestID == "" {
|
||||
msg := "message requestId is required"
|
||||
logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, msg)
|
||||
logger.Infof("ws AsyncReceive UID %s err: %s", client.BindUid, msg)
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(msg))
|
||||
client.MsgChan <- msgByte
|
||||
return
|
||||
@@ -30,6 +34,35 @@ func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest)
|
||||
var err error
|
||||
|
||||
switch reqMsg.Type {
|
||||
case "close":
|
||||
// 主动关闭
|
||||
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
|
||||
client.MsgChan <- resultByte
|
||||
// 等待1s后关闭连接
|
||||
time.Sleep(1 * time.Second)
|
||||
client.StopChan <- struct{}{}
|
||||
case "ssh":
|
||||
// SSH会话消息接收直接写入会话
|
||||
command := reqMsg.Data.(string)
|
||||
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
|
||||
_, err = sshClientSession.Write(command)
|
||||
case "ssh_resize":
|
||||
// SSH会话窗口重置
|
||||
msgByte, _ := json.Marshal(reqMsg.Data)
|
||||
var data struct {
|
||||
Cols int `json:"cols"`
|
||||
Rows int `json:"rows"`
|
||||
}
|
||||
err = json.Unmarshal(msgByte, &data)
|
||||
if err == nil {
|
||||
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
|
||||
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
|
||||
}
|
||||
case "telnet":
|
||||
// Telnet会话消息接收直接写入会话
|
||||
command := reqMsg.Data.(string)
|
||||
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
|
||||
_, err = telnetClientSession.Write(command)
|
||||
case "ps":
|
||||
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
|
||||
case "net":
|
||||
@@ -47,10 +80,12 @@ func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, err.Error())
|
||||
logger.Warnf("ws AsyncReceive UID %s err: %s", client.BindUid, err.Error())
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
|
||||
client.MsgChan <- msgByte
|
||||
return
|
||||
}
|
||||
client.MsgChan <- resByte
|
||||
if len(resByte) > 0 {
|
||||
client.MsgChan <- resByte
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user