package service import ( "encoding/json" "fmt" "io" "strings" "time" "be.ems/src/framework/config" "be.ems/src/framework/logger" "be.ems/src/framework/ssh" "be.ems/src/framework/vo/result" neService "be.ems/src/modules/network_element/service" wsModel "be.ems/src/modules/ws/model" ) // 实例化服务层 IPerf 结构体 var NewIPerf = &IPerf{} // IPerf 网络性能测试工具 服务层处理 type IPerf struct{} // Version 查询版本信息 func (s *IPerf) Version(meType, neId, version string) (string, error) { if version != "V2" && version != "V3" { return "", fmt.Errorf("iperf version is required V2 or V3") } cmd := "iperf3 --version" if version == "V2" { cmd = "iperf -v" } // 网元主机的SSH客户端 sshClient, err := neService.NewNeInfo.NeRunSSHClient(meType, neId) if err != nil { return "", err } defer sshClient.Close() // 检查是否安装iperf output, err := sshClient.RunCMD(cmd) if err != nil { if version == "V2" && strings.HasSuffix(err.Error(), "status 1") { // V2 版本 return strings.TrimSpace(output), nil } return "", fmt.Errorf("iperf %s not installed", version) } return strings.TrimSpace(output), err } // Install 安装iperf3 func (s *IPerf) Install(meType, neId, version string) error { if version != "V2" && version != "V3" { return fmt.Errorf("iperf version is required V2 or V3") } // 网元主机的SSH客户端 sshClient, err := neService.NewNeInfo.NeRunSSHClient(meType, neId) if err != nil { return err } defer sshClient.Close() // 网元主机的SSH客户端进行文件传输 sftpClient, err := sshClient.NewClientSFTP() if err != nil { return err } defer sftpClient.Close() nePath := "/tmp" depPkg := "sudo dpkg -i" depDir := "assets/dependency/iperf3/deb" // 检查平台类型 if _, err := sshClient.RunCMD("sudo dpkg --version"); err == nil { depPkg = "sudo dpkg -i" depDir = "assets/dependency/iperf3/deb" // sudo apt remove iperf3 libiperf0 libsctp1 libsctp-dev lksctp-tools } else if _, err := sshClient.RunCMD("sudo yum --version"); err == nil { depPkg = "sudo rpm -Uvh --nosignature --reinstall --force" depDir = "assets/dependency/iperf3/rpm" // yum remove iperf3 iperf3-help.noarch } else { return fmt.Errorf("iperf %s not supported install", version) } // V2版本和V3版本的安装包路径不同 if version == "V2" { depDir = strings.Replace(depDir, "iperf3", "iperf", 1) } // 从 embed.FS 中读取默认配置文件内容 assetsDir := config.GetAssetsDirFS() fsDirEntrys, err := assetsDir.ReadDir(depDir) if err != nil { return err } neFilePaths := []string{} for _, d := range fsDirEntrys { // 打开本地文件 localFile, err := assetsDir.Open(fmt.Sprintf("%s/%s", depDir, d.Name())) if err != nil { return fmt.Errorf("iperf %s file local error", version) } defer localFile.Close() // 创建远程文件 remotePath := fmt.Sprintf("%s/%s", nePath, d.Name()) remoteFile, err := sftpClient.Client.Create(remotePath) if err != nil { return fmt.Errorf("iperf %s file remote error", version) } defer remoteFile.Close() // 使用 io.Copy 将嵌入的文件内容复制到目标文件 if _, err := io.Copy(remoteFile, localFile); err != nil { return fmt.Errorf("iperf %s file copy error", version) } neFilePaths = append(neFilePaths, remotePath) } // 删除软件包 defer func() { pkgRemove := fmt.Sprintf("sudo rm %s", strings.Join(neFilePaths, " ")) sshClient.RunCMD(pkgRemove) }() // 安装软件包 pkgInstall := fmt.Sprintf("%s %s", depPkg, strings.Join(neFilePaths, " ")) if _, err := sshClient.RunCMD(pkgInstall); err != nil { return fmt.Errorf("iperf %s install error", version) } return err } // Run 接收IPerf3终端交互业务处理 func (s *IPerf) Run(client *wsModel.WSClient, reqMsg wsModel.WSRequest) { // 必传requestId确认消息 if reqMsg.RequestID == "" { msg := "message requestId is required" logger.Infof("ws IPerf Run UID %s err: %s", client.BindUid, msg) msgByte, _ := json.Marshal(result.ErrMsg(msg)) client.MsgChan <- msgByte return } var resByte []byte var err error switch reqMsg.Type { case "close": // 主动关闭 resultByte, _ := json.Marshal(result.OkMsg("user initiated closure")) client.MsgChan <- resultByte // 等待1s后关闭连接 time.Sleep(1 * time.Second) client.StopChan <- struct{}{} return case "iperf": // SSH会话消息接收写入会话 var command string command, err = s.parseOptions(reqMsg.Data) if command != "" && err == nil { sshClientSession := client.ChildConn.(*ssh.SSHClientSession) _, err = sshClientSession.Write(command) } case "ctrl-c": // 模拟按下 Ctrl+C sshClientSession := client.ChildConn.(*ssh.SSHClientSession) _, err = sshClientSession.Write("\u0003\n") case "resize": // 会话窗口重置 msgByte, _ := json.Marshal(reqMsg.Data) var data struct { Cols int `json:"cols"` Rows int `json:"rows"` } err = json.Unmarshal(msgByte, &data) if err == nil { sshClientSession := client.ChildConn.(*ssh.SSHClientSession) err = sshClientSession.Session.WindowChange(data.Rows, data.Cols) } default: err = fmt.Errorf("message type %s not supported", reqMsg.Type) } if err != nil { logger.Warnf("ws IPerf Run UID %s err: %s", client.BindUid, err.Error()) msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) client.MsgChan <- msgByte if err == io.EOF { // 等待1s后关闭连接 time.Sleep(1 * time.Second) client.StopChan <- struct{}{} } return } if len(resByte) > 0 { client.MsgChan <- resByte } } // parseOptions 解析拼装iperf3命令 iperf [-s|-c host] [options] func (s *IPerf) parseOptions(reqData any) (string, error) { msgByte, _ := json.Marshal(reqData) var data struct { Command string `json:"command"` // 命令字符串 Version string `json:"version"` // 服务版本,默认V3 Mode string `json:"mode"` // 服务端或客户端,默认客户端client Host string `json:"host"` // 客户端连接到的服务端IP地址 // Server or Client Port int `json:"port"` // 服务端口 Interval int `json:"interval"` // 每次报告之间的时间间隔,单位为秒 // Server OneOff bool `json:"oneOff"` // 只进行一次连接 // Client UDP bool `json:"udp"` // use UDP rather than TCP Time int `json:"time"` // 以秒为单位的传输时间(默认为 10 秒) Reverse bool `json:"reverse"` // 以反向模式运行(服务器发送,客户端接收) Window string `json:"window"` // 设置窗口大小/套接字缓冲区大小 Parallel int `json:"parallel"` // 运行的并行客户端流数量 Bitrate int `json:"bitrate"` // 以比特/秒为单位(0 表示无限制) } if err := json.Unmarshal(msgByte, &data); err != nil { logger.Warnf("ws processor parseClient err: %s", err.Error()) return "", fmt.Errorf("query data structure error") } if data.Version != "V3" && data.Version != "V2" { return "", fmt.Errorf("query data version support V3 or V2") } command := []string{"iperf3"} if data.Version == "V2" { command = []string{"iperf"} } // 命令字符串高优先级 if data.Command != "" { command = append(command, data.Command) command = append(command, "\n") return strings.Join(command, " "), nil } if data.Mode != "client" && data.Mode != "server" { return "", fmt.Errorf("query data mode support client or server") } if data.Mode == "client" && data.Host == "" { return "", fmt.Errorf("query data client host empty") } if data.Mode == "client" { command = append(command, "-c") command = append(command, data.Host) // Client if data.UDP { command = append(command, "-u") } if data.Time > 0 { command = append(command, fmt.Sprintf("-t %d", data.Time)) } if data.Bitrate > 0 { command = append(command, fmt.Sprintf("-b %d", data.Bitrate)) } if data.Parallel > 0 { command = append(command, fmt.Sprintf("-P %d", data.Parallel)) } if data.Reverse { command = append(command, "-R") } if data.Window != "" { command = append(command, fmt.Sprintf("-w %s", data.Window)) } } if data.Mode == "server" { command = append(command, "-s") // Server if data.OneOff { command = append(command, "-1") } } // Server or Client if data.Port > 0 { command = append(command, fmt.Sprintf("-p %d", data.Port)) } if data.Interval > 0 { command = append(command, fmt.Sprintf("-i %d", data.Interval)) } command = append(command, "\n") return strings.Join(command, " "), nil }