feat: 工具模块ping功能

This commit is contained in:
TsMask
2024-10-10 21:05:12 +08:00
parent 9127865b12
commit 7ba111a7e9
4 changed files with 500 additions and 0 deletions

View File

@@ -0,0 +1,158 @@
package controller
import (
"encoding/json"
"fmt"
"time"
"be.ems/src/framework/i18n"
"be.ems/src/framework/logger"
"be.ems/src/framework/utils/ctx"
"be.ems/src/framework/vo/result"
neService "be.ems/src/modules/network_element/service"
"be.ems/src/modules/tool/model"
"be.ems/src/modules/tool/service"
wsService "be.ems/src/modules/ws/service"
"github.com/gin-gonic/gin"
)
// 实例化控制层 PingController 结构体
var NewPing = &PingController{
pingService: service.NewPing,
wsService: wsService.NewWS,
}
// ping ICMP网络探测工具 https://github.com/prometheus-community/pro-bing
//
// PATH /tool/ping
type PingController struct {
pingService *service.Ping // ping ICMP网络探测工具
wsService *wsService.WS // WebSocket 服务
}
// ping 基本信息运行
//
// POST /
func (s *PingController) Statistics(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body model.Ping
if err := c.ShouldBindBodyWithJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
info, err := s.pingService.Statistics(body)
if err != nil {
c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error())))
return
}
c.JSON(200, result.OkData(info))
}
// ping 传统UNIX运行
//
// GET /
func (s *PingController) StatisticsOn(c *gin.Context) {
language := ctx.AcceptLanguage(c)
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
// 将 HTTP 连接升级为 WebSocket 连接
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
if wsConn == nil {
return
}
defer wsConn.Close()
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, nil)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, s.pingService.StatisticsOn)
// 等待停止信号
for value := range wsClient.StopChan {
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value)
return
}
}
// ping 网元端UNIX运行
//
// GET /run
func (s *PingController) Run(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var query struct {
NeType string `form:"neType" binding:"required"` // 网元类型
NeId string `form:"neId" binding:"required"` // 网元标识id
Cols int `form:"cols"` // 终端单行字符数
Rows int `form:"rows"` // 终端显示行数
}
if err := c.ShouldBindQuery(&query); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
// 网元主机的SSH客户端
sshClient, err := neService.NewNeInfoImpl.NeRunSSHClient(query.NeType, query.NeId)
if err != nil {
c.JSON(200, result.ErrMsg(err.Error()))
return
}
defer sshClient.Close()
// ssh连接会话
clientSession, err := sshClient.NewClientSession(query.Cols, query.Rows)
if err != nil {
c.JSON(200, result.ErrMsg("neinfo ssh client session new err"))
return
}
defer clientSession.Close()
// 将 HTTP 连接升级为 WebSocket 连接
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
if wsConn == nil {
return
}
defer wsConn.Close()
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
go s.wsService.ClientWriteListen(wsClient)
go s.wsService.ClientReadListen(wsClient, s.pingService.RunNE)
// 等待1秒排空首次消息
time.Sleep(1 * time.Second)
_ = clientSession.Read()
// 实时读取Run消息直接输出
msTicker := time.NewTicker(100 * time.Millisecond)
defer msTicker.Stop()
for {
select {
case ms := <-msTicker.C:
outputByte := clientSession.Read()
if len(outputByte) > 0 {
outputStr := string(outputByte)
msgByte, _ := json.Marshal(result.Ok(map[string]any{
"requestId": fmt.Sprintf("ping_%d", ms.UnixMilli()),
"data": outputStr,
}))
wsClient.MsgChan <- msgByte
}
case <-wsClient.StopChan: // 等待停止信号
s.wsService.ClientClose(wsClient.ID)
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
return
}
}
}

View File

