feat: ws 请求消息体判断reqid和和结构序列化异常信息

This commit is contained in:
TsMask
2024-01-23 20:00:05 +08:00
parent ec9a30d78c
commit aba5e48005
4 changed files with 55 additions and 38 deletions

View File

@@ -15,6 +15,7 @@ type WSClient struct {
// WSRequest ws消息接收 // WSRequest ws消息接收
type WSRequest struct { type WSRequest struct {
Type string `json:"type"` RequestID string `json:"requestId"` // 请求ID
Data any `json:"data"` Type string `json:"type"` // 业务类型
Data any `json:"data"` // 查询结构
} }

View File

@@ -2,26 +2,32 @@ package processor
import ( import (
"encoding/json" "encoding/json"
"fmt"
"strings" "strings"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/ws/model" "ems.agt/src/modules/ws/model"
"github.com/shirou/gopsutil/v3/net" "github.com/shirou/gopsutil/v3/net"
"github.com/shirou/gopsutil/v3/process" "github.com/shirou/gopsutil/v3/process"
) )
// GetNetConnections 获取网络连接进程 // GetNetConnections 获取网络连接进程
func GetNetConnections(data any) ([]byte, error) { func GetNetConnections(requestID string, data any) ([]byte, error) {
msgByte, _ := json.Marshal(data) msgByte, _ := json.Marshal(data)
var query model.NetConnectQuery var query model.NetConnectQuery
err := json.Unmarshal(msgByte, &query) err := json.Unmarshal(msgByte, &query)
if err != nil { if err != nil {
return nil, err logger.Warnf("ws processor GetNetConnections err: %s", err.Error())
return nil, fmt.Errorf("query data structure error")
} }
result := []model.NetConnectData{} dataArr := []model.NetConnectData{}
for _, netType := range [...]string{"tcp", "udp"} { for _, netType := range [...]string{"tcp", "udp"} {
connections, _ := net.Connections(netType) connections, err := net.Connections(netType)
if err == nil { if err != nil {
continue
}
for _, conn := range connections { for _, conn := range connections {
if query.ProcessID > 0 && query.ProcessID != conn.Pid { if query.ProcessID > 0 && query.ProcessID != conn.Pid {
continue continue
@@ -35,7 +41,7 @@ func GetNetConnections(data any) ([]byte, error) {
if query.Port > 0 && query.Port != int32(conn.Laddr.Port) && query.Port != int32(conn.Raddr.Port) { if query.Port > 0 && query.Port != int32(conn.Laddr.Port) && query.Port != int32(conn.Raddr.Port) {
continue continue
} }
result = append(result, model.NetConnectData{ dataArr = append(dataArr, model.NetConnectData{
Type: netType, Type: netType,
Status: conn.Status, Status: conn.Status,
Laddr: conn.Laddr, Laddr: conn.Laddr,
@@ -44,10 +50,12 @@ func GetNetConnections(data any) ([]byte, error) {
Name: name, Name: name,
}) })
} }
}
}
} resultByte, err := json.Marshal(result.Ok(map[string]any{
} "requestID": requestID,
} "data": dataArr,
resultByte, err := json.Marshal(result) }))
return resultByte, err return resultByte, err
} }

View File

@@ -7,19 +7,22 @@ import (
"strings" "strings"
"sync" "sync"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/utils/date" "ems.agt/src/framework/utils/date"
"ems.agt/src/framework/utils/parse" "ems.agt/src/framework/utils/parse"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/ws/model" "ems.agt/src/modules/ws/model"
"github.com/shirou/gopsutil/v3/process" "github.com/shirou/gopsutil/v3/process"
) )
// GetProcessData 获取进程数据 // GetProcessData 获取进程数据
func GetProcessData(data any) ([]byte, error) { func GetProcessData(requestID string, data any) ([]byte, error) {
msgByte, _ := json.Marshal(data) msgByte, _ := json.Marshal(data)
var query model.PsProcessQuery var query model.PsProcessQuery
err := json.Unmarshal(msgByte, &query) err := json.Unmarshal(msgByte, &query)
if err != nil { if err != nil {
return nil, err logger.Warnf("ws processor GetNetConnections err: %s", err.Error())
return nil, fmt.Errorf("query data structure error")
} }
var processes []*process.Process var processes []*process.Process
@@ -29,7 +32,7 @@ func GetProcessData(data any) ([]byte, error) {
} }
var ( var (
result = []model.PsProcessData{} dataArr = []model.PsProcessData{}
resultMutex sync.Mutex resultMutex sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
numWorkers = 4 numWorkers = 4
@@ -104,7 +107,7 @@ func GetProcessData(data any) ([]byte, error) {
procData.Envs, _ = proc.Environ() procData.Envs, _ = proc.Environ()
resultMutex.Lock() resultMutex.Lock()
result = append(result, procData) dataArr = append(dataArr, procData)
resultMutex.Unlock() resultMutex.Unlock()
} }
@@ -127,10 +130,13 @@ func GetProcessData(data any) ([]byte, error) {
wg.Wait() wg.Wait()
sort.Slice(result, func(i, j int) bool { sort.Slice(dataArr, func(i, j int) bool {
return result[i].PID < result[j].PID return dataArr[i].PID < dataArr[j].PID
}) })
resultByte, err := json.Marshal(result) resultByte, err := json.Marshal(result.Ok(map[string]any{
"requestID": requestID,
"data": dataArr,
}))
return resultByte, err return resultByte, err
} }

View File

@@ -15,16 +15,18 @@ type WSReceiveImpl struct{}
// Receive 接收处理 // Receive 接收处理
func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) error { func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) error {
fmt.Println(client.ID, reqMsg) if reqMsg.RequestID == "" {
return fmt.Errorf("message requestId is required")
}
switch reqMsg.Type { switch reqMsg.Type {
case "ps": case "ps":
res, err := processor.GetProcessData(reqMsg.Data) res, err := processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
if err != nil { if err != nil {
return err return err
} }
client.MsgChan <- res client.MsgChan <- res
case "net": case "net":
res, err := processor.GetNetConnections(reqMsg.Data) res, err := processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data)
if err != nil { if err != nil {
return err return err
} }