style: 变更ws模块函数实例命名

This commit is contained in:
TsMask
2024-09-24 11:51:46 +08:00
parent 0287852470
commit 67caba4379
11 changed files with 558 additions and 618 deletions

View File

@@ -96,7 +96,7 @@ func (r *TraceTask) CreateUDP() error {
// 推送文件
if v, ok := mData["pcapFile"]; ok && v != "" {
logger.Infof("pcapFile: %s", v)
wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId)
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId)
}
})
@@ -145,7 +145,7 @@ func (r *TraceTask) CreateUDP() error {
// 推送文件
if v, ok := mData["pcapFile"]; ok && v != "" {
logger.Infof("pcapFile: %s", v)
wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId)
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId)
}
})
return nil

View File

@@ -21,8 +21,8 @@ import (
// NewWSController 实例化控制层 WSController 结构体
var NewWSController = &WSController{
wsService: service.NewWSImpl,
wsSendService: service.NewWSSendImpl,
wsService: service.NewWS,
wsSendService: service.NewWSSend,
neHostService: neService.NewNeHostImpl,
neInfoService: neService.NewNeInfoImpl,
}
@@ -32,9 +32,9 @@ var NewWSController = &WSController{
// PATH /ws
type WSController struct {
// WebSocket 服务
wsService service.IWS
wsService *service.WS
// WebSocket消息发送 服务
wsSendService service.IWSSend
wsSendService *service.WSSend
// 网元主机连接服务
neHostService neService.INeHost
// 网元信息服务

View File

@@ -1,32 +1,220 @@
package service
import (
"encoding/json"
"net/http"
"sync"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/utils/generate"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
"github.com/gorilla/websocket"
)
// IWS WebSocket通信 服务层接口
type IWS interface {
// UpgraderWs http升级ws请求
UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn
var (
wsClients sync.Map // ws客户端 [clientId: client]
wsUsers sync.Map // ws用户对应的多个客户端id [uid:clientIds]
wsGroup sync.Map // ws组对应的多个客户端id [groupId:clientIds]
)
// ClientCreate 客户端新建
//
// uid 登录用户ID
// groupIDs 用户订阅组
// conn ws连接实例
// childConn 子连接实例
ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient
// NewWS 实例化服务层 WS 结构体
var NewWS = &WS{}
// ClientClose 客户端关闭
ClientClose(clientID string)
// WS WebSocket通信 服务层处理
type WS struct{}
// ClientReadListen 客户端读取消息监听
// receiveType 根据接收类型进行消息处理
ClientReadListen(wsClient *model.WSClient, receiveType int)
// ClientWriteListen 客户端写入消息监听
ClientWriteListen(wsClient *model.WSClient)
// UpgraderWs http升级ws请求
func (s *WS) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn {
wsUpgrader := websocket.Upgrader{
Subprotocols: []string{"omc-ws"},
// 设置消息发送缓冲区大小byte如果这个值设置得太小可能会导致服务端在发送大型消息时遇到问题
WriteBufferSize: 1024,
// 消息包启用压缩
EnableCompression: true,
// ws握手超时时间
HandshakeTimeout: 5 * time.Second,
// ws握手过程中允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
logger.Errorf("ws Upgrade err: %s", err.Error())
}
return conn
}
// ClientCreate 客户端新建
//
// uid 登录用户ID
// groupIDs 用户订阅组
// conn ws连接实例
// childConn 子连接实例
func (s *WS) ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient {
// clientID也可以用其他方式生成只要能保证在所有服务端中都能保证唯一即可
clientID := generate.Code(16)
wsClient := &model.WSClient{
ID: clientID,
Conn: conn,
LastHeartbeat: time.Now().UnixMilli(),
BindUid: uid,
SubGroup: groupIDs,
MsgChan: make(chan []byte, 100),
StopChan: make(chan struct{}, 1), // 卡死循环标记
ChildConn: childConn,
}
// 存入客户端
wsClients.Store(clientID, wsClient)
// 存入用户持有客户端
if uid != "" {
if v, ok := wsUsers.Load(uid); ok {
uidClientIds := v.(*[]string)
*uidClientIds = append(*uidClientIds, clientID)
} else {
wsUsers.Store(uid, &[]string{clientID})
}
}
// 存入用户订阅组
if uid != "" && len(groupIDs) > 0 {
for _, groupID := range groupIDs {
if v, ok := wsGroup.Load(groupID); ok {
groupClientIds := v.(*[]string)
*groupClientIds = append(*groupClientIds, clientID)
} else {
wsGroup.Store(groupID, &[]string{clientID})
}
}
}
return wsClient
}
// ClientClose 客户端关闭
func (s *WS) ClientClose(clientID string) {
v, ok := wsClients.Load(clientID)
if !ok {
return
}
client := v.(*model.WSClient)
defer func() {
client.MsgChan <- []byte("ws:close")
client.StopChan <- struct{}{}
client.Conn.Close()
wsClients.Delete(clientID)
}()
// 客户端断线时自动踢出Uid绑定列表
if client.BindUid != "" {
if v, ok := wsUsers.Load(client.BindUid); ok {
uidClientIds := v.(*[]string)
if len(*uidClientIds) > 0 {
tempClientIds := make([]string, 0, len(*uidClientIds))
for _, v := range *uidClientIds {
if v != client.ID {
tempClientIds = append(tempClientIds, v)
}
}
*uidClientIds = tempClientIds
}
}
}
// 客户端断线时自动踢出已加入的组
if len(client.SubGroup) > 0 {
for _, groupID := range client.SubGroup {
v, ok := wsGroup.Load(groupID)
if !ok {
continue
}
groupClientIds := v.(*[]string)
if len(*groupClientIds) > 0 {
tempClientIds := make([]string, 0, len(*groupClientIds))
for _, v := range *groupClientIds {
if v != client.ID {
tempClientIds = append(tempClientIds, v)
}
}
*groupClientIds = tempClientIds
}
}
}
}
// ClientReadListen 客户端读取消息监听
// receiveType 根据接收类型进行消息处理
func (s *WS) ClientReadListen(wsClient *model.WSClient, receiveType int) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws ReadMessage Panic Error: %v", err)
}
}()
for {
// 读取消息
messageType, msg, err := wsClient.Conn.ReadMessage()
if err != nil {
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
// fmt.Println(messageType, string(msg))
// 文本 只处理文本json
if messageType == websocket.TextMessage {
var reqMsg model.WSRequest
if err := json.Unmarshal(msg, &reqMsg); err != nil {
msgByte, _ := json.Marshal(result.ErrMsg("message format json error"))
wsClient.MsgChan <- msgByte
continue
}
// 接收器处理
switch receiveType {
case ReceiveCommont:
go NewWSReceive.Commont(wsClient, reqMsg)
case ReceiveShell:
go NewWSReceive.Shell(wsClient, reqMsg)
case ReceiveShellView:
go NewWSReceive.ShellView(wsClient, reqMsg)
case ReceiveTelnet:
go NewWSReceive.Telnet(wsClient, reqMsg)
}
}
}
}
// ClientWriteListen 客户端写入消息监听
func (s *WS) ClientWriteListen(wsClient *model.WSClient) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws WriteMessage Panic Error: %v", err)
}
}()
// 发客户端id确认是否连接
msgByte, _ := json.Marshal(result.OkData(map[string]string{
"clientId": wsClient.ID,
}))
wsClient.MsgChan <- msgByte
// 消息发送监听
for msg := range wsClient.MsgChan {
// 关闭句柄
if string(msg) == "ws:close" {
wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 发送消息
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
wsClient.LastHeartbeat = time.Now().UnixMilli()
}
}

