package service

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
	"time"

	"be.ems/src/framework/logger"
	"be.ems/src/framework/utils/ssh"
	"be.ems/src/framework/vo/result"
	neService "be.ems/src/modules/network_element/service"
	"be.ems/src/modules/tool/model"
	wsModel "be.ems/src/modules/ws/model"
	probing "github.com/prometheus-community/pro-bing"
)

// NewPing is the shared service-layer Ping instance.
var NewPing = &Ping{}

// Ping is the service layer of the network performance test (ping) tool.
type Ping struct{}

// Statistics runs the ping described by ping and returns a summary of it.
//
// The returned map holds minTime, maxTime, avgTime and jitter in microseconds
// and lossRate as a percentage truncated to an integer. An error is returned
// when the pinger cannot be created or its run fails.
func (s *Ping) Statistics(ping model.Ping) (map[string]any, error) {
	pinger, err := ping.NewPinger()
	if err != nil {
		return nil, err
	}
	// Registered before Run so the pinger is stopped on the failure path too.
	defer pinger.Stop()

	if err = pinger.Run(); err != nil {
		return nil, err
	}

	stats := pinger.Statistics()
	return map[string]any{
		"minTime":  stats.MinRtt.Microseconds(),    // minimum round-trip time, microseconds
		"maxTime":  stats.MaxRtt.Microseconds(),    // maximum round-trip time, microseconds
		"avgTime":  stats.AvgRtt.Microseconds(),    // average round-trip time, microseconds
		"lossRate": int64(stats.PacketLoss),        // packet loss, percent
		"jitter":   stats.StdDevRtt.Microseconds(), // round-trip standard deviation, microseconds
	}, nil
}

// StatisticsOn handles a websocket request by running a local ping and
// streaming text that mimics the traditional UNIX ping output to the client.
//
// Supported message types are "close" (acknowledge, then close the
// connection) and "ping" (run the ping described by reqMsg.Data). Every
// request must carry a requestId; failures are reported back to the client as
// error messages rather than returned.
func (s *Ping) StatisticsOn(client *wsModel.WSClient, reqMsg wsModel.WSRequest) {
	// A requestId is mandatory so the client can correlate the replies.
	if reqMsg.RequestID == "" {
		msg := "message requestId is required"
		logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg)
		s.sendJSON(client, result.ErrMsg(msg))
		return
	}

	var err error
	switch reqMsg.Type {
	case "close":
		// Closure requested by the user.
		s.sendJSON(client, result.OkMsg("user initiated closure"))
		s.closeClient(client)
		return
	case "ping":
		err = s.streamPing(client, reqMsg)
	default:
		err = fmt.Errorf("message type %s not supported", reqMsg.Type)
	}

	if err != nil {
		s.reportErr(client, err)
	}
}

