From 7ba111a7e9370e1597a8948666d362aad48798aa Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Thu, 10 Oct 2024 21:05:12 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=B7=A5=E5=85=B7=E6=A8=A1=E5=9D=97pin?= =?UTF-8?q?g=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/modules/tool/controller/ping.go | 158 +++++++++++++++++ src/modules/tool/model/ping.go | 62 +++++++ src/modules/tool/service/ping.go | 256 ++++++++++++++++++++++++++++ src/modules/tool/tool.go | 24 +++ 4 files changed, 500 insertions(+) create mode 100644 src/modules/tool/controller/ping.go create mode 100644 src/modules/tool/model/ping.go create mode 100644 src/modules/tool/service/ping.go diff --git a/src/modules/tool/controller/ping.go b/src/modules/tool/controller/ping.go new file mode 100644 index 00000000..7af020de --- /dev/null +++ b/src/modules/tool/controller/ping.go @@ -0,0 +1,158 @@ +package controller + +import ( + "encoding/json" + "fmt" + "time" + + "be.ems/src/framework/i18n" + "be.ems/src/framework/logger" + "be.ems/src/framework/utils/ctx" + "be.ems/src/framework/vo/result" + neService "be.ems/src/modules/network_element/service" + "be.ems/src/modules/tool/model" + "be.ems/src/modules/tool/service" + wsService "be.ems/src/modules/ws/service" + + "github.com/gin-gonic/gin" +) + +// 实例化控制层 PingController 结构体 +var NewPing = &PingController{ + pingService: service.NewPing, + wsService: wsService.NewWS, +} + +// ping ICMP网络探测工具 https://github.com/prometheus-community/pro-bing +// +// PATH /tool/ping +type PingController struct { + pingService *service.Ping // ping ICMP网络探测工具 + wsService *wsService.WS // WebSocket 服务 +} + +// ping 基本信息运行 +// +// POST / +func (s *PingController) Statistics(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body model.Ping + if err := c.ShouldBindBodyWithJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + info, err := s.pingService.Statistics(body) + if err != nil { + c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error()))) + return + } + c.JSON(200, result.OkData(info)) +} + +// ping 传统UNIX运行 +// +// GET / +func (s *PingController) StatisticsOn(c *gin.Context) { + language := ctx.AcceptLanguage(c) + // 登录用户信息 + loginUser, err := ctx.LoginUser(c) + if err != nil { + c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) + return + } + + // 将 HTTP 连接升级为 WebSocket 连接 + wsConn := s.wsService.UpgraderWs(c.Writer, c.Request) + if wsConn == nil { + return + } + defer wsConn.Close() + + wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, nil) + go s.wsService.ClientWriteListen(wsClient) + go s.wsService.ClientReadListen(wsClient, s.pingService.StatisticsOn) + + // 等待停止信号 + for value := range wsClient.StopChan { + s.wsService.ClientClose(wsClient.ID) + logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value) + return + } +} + +// ping 网元端UNIX运行 +// +// GET /run +func (s *PingController) Run(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var query struct { + NeType string `form:"neType" binding:"required"` // 网元类型 + NeId string `form:"neId" binding:"required"` // 网元标识id + Cols int `form:"cols"` // 终端单行字符数 + Rows int `form:"rows"` // 终端显示行数 + } + if err := c.ShouldBindQuery(&query); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 登录用户信息 + loginUser, err := ctx.LoginUser(c) + if err != nil { + c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) + return + } + + // 网元主机的SSH客户端 + sshClient, err := neService.NewNeInfoImpl.NeRunSSHClient(query.NeType, query.NeId) + if err != nil { + c.JSON(200, result.ErrMsg(err.Error())) + return + } + defer sshClient.Close() + // ssh连接会话 + clientSession, err := sshClient.NewClientSession(query.Cols, query.Rows) + if err != nil { + c.JSON(200, result.ErrMsg("neinfo ssh client session new err")) + return + } + defer clientSession.Close() + + // 将 HTTP 连接升级为 WebSocket 连接 + wsConn := s.wsService.UpgraderWs(c.Writer, c.Request) + if wsConn == nil { + return + } + defer wsConn.Close() + + wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession) + go s.wsService.ClientWriteListen(wsClient) + go s.wsService.ClientReadListen(wsClient, s.pingService.RunNE) + + // 等待1秒,排空首次消息 + time.Sleep(1 * time.Second) + _ = clientSession.Read() + + // 实时读取Run消息直接输出 + msTicker := time.NewTicker(100 * time.Millisecond) + defer msTicker.Stop() + for { + select { + case ms := <-msTicker.C: + outputByte := clientSession.Read() + if len(outputByte) > 0 { + outputStr := string(outputByte) + msgByte, _ := json.Marshal(result.Ok(map[string]any{ + "requestId": fmt.Sprintf("ping_%d", ms.UnixMilli()), + "data": outputStr, + })) + wsClient.MsgChan <- msgByte + } + case <-wsClient.StopChan: // 等待停止信号 + s.wsService.ClientClose(wsClient.ID) + logger.Infof("ws Stop Client UID %s", wsClient.BindUid) + return + } + } +} diff --git a/src/modules/tool/model/ping.go b/src/modules/tool/model/ping.go new file mode 100644 index 00000000..9a00b1ff --- /dev/null +++ b/src/modules/tool/model/ping.go @@ -0,0 +1,62 @@ +package model + +import ( + "runtime" + "time" + + probing "github.com/prometheus-community/pro-bing" +) + +// Ping 探针发包参数 +type Ping struct { + DesAddr string `json:"desAddr" binding:"required"` // 目的 IP 地址(字符串类型,必填) + SrcAddr string `json:"srcAddr"` // 源 IP 地址(字符串类型,可选) + Interval int `json:"interval"` // 发包间隔(整数类型,可选,单位:秒,取值范围:1-60,默认值:1) + TTL int `json:"ttl"` // TTL(整数类型,可选,取值范围:1-255,默认值:255) + Count int `json:"count"` // 发包数(整数类型,可选,取值范围:1-65535,默认值:5) + Size int `json:"size"` // 报文大小(整数类型,可选,取值范围:36-8192,默认值:36) + Timeout int `json:"timeout"` // 报文超时时间(整数类型,可选,单位:秒,取值范围:1-60,默认值:2) +} + +// setDefaultValue 设置默认值 +func (p *Ping) setDefaultValue() { + if p.Interval < 1 || p.Interval > 10 { + p.Interval = 1 + } + if p.TTL < 1 || p.TTL > 255 { + p.TTL = 255 + } + if p.Count < 1 || p.Count > 65535 { + p.Count = 5 + } + if p.Size < 36 || p.Size > 8192 { + p.Size = 36 + } + if p.Timeout < 1 || p.Timeout > 60 { + p.Timeout = 2 + } +} + +// NewPinger ping对象 +func (p *Ping) NewPinger() (*probing.Pinger, error) { + p.setDefaultValue() + + pinger, err := probing.NewPinger(p.DesAddr) + if err != nil { + return nil, err + } + if p.SrcAddr != "" { + pinger.Source = p.SrcAddr + } + pinger.Interval = time.Duration(p.Interval) * time.Second + pinger.TTL = p.TTL + pinger.Count = p.Count + pinger.Size = p.Size + pinger.Timeout = time.Duration(p.Timeout) * time.Second + + // 设置特权模式(需要管理员权限) + if runtime.GOOS == "windows" { + pinger.SetPrivileged(true) + } + return pinger, nil +} diff --git a/src/modules/tool/service/ping.go b/src/modules/tool/service/ping.go new file mode 100644 index 00000000..100ef9db --- /dev/null +++ b/src/modules/tool/service/ping.go @@ -0,0 +1,256 @@ +package service + +import ( + "encoding/json" + "fmt" + "io" + "strings" + "time" + + "be.ems/src/framework/logger" + "be.ems/src/framework/utils/ssh" + "be.ems/src/framework/vo/result" + neService "be.ems/src/modules/network_element/service" + "be.ems/src/modules/tool/model" + wsModel "be.ems/src/modules/ws/model" + probing "github.com/prometheus-community/pro-bing" +) + +// 实例化服务层 Ping 结构体 +var NewPing = &Ping{ + neInfoService: neService.NewNeInfoImpl, +} + +// Ping 网络性能测试工具 服务层处理 +type Ping struct { + // 网元信息服务 + neInfoService neService.INeInfo +} + +// Statistics ping基本信息 +func (s *Ping) Statistics(ping model.Ping) (map[string]any, error) { + pinger, err := ping.NewPinger() + if err != nil { + return nil, err + } + if err = pinger.Run(); err != nil { + return nil, err + } + defer pinger.Stop() + stats := pinger.Statistics() + return map[string]any{ + "minTime": stats.MinRtt.Microseconds(), // 最小时延(整数类型,可选,单位:微秒) + "maxTime": stats.MaxRtt.Microseconds(), // 最大时延(整数类型,可选,单位:微秒) + "avgTime": stats.AvgRtt.Microseconds(), // 平均时延(整数类型,可选,单位:微秒) + "lossRate": int64(stats.PacketLoss), // 丢包率(整数类型,可选,单位:%) + "jitter": stats.StdDevRtt.Microseconds(), // 时延抖动(整数类型,可选,单位:微秒) + }, nil +} + +// StatisticsOn ping模拟传统UNIX +func (s *Ping) StatisticsOn(client *wsModel.WSClient, reqMsg wsModel.WSRequest) { + // 必传requestId确认消息 + if reqMsg.RequestID == "" { + msg := "message requestId is required" + logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg) + msgByte, _ := json.Marshal(result.ErrMsg(msg)) + client.MsgChan <- msgByte + return + } + + var resByte []byte + var err error + + switch reqMsg.Type { + case "close": + // 主动关闭 + resultByte, _ := json.Marshal(result.OkMsg("user initiated closure")) + client.MsgChan <- resultByte + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + return + case "ping": + msgByte, _ := json.Marshal(reqMsg.Data) + var ping model.Ping + if errj := json.Unmarshal(msgByte, &ping); errj != nil { + err = fmt.Errorf("query data structure error") + } + var pinger *probing.Pinger + pinger, errp := ping.NewPinger() + if errp != nil { + logger.Warnf("ws pinger new err: %s", errp.Error()) + err = fmt.Errorf("pinger error") + } + defer pinger.Stop() + + // 接收的数据包 + pinger.OnRecv = func(pkt *probing.Packet) { + resultByte, _ := json.Marshal(result.Ok(map[string]any{ + "requestId": reqMsg.RequestID, + "data": fmt.Sprintf("%d bytes from %s: icmp_seq=%d time=%v\\r\\n", pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt), + })) + client.MsgChan <- resultByte + } + // 已接收过的数据包 + pinger.OnDuplicateRecv = func(pkt *probing.Packet) { + resultByte, _ := json.Marshal(result.Ok(map[string]any{ + "requestId": reqMsg.RequestID, + "data": fmt.Sprintf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\\r\\n", pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.TTL), + })) + client.MsgChan <- resultByte + } + // 接收结束 + pinger.OnFinish = func(stats *probing.Statistics) { + end1 := fmt.Sprintf("\\r\\n--- %s ping statistics ---\\r\\n", stats.Addr) + end2 := fmt.Sprintf("%d packets transmitted, %d packets received, %v%% packet loss\\r\\n", stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss) + end3 := fmt.Sprintf("round-trip min/avg/max/stddev = %v/%v/%v/%v\\r\\n", stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt) + resultByte, _ := json.Marshal(result.Ok(map[string]any{ + "requestId": reqMsg.RequestID, + "data": fmt.Sprintf("%s%s%s", end1, end2, end3), + })) + client.MsgChan <- resultByte + } + resultByte, _ := json.Marshal(result.Ok(map[string]any{ + "requestId": reqMsg.RequestID, + "data": fmt.Sprintf("PING %s (%s) %d bytes of data.\\r\\n", pinger.Addr(), pinger.IPAddr(), pinger.Size), + })) + client.MsgChan <- resultByte + if errp := pinger.Run(); errp != nil { + logger.Warnf("ws pinger run err: %s", errp.Error()) + err = fmt.Errorf("pinger error") + } + default: + err = fmt.Errorf("message type %s not supported", reqMsg.Type) + } + + if err != nil { + logger.Warnf("ws ping run UID %s err: %s", client.BindUid, err.Error()) + msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) + client.MsgChan <- msgByte + if err == io.EOF { + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + } + return + } + if len(resByte) > 0 { + client.MsgChan <- resByte + } +} + +// RunNE 接收ping终端交互业务处理 +func (s *Ping) RunNE(client *wsModel.WSClient, reqMsg wsModel.WSRequest) { + // 必传requestId确认消息 + if reqMsg.RequestID == "" { + msg := "message requestId is required" + logger.Infof("ws ping run UID %s err: %s", client.BindUid, msg) + msgByte, _ := json.Marshal(result.ErrMsg(msg)) + client.MsgChan <- msgByte + return + } + + var resByte []byte + var err error + + switch reqMsg.Type { + case "close": + // 主动关闭 + resultByte, _ := json.Marshal(result.OkMsg("user initiated closure")) + client.MsgChan <- resultByte + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + return + case "ping": + // SSH会话消息接收写入会话 + var command string + command, err = s.parseOptions(reqMsg.Data) + if command != "" && err == nil { + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + _, err = sshClientSession.Write(command) + } + case "ctrl-c": + // 模拟按下 Ctrl+C + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + _, err = sshClientSession.Write("\u0003\n") + case "resize": + // 会话窗口重置 + msgByte, _ := json.Marshal(reqMsg.Data) + var data struct { + Cols int `json:"cols"` + Rows int `json:"rows"` + } + err = json.Unmarshal(msgByte, &data) + if err == nil { + sshClientSession := client.ChildConn.(*ssh.SSHClientSession) + err = sshClientSession.Session.WindowChange(data.Rows, data.Cols) + } + default: + err = fmt.Errorf("message type %s not supported", reqMsg.Type) + } + + if err != nil { + logger.Warnf("ws ping run UID %s err: %s", client.BindUid, err.Error()) + msgByte, _ := json.Marshal(result.ErrMsg(err.Error())) + client.MsgChan <- msgByte + if err == io.EOF { + // 等待1s后关闭连接 + time.Sleep(1 * time.Second) + client.StopChan <- struct{}{} + } + return + } + if len(resByte) > 0 { + client.MsgChan <- resByte + } +} + +// parseOptions 解析拼装ping命令 ping [options] +func (s *Ping) parseOptions(reqData any) (string, error) { + msgByte, _ := json.Marshal(reqData) + var data struct { + Command string `json:"command"` // 命令字符串 + DesAddr string `json:"desAddr"` // dns name or ip address + // Options + Interval int `json:"interval"` // seconds between sending each packet + TTL int `json:"ttl"` // define time to live + Cunt int `json:"count"` // 次回复后停止 + Size int `json:"size"` // 使用 作为要发送的数据字节数 + Timeout int `json:"timeout"` // time to wait for response + } + if err := json.Unmarshal(msgByte, &data); err != nil { + logger.Warnf("ws processor parseClient err: %s", err.Error()) + return "", fmt.Errorf("query data structure error") + } + + command := []string{"ping"} + // 命令字符串高优先级 + if data.Command != "" { + command = append(command, data.Command) + command = append(command, "\n") + return strings.Join(command, " "), nil + } + + // Options + if data.Interval > 0 { + command = append(command, fmt.Sprintf("-i %d", data.Interval)) + } + if data.TTL > 0 { + command = append(command, fmt.Sprintf("-t %d", data.TTL)) + } + if data.Cunt > 0 { + command = append(command, fmt.Sprintf("-c %d", data.Cunt)) + } + if data.Size > 0 { + command = append(command, fmt.Sprintf("-s %d", data.Size)) + } + if data.Timeout > 0 { + command = append(command, fmt.Sprintf("-w %d", data.Timeout)) + } + + command = append(command, data.DesAddr) + command = append(command, "\n") + return strings.Join(command, " "), nil +} diff --git a/src/modules/tool/tool.go b/src/modules/tool/tool.go index 6d91e856..7a92d1d9 100644 --- a/src/modules/tool/tool.go +++ b/src/modules/tool/tool.go @@ -25,5 +25,29 @@ func Setup(router *gin.Engine) { collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.iperf", collectlogs.BUSINESS_TYPE_OTHER)), controller.NewIPerf.Install, ) + iperfGroup.GET("/run", + middleware.PreAuthorize(nil), + collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.iperf", collectlogs.BUSINESS_TYPE_OTHER)), + controller.NewIPerf.Run, + ) + } + + // ping ICMP网络探测工具 + pingGroup := router.Group("/tool/ping") + { + pingGroup.POST("", + middleware.PreAuthorize(nil), + controller.NewPing.Statistics, + ) + pingGroup.GET("", + middleware.PreAuthorize(nil), + collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ping", collectlogs.BUSINESS_TYPE_OTHER)), + controller.NewPing.StatisticsOn, + ) + pingGroup.GET("/run", + middleware.PreAuthorize(nil), + collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ping", collectlogs.BUSINESS_TYPE_OTHER)), + controller.NewPing.Run, + ) } }