@@ -0,0 +1,62 @@
package model
import (
"runtime"
"time"
probing "github.com/prometheus-community/pro-bing"
)
// Ping 探针发包参数
type Ping struct {
DesAddr string `json:"desAddr" binding:"required"` // 目的 IP 地址(字符串类型,必填)
SrcAddr string `json:"srcAddr"` // 源 IP 地址(字符串类型,可选)
Interval int `json:"interval"` // 发包间隔整数类型可选单位取值范围1-60默认值1
TTL int `json:"ttl"` // TTL整数类型可选取值范围1-255默认值255
Count int `json:"count"` // 发包数整数类型可选取值范围1-65535默认值5
Size int `json:"size"` // 报文大小整数类型可选取值范围36-8192默认值36
Timeout int `json:"timeout"` // 报文超时时间整数类型可选单位取值范围1-60默认值2
}
// setDefaultValue 设置默认值
func (p *Ping) setDefaultValue() {
if p.Interval < 1 || p.Interval > 10 {
p.Interval = 1
}
if p.TTL < 1 || p.TTL > 255 {
p.TTL = 255
}
if p.Count < 1 || p.Count > 65535 {
p.Count = 5
}
if p.Size < 36 || p.Size > 8192 {
p.Size = 36
}
if p.Timeout < 1 || p.Timeout > 60 {
p.Timeout = 2
}
}
// NewPinger ping对象
func (p *Ping) NewPinger() (*probing.Pinger, error) {
p.setDefaultValue()
pinger, err := probing.NewPinger(p.DesAddr)
if err != nil {
return nil, err
}
if p.SrcAddr != "" {
pinger.Source = p.SrcAddr
}
pinger.Interval = time.Duration(p.Interval) * time.Second
pinger.TTL = p.TTL
pinger.Count = p.Count
pinger.Size = p.Size
pinger.Timeout = time.Duration(p.Timeout) * time.Second
// 设置特权模式(需要管理员权限)
if runtime.GOOS == "windows" {
pinger.SetPrivileged(true)
}
return pinger, nil
}

View File

