feat: 工具模块iperf功能

This commit is contained in:
TsMask
2024-10-10 21:05:35 +08:00
parent 7ba111a7e9
commit c1fa37731f
3 changed files with 231 additions and 16 deletions

View File

@@ -1,12 +1,18 @@
package service
import (
"encoding/json"
"fmt"
"io"
"strings"
"time"
"be.ems/src/framework/config"
"be.ems/src/framework/logger"
"be.ems/src/framework/utils/ssh"
"be.ems/src/framework/vo/result"
neService "be.ems/src/modules/network_element/service"
wsModel "be.ems/src/modules/ws/model"
)
// 实例化服务层 IPerf 结构体
@@ -108,3 +114,139 @@ func (s *IPerf) Install(meType, neId string) error {
}
return err
}
// Run 接收IPerf3终端交互业务处理
func (s *IPerf) Run(client *wsModel.WSClient, reqMsg wsModel.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws IPerf3 Run UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
// 主动关闭
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
client.MsgChan <- resultByte
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
return
case "iperf3":
// SSH会话消息接收写入会话
var command string
command, err = s.parseOptions(reqMsg.Data)
if command != "" && err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
}
case "ctrl-c":
// 模拟按下 Ctrl+C
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write("\u0003\n")
case "resize":
// 会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws IPerf3 Run UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// parseOptions 解析拼装iperf3命令 iperf [-s|-c host] [options]
func (s *IPerf) parseOptions(reqData any) (string, error) {
msgByte, _ := json.Marshal(reqData)
var data struct {
Command string `json:"command"` // 命令字符串
Client bool `json:"client"` // 服务端或客户端,默认服务端
Host string `json:"host"` // 客户端连接到的服务端IP地址
// Server or Client
Port int `json:"port"` // 服务端口
Interval int `json:"interval"` // 每次报告之间的时间间隔,单位为秒
// Server
OneOff bool `json:"oneOff"` // 只进行一次连接
// Client
UDP bool `json:"udp"` // use UDP rather than TCP
Time int `json:"time"` // 以秒为单位的传输时间(默认为 10 秒)
Reverse bool `json:"reverse"` // 以反向模式运行(服务器发送,客户端接收)
Window string `json:"window"` // 设置窗口大小/套接字缓冲区大小
}
if err := json.Unmarshal(msgByte, &data); err != nil {
logger.Warnf("ws processor parseClient err: %s", err.Error())
return "", fmt.Errorf("query data structure error")
}
command := []string{"iperf3"}
// 命令字符串高优先级
if data.Command != "" {
command = append(command, data.Command)
command = append(command, "\n")
return strings.Join(command, " "), nil
}
if data.Client && data.Host == "" {
return "", fmt.Errorf("query data client host empty")
}
if !data.Client {
command = append(command, "-s")
// Server
if data.OneOff {
command = append(command, "-1")
}
} else {
command = append(command, "-c")
command = append(command, data.Host)
// Client
if data.UDP {
command = append(command, "-u")
}
if data.Time > 0 {
command = append(command, fmt.Sprintf("-t %d", data.Time))
}
if data.Reverse {
command = append(command, "-R")
}
if data.Window != "" {
command = append(command, fmt.Sprintf("-w %s", data.Window))
}
}
// Server or Client
if data.Port > 0 {
command = append(command, fmt.Sprintf("-p %d", data.Port))
}
if data.Interval > 0 {
command = append(command, fmt.Sprintf("-i %d", data.Interval))
}
command = append(command, "\n")
return strings.Join(command, " "), nil
}