View File

@@ -1,220 +0,0 @@
package service
import (
"encoding/json"
"net/http"
"sync"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/utils/generate"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
"github.com/gorilla/websocket"
)
var (
wsClients sync.Map // ws客户端 [clientId: client]
wsUsers sync.Map // ws用户对应的多个客户端id [uid:clientIds]
wsGroup sync.Map // ws组对应的多个客户端id [groupId:clientIds]
)
// NewWSImpl 实例化服务层 WSImpl 结构体
var NewWSImpl = &WSImpl{}
// WSImpl WebSocket通信 服务层处理
type WSImpl struct{}
// UpgraderWs http升级ws请求
func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn {
wsUpgrader := websocket.Upgrader{
Subprotocols: []string{"omc-ws"},
// 设置消息发送缓冲区大小byte如果这个值设置得太小可能会导致服务端在发送大型消息时遇到问题
WriteBufferSize: 1024,
// 消息包启用压缩
EnableCompression: true,
// ws握手超时时间
HandshakeTimeout: 5 * time.Second,
// ws握手过程中允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
logger.Errorf("ws Upgrade err: %s", err.Error())
}
return conn
}
// ClientCreate 客户端新建
//
// uid 登录用户ID
// groupIDs 用户订阅组
// conn ws连接实例
// childConn 子连接实例
func (s *WSImpl) ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient {
// clientID也可以用其他方式生成只要能保证在所有服务端中都能保证唯一即可
clientID := generate.Code(16)
wsClient := &model.WSClient{
ID: clientID,
Conn: conn,
LastHeartbeat: time.Now().UnixMilli(),
BindUid: uid,
SubGroup: groupIDs,
MsgChan: make(chan []byte, 100),
StopChan: make(chan struct{}, 1), // 卡死循环标记
ChildConn: childConn,
}
// 存入客户端
wsClients.Store(clientID, wsClient)
// 存入用户持有客户端
if uid != "" {
if v, ok := wsUsers.Load(uid); ok {
uidClientIds := v.(*[]string)
*uidClientIds = append(*uidClientIds, clientID)
} else {
wsUsers.Store(uid, &[]string{clientID})
}
}
// 存入用户订阅组
if uid != "" && len(groupIDs) > 0 {
for _, groupID := range groupIDs {
if v, ok := wsGroup.Load(groupID); ok {
groupClientIds := v.(*[]string)
*groupClientIds = append(*groupClientIds, clientID)
} else {
wsGroup.Store(groupID, &[]string{clientID})
}
}
}
return wsClient
}
// ClientClose 客户端关闭
func (s *WSImpl) ClientClose(clientID string) {
v, ok := wsClients.Load(clientID)
if !ok {
return
}
client := v.(*model.WSClient)
defer func() {
client.MsgChan <- []byte("ws:close")
client.StopChan <- struct{}{}
client.Conn.Close()
wsClients.Delete(clientID)
}()
// 客户端断线时自动踢出Uid绑定列表
if client.BindUid != "" {
if v, ok := wsUsers.Load(client.BindUid); ok {
uidClientIds := v.(*[]string)
if len(*uidClientIds) > 0 {
tempClientIds := make([]string, 0, len(*uidClientIds))
for _, v := range *uidClientIds {
if v != client.ID {
tempClientIds = append(tempClientIds, v)
}
}
*uidClientIds = tempClientIds
}
}
}
// 客户端断线时自动踢出已加入的组
if len(client.SubGroup) > 0 {
for _, groupID := range client.SubGroup {
v, ok := wsGroup.Load(groupID)
if !ok {
continue
}
groupClientIds := v.(*[]string)
if len(*groupClientIds) > 0 {
tempClientIds := make([]string, 0, len(*groupClientIds))
for _, v := range *groupClientIds {
if v != client.ID {
tempClientIds = append(tempClientIds, v)
}
}
*groupClientIds = tempClientIds
}
}
}
}
// ClientReadListen 客户端读取消息监听
// receiveType 根据接收类型进行消息处理
func (s *WSImpl) ClientReadListen(wsClient *model.WSClient, receiveType int) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws ReadMessage Panic Error: %v", err)
}
}()
for {
// 读取消息
messageType, msg, err := wsClient.Conn.ReadMessage()
if err != nil {
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
// fmt.Println(messageType, string(msg))
// 文本 只处理文本json
if messageType == websocket.TextMessage {
var reqMsg model.WSRequest
if err := json.Unmarshal(msg, &reqMsg); err != nil {
msgByte, _ := json.Marshal(result.ErrMsg("message format json error"))
wsClient.MsgChan <- msgByte
continue
}
// 接收器处理
switch receiveType {
case ReceiveCommont:
go NewWSReceiveImpl.Commont(wsClient, reqMsg)
case ReceiveShell:
go NewWSReceiveImpl.Shell(wsClient, reqMsg)
case ReceiveShellView:
go NewWSReceiveImpl.ShellView(wsClient, reqMsg)
case ReceiveTelnet:
go NewWSReceiveImpl.Telnet(wsClient, reqMsg)
}
}
}
}
// ClientWriteListen 客户端写入消息监听
func (s *WSImpl) ClientWriteListen(wsClient *model.WSClient) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws WriteMessage Panic Error: %v", err)
}
}()
// 发客户端id确认是否连接
msgByte, _ := json.Marshal(result.OkData(map[string]string{
"clientId": wsClient.ID,
}))
wsClient.MsgChan <- msgByte
// 消息发送监听
for msg := range wsClient.MsgChan {
// 关闭句柄
if string(msg) == "ws:close" {
wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 发送消息
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
wsClient.LastHeartbeat = time.Now().UnixMilli()
}
}

