package controller

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"be.ems/src/framework/i18n"
	"be.ems/src/framework/logger"
	"be.ems/src/framework/reqctx"
	"be.ems/src/framework/resp"
	neService "be.ems/src/modules/ne/service"
	"be.ems/src/modules/tool/service"
	wsService "be.ems/src/modules/ws/service"

	"github.com/gin-gonic/gin"
)

// NewIPerf is the shared IPerfController instance, wired to the package-level
// iperf and WebSocket services.
var NewIPerf = &IPerfController{
	iperfService: service.NewIPerf,
	wsService:    wsService.NewWS,
}

// IPerfController exposes the iperf network performance testing tool
// (https://iperf.fr/iperf-download.php) over HTTP and WebSocket.
//
// PATH /tool/iperf
type IPerfController struct {
	iperfService *service.IPerf // iperf tool service
	wsService    *wsService.WS  // WebSocket service
}

// Version reports the iperf version information for a network element.
//
// The query string is validated first; a binding failure is answered with
// HTTP 422. A service error is answered with HTTP 200 and a translated error
// message. On success the service output is split on "\n" and returned as a
// list of lines.
//
// GET /v
//
//	@Tags			tool/iperf
//	@Accept			json
//	@Produce		json
//	@Param			neType	query		string	true	"NE Type"	Enums(IMS,AMF,AUSF,UDM,SMF,PCF,NSSF,NRF,UPF,MME,CBC,OMC,SGWC,SMSC)
//	@Param			neId	query		string	true	"NE ID"		default(001)
//	@Param			version	query		string	true	"Version"	Enums(V2, V3)
//	@Success		200		{object}	object	"Response Results"
//	@Security		TokenAuth
//	@Summary		iperf version information
//	@Description	iperf version information
//	@Router			/tool/iperf/v [get]
func (s *IPerfController) Version(c *gin.Context) {
	language := reqctx.AcceptLanguage(c)
	var query struct {
		NeType  string `form:"neType" binding:"required"`                // NE type
		NeId    string `form:"neId" binding:"required"`                  // NE ID
		Version string `form:"version" binding:"required,oneof=V2 V3"`   // iperf version
	}
	if err := c.ShouldBindQuery(&query); err != nil {
		errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
		c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
		return
	}

	output, err := s.iperfService.Version(query.NeType, query.NeId, query.Version)
	if err != nil {
		c.JSON(200, resp.ErrMsg(i18n.TKey(language, err.Error())))
		return
	}
	data := strings.Split(output, "\n")
	c.JSON(200, resp.OkData(data))
}

// Install installs the iperf software on a network element.
//
// The JSON body is validated first; a binding failure is answered with
// HTTP 422. A service error is answered with HTTP 200 and a translated error
// message; success is answered with an empty OK payload.
//
// POST /i
//
//	@Tags			tool/iperf
//	@Accept			json
//	@Produce		json
//	@Param			data	body		object	true	"Request Param"
//	@Success		200		{object}	object	"Response Results"
//	@Security		TokenAuth
//	@Summary		iperf software installation
//	@Description	iperf software installation
//	@Router			/tool/iperf/i [post]
func (s *IPerfController) Install(c *gin.Context) {
	language := reqctx.AcceptLanguage(c)
	var body struct {
		NeType string `json:"neType" binding:"required"` // NE type
		NeId   string `json:"neId" binding:"required"`   // NE ID
		// The body is bound as JSON, so the key must be declared with a json
		// tag like its siblings (it previously carried a form tag and only
		// decoded through encoding/json's case-insensitive name fallback).
		Version string `json:"version" binding:"required,oneof=V2 V3"` // iperf version
	}
	if err := c.ShouldBindBodyWithJSON(&body); err != nil {
		errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
		c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
		return
	}

	if err := s.iperfService.Install(body.NeType, body.NeId, body.Version); err != nil {
		c.JSON(200, resp.ErrMsg(i18n.TKey(language, err.Error())))
		return
	}
	c.JSON(200, resp.Ok(nil))
}

