diff --git a/src/modules/tool/controller/iperf.go b/src/modules/tool/controller/iperf.go index 57fe7cc8..c908d584 100644 --- a/src/modules/tool/controller/iperf.go +++ b/src/modules/tool/controller/iperf.go @@ -1,29 +1,34 @@ package controller -// https://iperf.fr/iperf-download.php -// https://launchpad.net/ubuntu/jammy/amd64/libsctp1/1.0.19+dfsg-1build1 - import ( + "encoding/json" + "fmt" "strings" + "time" "be.ems/src/framework/i18n" + "be.ems/src/framework/logger" "be.ems/src/framework/utils/ctx" "be.ems/src/framework/vo/result" + neService "be.ems/src/modules/network_element/service" "be.ems/src/modules/tool/service" + wsService "be.ems/src/modules/ws/service" + "github.com/gin-gonic/gin" ) // 实例化控制层 IPerfController 结构体 var NewIPerf = &IPerfController{ iperfService: service.NewIPerf, + wsService: wsService.NewWS, } -// iperf 网络性能测试工具 +// iperf 网络性能测试工具 https://iperf.fr/iperf-download.php // // PATH /tool/iperf type IPerfController struct { - // IPerf3 网络性能测试工具服务 - iperfService *service.IPerf + iperfService *service.IPerf // IPerf3 网络性能测试工具服务 + wsService *wsService.WS // WebSocket 服务 } // iperf 版本信息 @@ -51,7 +56,7 @@ func (s *IPerfController) Version(c *gin.Context) { // iperf 软件安装 // -// GET /i +// POST /i func (s *IPerfController) Install(c *gin.Context) { language := ctx.AcceptLanguage(c) var body struct { @@ -67,5 +72,81 @@ func (s *IPerfController) Install(c *gin.Context) { c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error()))) return } - c.JSON(200, result.OkData(nil)) + c.JSON(200, result.Ok(nil)) +} + +// iperf 软件运行 +// +// GET /run +func (s *IPerfController) Run(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var query struct { + NeType string `form:"neType" binding:"required"` // 网元类型 + NeId string `form:"neId" binding:"required"` // 网元标识id + Cols int `form:"cols"` // 终端单行字符数 + Rows int `form:"rows"` // 终端显示行数 + } + if err := c.ShouldBindQuery(&query); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 登录用户信息 + loginUser, err := ctx.LoginUser(c) + if err != nil { + c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) + return + } + + // 网元主机的SSH客户端 + sshClient, err := neService.NewNeInfoImpl.NeRunSSHClient(query.NeType, query.NeId) + if err != nil { + c.JSON(200, result.ErrMsg(err.Error())) + return + } + defer sshClient.Close() + // ssh连接会话 + clientSession, err := sshClient.NewClientSession(query.Cols, query.Rows) + if err != nil { + c.JSON(200, result.ErrMsg("neinfo ssh client session new err")) + return + } + defer clientSession.Close() + + // 将 HTTP 连接升级为 WebSocket 连接 + wsConn := s.wsService.UpgraderWs(c.Writer, c.Request) + if wsConn == nil { + return + } + defer wsConn.Close() + + wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession) + go s.wsService.ClientWriteListen(wsClient) + go s.wsService.ClientReadListen(wsClient, s.iperfService.Run) + + // 等待1秒,排空首次消息 + time.Sleep(1 * time.Second) + _ = clientSession.Read() + + // 实时读取Run消息直接输出 + msTicker := time.NewTicker(100 * time.Millisecond) + defer msTicker.Stop() + for { + select { + case ms := <-msTicker.C: + outputByte := clientSession.Read() + if len(outputByte) > 0 { + outputStr := string(outputByte) + msgByte, _ := json.Marshal(result.Ok(map[string]any{ + "requestId": fmt.Sprintf("iperf3_%d", ms.UnixMilli()), + "data": outputStr, + })) + wsClient.MsgChan <- msgByte + } + case <-wsClient.StopChan: // 等待停止信号 + s.wsService.ClientClose(wsClient.ID) + logger.Infof("ws Stop Client UID %s", wsClient.BindUid) + return + } + } } diff --git a/src/modules/tool/model/iperf.go b/src/modules/tool/model/iperf.go deleted file mode 100644 index 54950d43..00000000 --- a/src/modules/tool/model/iperf.go +++ /dev/null @@ -1,8 +0,0 @@ -package model - -// IPerf G6关系图数据对象 chart_graph -type IPerf struct { - RowID int64 `json:"rowId,omitempty" gorm:"column:row_id;primaryKey;autoIncrement"` // 记录ID - RowType string `json:"rowType,omitempty" gorm:"column:row_type"` // 记录类型(node/edge/combo) - RowGroup string `json:"rowGroup,omitempty" gorm:"column:row_group"` // 记录组名 -} diff --git a/src/modules/tool/service/iperf.go b/src/modules/tool/service/iperf.go index 947d419a..ae204513 100644 --- a/src/modules/tool/service/iperf.go +++ b/src/modules/tool/service/iperf.go @@ -1,12 +1,18 @@ package service import ( + "encoding/json" "fmt" "io" "strings" + "time" "be.ems/src/framework/config" + "be.ems/src/framework/logger" + "be.ems/src/framework/utils/ssh" + "be.ems/src/framework/vo/result" neService "be.ems/src/modules/network_element/service" + wsModel "be.ems/src/modules/ws/model" ) // 实例化服务层 IPerf 结构体 @@ -108,3 +114,139 @@ func (s *IPerf) Install(meType, neId string) error { } return err } + +// Run 接收IPerf3终端交互业务处理 +func (s *IPerf) Run(client *wsModel.WSClient, reqMsg wsModel.WSRequest) { + // 必传requestId确认消息 + if reqMsg.RequestID == "" { + msg := "message requestId is required" + logger.Infof("ws IPerf3 Run UID %s err: %s", client.BindUid, msg) + msgByte, _ := json.Marshal(result.ErrMsg(msg)) + client.MsgChan <- msgByte + return + } + + var resByte []byte + var err error + + switch reqMsg.Type { + case "close": + // 主动关闭 + resultByte, _ := json.Marshal(result.OkMsg("user initiated closure")) + client.MsgChan <- resultByte + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + return + case "iperf3": + // SSH会话消息接收写入会话 + var command string + command, err = s.parseOptions(reqMsg.Data) + if command != "" && err == nil { + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + _, err = sshClientSession.Write(command) + } + case "ctrl-c": + // 模拟按下 Ctrl+C + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + _, err = sshClientSession.Write("\u0003\n") + case "resize": + // 会话窗口重置 + msgByte, _ := json.Marshal(reqMsg.Data) + var data struct { + Cols int `json:"cols"` + Rows int `json:"rows"` + } + err = json.Unmarshal(msgByte, &data) + if err == nil { + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + err = sshClientSession.Session.WindowChange(data.Rows, data.Cols) + } + default: + err = fmt.Errorf("message type %s not supported", reqMsg.Type) + } + + if err != nil { + logger.Warnf("ws IPerf3 Run UID %s err: %s", client.BindUid, err.Error()) + msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) + client.MsgChan <- msgByte + if err == io.EOF { + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + } + return + } + if len(resByte) > 0 { + client.MsgChan <- resByte + } +} + +// parseOptions 解析拼装iperf3命令 iperf [-s|-c host] [options] +func (s *IPerf) parseOptions(reqData any) (string, error) { + msgByte, _ := json.Marshal(reqData) + var data struct { + Command string `json:"command"` // 命令字符串 + Client bool `json:"client"` // 服务端或客户端,默认服务端 + Host string `json:"host"` // 客户端连接到的服务端IP地址 + // Server or Client + Port int `json:"port"` // 服务端口 + Interval int `json:"interval"` // 每次报告之间的时间间隔,单位为秒 + // Server + OneOff bool `json:"oneOff"` // 只进行一次连接 + // Client + UDP bool `json:"udp"` // use UDP rather than TCP + Time int `json:"time"` // 以秒为单位的传输时间(默认为 10 秒) + Reverse bool `json:"reverse"` // 以反向模式运行(服务器发送,客户端接收) + Window string `json:"window"` // 设置窗口大小/套接字缓冲区大小 + } + if err := json.Unmarshal(msgByte, &data); err != nil { + logger.Warnf("ws processor parseClient err: %s", err.Error()) + return "", fmt.Errorf("query data structure error") + } + + command := []string{"iperf3"} + // 命令字符串高优先级 + if data.Command != "" { + command = append(command, data.Command) + command = append(command, "\n") + return strings.Join(command, " "), nil + } + + if data.Client && data.Host == "" { + return "", fmt.Errorf("query data client host empty") + } + if !data.Client { + command = append(command, "-s") + // Server + if data.OneOff { + command = append(command, "-1") + } + } else { + command = append(command, "-c") + command = append(command, data.Host) + // Client + if data.UDP { + command = append(command, "-u") + } + if data.Time > 0 { + command = append(command, fmt.Sprintf("-t %d", data.Time)) + } + if data.Reverse { + command = append(command, "-R") + } + if data.Window != "" { + command = append(command, fmt.Sprintf("-w %s", data.Window)) + } + } + + // Server or Client + if data.Port > 0 { + command = append(command, fmt.Sprintf("-p %d", data.Port)) + } + if data.Interval > 0 { + command = append(command, fmt.Sprintf("-i %d", data.Interval)) + } + command = append(command, "\n") + return strings.Join(command, " "), nil +}