// streamPing decodes reqMsg.Data into a model.Ping, runs it and pushes one
// message per received packet plus a final statistics summary to the client.
//
// It returns early when the request cannot be decoded or the pinger cannot be
// created, so a nil pinger is never used.
func (s *Ping) streamPing(client *wsModel.WSClient, reqMsg wsModel.WSRequest) error {
	// A marshal failure leaves msgByte empty, which the Unmarshal below rejects.
	msgByte, _ := json.Marshal(reqMsg.Data)
	var ping model.Ping
	if err := json.Unmarshal(msgByte, &ping); err != nil {
		return fmt.Errorf("query data structure error")
	}

	pinger, err := ping.NewPinger()
	if err != nil {
		logger.Warnf("ws pinger new err: %s", err.Error())
		return fmt.Errorf("pinger error")
	}
	defer pinger.Stop()

	// emit sends one chunk of terminal text tagged with the request id.
	emit := func(text string) {
		s.sendJSON(client, result.Ok(map[string]any{
			"requestId": reqMsg.RequestID,
			"data":      text,
		}))
	}

	// A packet was received.
	pinger.OnRecv = func(pkt *probing.Packet) {
		emit(fmt.Sprintf("%d bytes from %s: icmp_seq=%d time=%v\\r\\n", pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt))
	}
	// A packet that had already been received arrived again.
	pinger.OnDuplicateRecv = func(pkt *probing.Packet) {
		emit(fmt.Sprintf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\\r\\n", pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.TTL))
	}
	// The run finished: print the closing statistics.
	pinger.OnFinish = func(stats *probing.Statistics) {
		header := fmt.Sprintf("\\r\\n--- %s ping statistics ---\\r\\n", stats.Addr)
		counts := fmt.Sprintf("%d packets transmitted, %d packets received, %v%% packet loss\\r\\n", stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
		rtt := fmt.Sprintf("round-trip min/avg/max/stddev = %v/%v/%v/%v\\r\\n", stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
		emit(header + counts + rtt)
	}

	emit(fmt.Sprintf("PING %s (%s) %d bytes of data.\\r\\n", pinger.Addr(), pinger.IPAddr(), pinger.Size))

	if err := pinger.Run(); err != nil {
		logger.Warnf("ws pinger run err: %s", err.Error())
		return fmt.Errorf("pinger error")
	}
	return nil
}

// Version reports the version text of the ping binary on a network element.
//
// It runs "ping -V" through the network element's SSH command helper and
// returns the trimmed output. Any failure of that command is reported as
// "ping not installed".
func (s *Ping) Version(meType, neId string) (string, error) {
	output, err := neService.NewNeInfo.NeRunSSHCmd(meType, neId, "ping -V")
	if err != nil {
		return "", fmt.Errorf("ping not installed")
	}
	return strings.TrimSpace(output), nil
}

// Run handles a websocket request for the interactive ping terminal that is
// backed by the SSH session stored in client.ChildConn.
//
// Supported message types:
//   - "close":  acknowledge, then close the connection
//   - "ping":   build a ping command from reqMsg.Data and write it to the session
//   - "ctrl-c": write an ETX (Ctrl+C) character to the session
//   - "resize": change the session window to the requested cols/rows
//
// Every request must carry a requestId; failures are reported back to the
// client as error messages rather than returned.
func (s *Ping) Run(client *wsModel.WSClient, reqMsg wsModel.WSRequest) {
	// A requestId is mandatory so the client can correlate the replies.
	if reqMsg.RequestID == "" {
		msg := "message requestId is required"
		logger.Infof("ws ping run UID %s err: %s", client.BindUid, msg)
		s.sendJSON(client, result.ErrMsg(msg))
		return
	}

	var err error
	switch reqMsg.Type {
	case "close":
		// Closure requested by the user.
		s.sendJSON(client, result.OkMsg("user initiated closure"))
		s.closeClient(client)
		return
	case "ping":
		// Write the assembled command line into the SSH session.
		var command string
		command, err = s.parseOptions(reqMsg.Data)
		if err == nil && command != "" {
			var session *ssh.SSHClientSession
			if session, err = s.sshSession(client); err == nil {
				_, err = session.Write(command)
			}
		}
	case "ctrl-c":
		// Simulate pressing Ctrl+C.
		var session *ssh.SSHClientSession
		if session, err = s.sshSession(client); err == nil {
			_, err = session.Write("\u0003\n")
		}
	case "resize":
		// Resize the session window.
		msgByte, _ := json.Marshal(reqMsg.Data)
		var data struct {
			Cols int `json:"cols"`
			Rows int `json:"rows"`
		}
		if err = json.Unmarshal(msgByte, &data); err == nil {
			var session *ssh.SSHClientSession
			if session, err = s.sshSession(client); err == nil {
				err = session.Session.WindowChange(data.Rows, data.Cols)
			}
		}
	default:
		err = fmt.Errorf("message type %s not supported", reqMsg.Type)
	}

	if err != nil {
		s.reportErr(client, err)
	}
}

// parseOptions assembles the "ping [options] destination" command line that
// is written to the SSH session, terminated by a newline.
//
// A non-empty "command" field takes priority and is appended verbatim after
// "ping"; otherwise the command is built from the individual option fields
// and "desAddr". Because the result is executed by a remote shell, text
// containing control characters or shell metacharacters is rejected.
func (s *Ping) parseOptions(reqData any) (string, error) {
	// A marshal failure leaves msgByte empty, which the Unmarshal below rejects.
	msgByte, _ := json.Marshal(reqData)
	var data struct {
		Command string `json:"command"` // raw argument string
		DesAddr string `json:"desAddr"` // DNS name or IP address

		// Options
		Interval int `json:"interval"` // seconds between sending each packet
		TTL      int `json:"ttl"`      // time to live
		Count    int `json:"count"`    // stop after this many replies
		Size     int `json:"size"`     // number of data bytes to send
		Timeout  int `json:"timeout"`  // time to wait for a response
	}
	if err := json.Unmarshal(msgByte, &data); err != nil {
		logger.Warnf("ws processor parseClient err: %s", err.Error())
		return "", fmt.Errorf("query data structure error")
	}

	command := []string{"ping"}

	// The raw command string has priority over the structured options.
	if data.Command != "" {
		if !s.isSafeShellText(data.Command) {
			return "", fmt.Errorf("command contains illegal characters")
		}
		command = append(command, data.Command, "\n")
		return strings.Join(command, " "), nil
	}

	if !s.isSafeShellText(data.DesAddr) {
		return "", fmt.Errorf("desAddr contains illegal characters")
	}

	// Options
	if data.Interval > 0 {
		command = append(command, fmt.Sprintf("-i %d", data.Interval))
	}
	if data.TTL > 0 {
		command = append(command, fmt.Sprintf("-t %d", data.TTL))
	}
	if data.Count > 0 {
		command = append(command, fmt.Sprintf("-c %d", data.Count))
	}
	if data.Size > 0 {
		command = append(command, fmt.Sprintf("-s %d", data.Size))
	}
	if data.Timeout > 0 {
		command = append(command, fmt.Sprintf("-w %d", data.Timeout))
	}
	command = append(command, data.DesAddr, "\n")
	return strings.Join(command, " "), nil
}

// isSafeShellText reports whether text is free of control characters and of
// shell metacharacters that could chain or substitute additional commands.
func (s *Ping) isSafeShellText(text string) bool {
	const metaChars = ";&|`$()<>\\'\""
	for _, r := range text {
		if r < 0x20 || r == 0x7f || strings.ContainsRune(metaChars, r) {
			return false
		}
	}
	return true
}

// sshSession returns the SSH session stored in client.ChildConn, or an error
// when the child connection is missing or has a different type.
func (s *Ping) sshSession(client *wsModel.WSClient) (*ssh.SSHClientSession, error) {
	session, ok := client.ChildConn.(*ssh.SSHClientSession)
	if !ok || session == nil {
		return nil, fmt.Errorf("ssh session not available")
	}
	return session, nil
}

// sendJSON marshals v and queues the bytes on the client's message channel.
// A value that cannot be marshalled is logged and dropped.
func (s *Ping) sendJSON(client *wsModel.WSClient, v any) {
	msgByte, err := json.Marshal(v)
	if err != nil {
		logger.Warnf("ws ping marshal UID %s err: %s", client.BindUid, err.Error())
		return
	}
	client.MsgChan <- msgByte
}

// closeClient waits one second and then signals the client's stop channel.
func (s *Ping) closeClient(client *wsModel.WSClient) {
	time.Sleep(1 * time.Second)
	client.StopChan <- struct{}{}
}

// reportErr logs err, sends it to the client as an error message and closes
// the connection when err is (or wraps) io.EOF.
func (s *Ping) reportErr(client *wsModel.WSClient, err error) {
	logger.Warnf("ws ping run UID %s err: %s", client.BindUid, err.Error())
	s.sendJSON(client, result.ErrMsg(err.Error()))
	if errors.Is(err, io.EOF) {
		s.closeClient(client)
	}
}