// Run upgrades the request to a WebSocket and streams the output of an SSH
// client session on the selected network element to the client.
//
// Flow: validate the query, resolve the logged-in user, open the SSH client
// and a client session sized cols x rows, upgrade the connection, start the
// WebSocket read/write listeners, then poll the session every 100ms and
// forward any output until the client's stop signal arrives. Errors before
// the upgrade are answered as JSON (422 for binding, 401 for authentication,
// 200 with an error message for SSH failures).
//
// GET /run
//
//	@Tags			tool/iperf
//	@Accept			json
//	@Produce		json
//	@Param			neType			query		string	true	"NE Type"	Enums(IMS,AMF,AUSF,UDM,SMF,PCF,NSSF,NRF,UPF,MME,CBC,OMC,SGWC,SMSC)
//	@Param			neId			query		string	true	"NE ID"		default(001)
//	@Param			cols			query		number	false	"Terminal line characters"	default(120)
//	@Param			rows			query		number	false	"Terminal display lines"	default(40)
//	@Param			access_token	query		string	true	"Authorization"
//	@Success		200				{object}	object	"Response Results"
//	@Security		TokenAuth
//	@Summary		(ws://) iperf software running
//	@Description	(ws://) iperf software running
//	@Router			/tool/iperf/run [get]
func (s *IPerfController) Run(c *gin.Context) {
	// Terminal size applied when the caller omits cols/rows (or sends a
	// non-positive value); matches the defaults documented for this endpoint.
	const (
		defaultCols = 120
		defaultRows = 40
	)

	language := reqctx.AcceptLanguage(c)
	var query struct {
		NeType string `form:"neType" binding:"required"` // NE type
		NeId   string `form:"neId" binding:"required"`   // NE identifier
		Cols   int    `form:"cols"`                      // terminal characters per line
		Rows   int    `form:"rows"`                      // terminal display lines
	}
	if err := c.ShouldBindQuery(&query); err != nil {
		errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
		c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
		return
	}
	if query.Cols <= 0 {
		query.Cols = defaultCols
	}
	if query.Rows <= 0 {
		query.Rows = defaultRows
	}

	// Logged-in user information.
	loginUser, err := reqctx.LoginUser(c)
	if err != nil {
		c.JSON(401, resp.CodeMsg(resp.CODE_AUTH_INVALID, i18n.TKey(language, err.Error())))
		return
	}

	// SSH client for the network element host.
	sshClient, err := neService.NewNeInfo.NeRunSSHClient(query.NeType, query.NeId)
	if err != nil {
		c.JSON(200, resp.ErrMsg(err.Error()))
		return
	}
	defer sshClient.Close()

	// SSH client session sized to the requested terminal.
	clientSession, err := sshClient.NewClientSession(query.Cols, query.Rows)
	if err != nil {
		c.JSON(200, resp.ErrMsg("neinfo ssh client session new err"))
		return
	}
	defer clientSession.Close()

	// Upgrade the HTTP connection to a WebSocket connection. Nothing more is
	// written to the response here when the upgrade yields no connection.
	wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
	if wsConn == nil {
		return
	}
	defer wsConn.Close()

	wsClient := s.wsService.ClientCreate(loginUser.UserId, nil, wsConn, clientSession)
	go s.wsService.ClientWriteListen(wsClient)
	go s.wsService.ClientReadListen(wsClient, s.iperfService.Run)

	// Wait one second, then drain and discard the session's first output.
	time.Sleep(1 * time.Second)
	_ = clientSession.Read()

	// stop closes the WebSocket client and logs the shutdown.
	stop := func() {
		s.wsService.ClientClose(wsClient.ID)
		logger.Infof("ws Stop Client UID %d", wsClient.BindUid)
	}

	// Poll the session output and forward it to the client as it appears.
	msTicker := time.NewTicker(100 * time.Millisecond)
	defer msTicker.Stop()
	for {
		select {
		case ms := <-msTicker.C:
			outputByte := clientSession.Read()
			if len(outputByte) == 0 {
				continue
			}
			// The error is ignored on purpose: the payload built here only
			// holds string values.
			msgByte, _ := json.Marshal(resp.Ok(map[string]any{
				"requestId": fmt.Sprintf("iperf3_%d", ms.UnixMilli()),
				"data":      string(outputByte),
			}))
			// Keep watching the stop signal while sending, so a stalled
			// message channel cannot block this handler (and its deferred
			// SSH/WebSocket cleanup) forever.
			select {
			case wsClient.MsgChan <- msgByte:
			case <-wsClient.StopChan:
				stop()
				return
			}
		case <-wsClient.StopChan: // stop signal received
			stop()
			return
		}
	}
}