View File

@@ -1,6 +1,18 @@
package service
import "be.ems/src/modules/ws/model"
import (
"encoding/json"
"fmt"
"io"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/telnet"
"be.ems/src/framework/utils/ssh"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
"be.ems/src/modules/ws/processor"
)
const (
ReceiveCommont = iota // Commont 接收通用业务处理
@@ -9,17 +21,245 @@ const (
ReceiveTelnet // Telnet 接收终端交互业务处理
)
// IWSReceive WebSocket消息接收处理 服务层接口
type IWSReceive interface {
// Commont 接收通用业务处理
Commont(client *model.WSClient, reqMsg model.WSRequest)
// 实例化服务层 WSReceive 结构体
var NewWSReceive = &WSReceive{}
// Shell 接收终端交互业务处理
Shell(client *model.WSClient, reqMsg model.WSRequest)
// WSReceive WebSocket消息接收处理 服务层处理
type WSReceive struct{}
// ShellView 接收查看文件终端交互业务处理
ShellView(client *model.WSClient, reqMsg model.WSRequest)
// Telnet 接收终端交互业务处理
Telnet(client *model.WSClient, reqMsg model.WSRequest)
// close 关闭服务连接
func (s *WSReceive) close(client *model.WSClient) {
// 主动关闭
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
client.MsgChan <- resultByte
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
NewWS.ClientClose(client.ID)
}
// Commont 接收通用业务处理
func (s *WSReceive) Commont(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ps":
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
case "net":
resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data)
case "ims_cdr":
resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data)
case "smf_cdr":
resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data)
case "smsc_cdr":
resByte, err = processor.GetCDRConnectBySMSC(reqMsg.RequestID, reqMsg.Data)
case "amf_ue":
resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data)
case "mme_ue":
resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data)
case "upf_tf":
resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data)
case "ne_state":
resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data)
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Commont UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// Shell 接收终端交互业务处理
func (s *WSReceive) Shell(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ssh":
// SSH会话消息接收写入会话
command := reqMsg.Data.(string)
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
case "ssh_resize":
// SSH会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// ShellView 接收查看文件终端交互业务处理
func (s *WSReceive) ShellView(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws ShellView UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "cat", "tail":
var command string
if reqMsg.Type == "cat" {
command, err = processor.ParseCat(reqMsg.Data)
}
if reqMsg.Type == "tail" {
command, err = processor.ParseTail(reqMsg.Data)
}
if command != "" && err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
}
case "ctrl-c":
// 模拟按下 Ctrl+C
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write("\u0003\n")
case "resize":
// 会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws ShellView UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// Telnet 接收终端交互业务处理
func (s *WSReceive) Telnet(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "telnet":
// Telnet会话消息接收写入会话
command := reqMsg.Data.(string)
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
_, err = telnetClientSession.Write(command)
case "telnet_resize":
// Telnet会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
// telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
// _ = telnetClientSession.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}

View File

@@ -1,258 +0,0 @@
package service
import (
"encoding/json"
"fmt"
"io"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/telnet"
"be.ems/src/framework/utils/ssh"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
"be.ems/src/modules/ws/processor"
)
// 实例化服务层 WSReceiveImpl 结构体
var NewWSReceiveImpl = &WSReceiveImpl{}
// WSReceiveImpl WebSocket消息接收处理 服务层处理
type WSReceiveImpl struct{}
// Commont 接收通用业务处理
func (s *WSReceiveImpl) close(client *model.WSClient) {
// 主动关闭
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
client.MsgChan <- resultByte
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
NewWSImpl.ClientClose(client.ID)
}
// Commont 接收通用业务处理
func (s *WSReceiveImpl) Commont(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ps":
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
case "net":
resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data)
case "ims_cdr":
resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data)
case "smf_cdr":
resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data)
case "smsc_cdr":
resByte, err = processor.GetCDRConnectBySMSC(reqMsg.RequestID, reqMsg.Data)
case "amf_ue":
resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data)
case "mme_ue":
resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data)
case "upf_tf":
resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data)
case "ne_state":
resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data)
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Commont UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// Shell 接收终端交互业务处理
func (s *WSReceiveImpl) Shell(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ssh":
// SSH会话消息接收写入会话
command := reqMsg.Data.(string)
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
case "ssh_resize":
// SSH会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// ShellView 接收查看文件终端交互业务处理
func (s *WSReceiveImpl) ShellView(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws ShellView UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "cat", "tail":
var command string
if reqMsg.Type == "cat" {
command, err = processor.ParseCat(reqMsg.Data)
}
if reqMsg.Type == "tail" {
command, err = processor.ParseTail(reqMsg.Data)
}
if command != "" && err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
}
case "ctrl-c":
// 模拟按下 Ctrl+C
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write("\u0003\n")
case "resize":
// 会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws ShellView UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// Telnet 接收终端交互业务处理
func (s *WSReceiveImpl) Telnet(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "telnet":
// Telnet会话消息接收写入会话
command := reqMsg.Data.(string)
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
_, err = telnetClientSession.Write(command)
case "telnet_resize":
// Telnet会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
// telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
// _ = telnetClientSession.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}

View File

@@ -1,10 +1,87 @@
package service
// IWSSend WebSocket消息发送处理 服务层接口
type IWSSend interface {
// ByClientID 给已知客户端发消息
ByClientID(clientID string, data any) error
import (
"encoding/json"
"fmt"
// ByGroupID 给订阅组的客户端发送消息
ByGroupID(gid string, data any) error
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
)
// 订阅组指定编号为支持服务器向客户端主动推送数据
const (
// 组号-其他
GROUP_OTHER = "0"
// 组号-跟踪任务数据变更 2_traceId
GROUP_TRACE = "2_"
// 组号-指标通用 10_neType_neId
GROUP_KPI = "10_"
// 组号-指标UPF 12_neId
GROUP_KPI_UPF = "12_"
// 组号-自定义KPI指标20_neType_neId
GROUP_KPI_C = "20_"
// 组号-IMS_CDR会话事件 1005_neId
GROUP_IMS_CDR = "1005_"
// 组号-SMF_CDR会话事件 1006_neId
GROUP_SMF_CDR = "1006_"
// 组号-SMSC_CDR会话事件 1007_neId
GROUP_SMSC_CDR = "1007_"
// 组号-AMF_UE会话事件
GROUP_AMF_UE = "1010"
// 组号-MME_UE会话事件 1011_neId
GROUP_MME_UE = "1011_"
)
// 实例化服务层 WSSend 结构体
var NewWSSend = &WSSend{}
// WSSend WebSocket消息发送处理 服务层处理
type WSSend struct{}
// ByClientID 给已知客户端发消息
func (s *WSSend) ByClientID(clientID string, data any) error {
v, ok := wsClients.Load(clientID)
if !ok {
return fmt.Errorf("no fount client ID: %s", clientID)
}
dataByte, err := json.Marshal(result.OkData(data))
if err != nil {
return err
}
client := v.(*model.WSClient)
if len(client.MsgChan) > 90 {
NewWS.ClientClose(client.ID)
return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID)
}
client.MsgChan <- dataByte
return nil
}
// ByGroupID 给订阅组的客户端发送消息
func (s *WSSend) ByGroupID(groupID string, data any) error {
clientIds, ok := wsGroup.Load(groupID)
if !ok {
return fmt.Errorf("no fount Group ID: %s", groupID)
}
// 检查组内是否有客户端
ids := clientIds.(*[]string)
if len(*ids) == 0 {
return fmt.Errorf("no members in the group")
}
// 遍历给客户端发消息
for _, clientId := range *ids {
err := s.ByClientID(clientId, map[string]any{
"groupId": groupID,
"data": data,
})
if err != nil {
continue
}
}
return nil
}

View File

@@ -1,87 +0,0 @@
package service
import (
"encoding/json"
"fmt"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
)
// 订阅组指定编号为支持服务器向客户端主动推送数据
const (
// 组号-其他
GROUP_OTHER = "0"
// 组号-跟踪任务数据变更 2_traceId
GROUP_TRACE = "2_"
// 组号-指标通用 10_neType_neId
GROUP_KPI = "10_"
// 组号-指标UPF 12_neId
GROUP_KPI_UPF = "12_"
// 组号-自定义KPI指标20_neType_neId
GROUP_KPI_C = "20_"
// 组号-IMS_CDR会话事件 1005_neId
GROUP_IMS_CDR = "1005_"
// 组号-SMF_CDR会话事件 1006_neId
GROUP_SMF_CDR = "1006_"
// 组号-SMSC_CDR会话事件 1007_neId
GROUP_SMSC_CDR = "1007_"
// 组号-AMF_UE会话事件
GROUP_AMF_UE = "1010"
// 组号-MME_UE会话事件 1011_neId
GROUP_MME_UE = "1011_"
)
// 实例化服务层 WSSendImpl 结构体
var NewWSSendImpl = &WSSendImpl{}
// IWSSend WebSocket消息发送处理 服务层处理
type WSSendImpl struct{}
// ByClientID 给已知客户端发消息
func (s *WSSendImpl) ByClientID(clientID string, data any) error {
v, ok := wsClients.Load(clientID)
if !ok {
return fmt.Errorf("no fount client ID: %s", clientID)
}
dataByte, err := json.Marshal(result.OkData(data))
if err != nil {
return err
}
client := v.(*model.WSClient)
if len(client.MsgChan) > 90 {
NewWSImpl.ClientClose(client.ID)
return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID)
}
client.MsgChan <- dataByte
return nil
}
// ByGroupID 给订阅组的客户端发送消息
func (s *WSSendImpl) ByGroupID(groupID string, data any) error {
clientIds, ok := wsGroup.Load(groupID)
if !ok {
return fmt.Errorf("no fount Group ID: %s", groupID)
}
// 检查组内是否有客户端
ids := clientIds.(*[]string)
if len(*ids) == 0 {
return fmt.Errorf("no members in the group")
}
// 遍历给客户端发消息
for _, clientId := range *ids {
err := s.ByClientID(clientId, map[string]any{
"groupId": groupID,
"data": data,
})
if err != nil {
continue
}
}
return nil
}