@@ -0,0 +1,256 @@
package service
import (
"encoding/json"
"fmt"
"io"
"strings"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/utils/ssh"
"be.ems/src/framework/vo/result"
neService "be.ems/src/modules/network_element/service"
"be.ems/src/modules/tool/model"
wsModel "be.ems/src/modules/ws/model"
probing "github.com/prometheus-community/pro-bing"
)
// 实例化服务层 Ping 结构体
var NewPing = &Ping{
neInfoService: neService.NewNeInfoImpl,
}
// Ping 网络性能测试工具 服务层处理
type Ping struct {
// 网元信息服务
neInfoService neService.INeInfo
}
// Statistics ping基本信息
func (s *Ping) Statistics(ping model.Ping) (map[string]any, error) {
pinger, err := ping.NewPinger()
if err != nil {
return nil, err
}
if err = pinger.Run(); err != nil {
return nil, err
}
defer pinger.Stop()
stats := pinger.Statistics()
return map[string]any{
"minTime": stats.MinRtt.Microseconds(), // 最小时延(整数类型,可选,单位:微秒)
"maxTime": stats.MaxRtt.Microseconds(), // 最大时延(整数类型,可选,单位:微秒)
"avgTime": stats.AvgRtt.Microseconds(), // 平均时延(整数类型,可选,单位:微秒)
"lossRate": int64(stats.PacketLoss), // 丢包率(整数类型,可选,单位:%
"jitter": stats.StdDevRtt.Microseconds(), // 时延抖动(整数类型,可选,单位:微秒)
}, nil
}
// StatisticsOn ping模拟传统UNIX
func (s *Ping) StatisticsOn(client *wsModel.WSClient, reqMsg wsModel.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
// 主动关闭
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
client.MsgChan <- resultByte
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
return
case "ping":
msgByte, _ := json.Marshal(reqMsg.Data)
var ping model.Ping
if errj := json.Unmarshal(msgByte, &ping); errj != nil {
err = fmt.Errorf("query data structure error")
}
var pinger *probing.Pinger
pinger, errp := ping.NewPinger()
if errp != nil {
logger.Warnf("ws pinger new err: %s", errp.Error())
err = fmt.Errorf("pinger error")
}
defer pinger.Stop()
// 接收的数据包
pinger.OnRecv = func(pkt *probing.Packet) {
resultByte, _ := json.Marshal(result.Ok(map[string]any{
"requestId": reqMsg.RequestID,
"data": fmt.Sprintf("%d bytes from %s: icmp_seq=%d time=%v\\r\\n", pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt),
}))
client.MsgChan <- resultByte
}
// 已接收过的数据包
pinger.OnDuplicateRecv = func(pkt *probing.Packet) {
resultByte, _ := json.Marshal(result.Ok(map[string]any{
"requestId": reqMsg.RequestID,
"data": fmt.Sprintf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\\r\\n", pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.TTL),
}))
client.MsgChan <- resultByte
}
// 接收结束
pinger.OnFinish = func(stats *probing.Statistics) {
end1 := fmt.Sprintf("\\r\\n--- %s ping statistics ---\\r\\n", stats.Addr)
end2 := fmt.Sprintf("%d packets transmitted, %d packets received, %v%% packet loss\\r\\n", stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
end3 := fmt.Sprintf("round-trip min/avg/max/stddev = %v/%v/%v/%v\\r\\n", stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
resultByte, _ := json.Marshal(result.Ok(map[string]any{
"requestId": reqMsg.RequestID,
"data": fmt.Sprintf("%s%s%s", end1, end2, end3),
}))
client.MsgChan <- resultByte
}
resultByte, _ := json.Marshal(result.Ok(map[string]any{
"requestId": reqMsg.RequestID,
"data": fmt.Sprintf("PING %s (%s) %d bytes of data.\\r\\n", pinger.Addr(), pinger.IPAddr(), pinger.Size),
}))
client.MsgChan <- resultByte
if errp := pinger.Run(); errp != nil {
logger.Warnf("ws pinger run err: %s", errp.Error())
err = fmt.Errorf("pinger error")
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws ping run UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// RunNE 接收ping终端交互业务处理
func (s *Ping) RunNE(client *wsModel.WSClient, reqMsg wsModel.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws ping run UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
// 主动关闭
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
client.MsgChan <- resultByte
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
return
case "ping":
// SSH会话消息接收写入会话
var command string
command, err = s.parseOptions(reqMsg.Data)
if command != "" && err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
}
case "ctrl-c":
// 模拟按下 Ctrl+C
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write("\u0003\n")
case "resize":
// 会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws ping run UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// parseOptions 解析拼装ping命令 ping [options] <destination>
func (s *Ping) parseOptions(reqData any) (string, error) {
msgByte, _ := json.Marshal(reqData)
var data struct {
Command string `json:"command"` // 命令字符串
DesAddr string `json:"desAddr"` // dns name or ip address
// Options
Interval int `json:"interval"` // seconds between sending each packet
TTL int `json:"ttl"` // define time to live
Cunt int `json:"count"` // <count> 次回复后停止
Size int `json:"size"` // 使用 <size> 作为要发送的数据字节数
Timeout int `json:"timeout"` // time to wait for response
}
if err := json.Unmarshal(msgByte, &data); err != nil {
logger.Warnf("ws processor parseClient err: %s", err.Error())
return "", fmt.Errorf("query data structure error")
}
command := []string{"ping"}
// 命令字符串高优先级
if data.Command != "" {
command = append(command, data.Command)
command = append(command, "\n")
return strings.Join(command, " "), nil
}
// Options
if data.Interval > 0 {
command = append(command, fmt.Sprintf("-i %d", data.Interval))
}
if data.TTL > 0 {
command = append(command, fmt.Sprintf("-t %d", data.TTL))
}
if data.Cunt > 0 {
command = append(command, fmt.Sprintf("-c %d", data.Cunt))
}
if data.Size > 0 {
command = append(command, fmt.Sprintf("-s %d", data.Size))
}
if data.Timeout > 0 {
command = append(command, fmt.Sprintf("-w %d", data.Timeout))
}
command = append(command, data.DesAddr)
command = append(command, "\n")
return strings.Join(command, " "), nil
}

View File

@@ -25,5 +25,29 @@ func Setup(router *gin.Engine) {
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.iperf", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewIPerf.Install,
)
iperfGroup.GET("/run",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.iperf", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewIPerf.Run,
)
}
// ping ICMP网络探测工具
pingGroup := router.Group("/tool/ping")
{
pingGroup.POST("",
middleware.PreAuthorize(nil),
controller.NewPing.Statistics,
)
pingGroup.GET("",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ping", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewPing.StatisticsOn,
)
pingGroup.GET("/run",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ping", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewPing.Run,
)
}
}