From 5ccd1fd267b686f4e940efea61e9e702086627a5 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Tue, 10 Jun 2025 18:55:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=BD=91=E5=85=83=E4=BF=A1=E4=BB=A4?= =?UTF-8?q?=E8=B7=9F=E8=B8=AA=E4=BB=A3=E7=A0=81=E5=90=8C=E6=AD=A5=EF=BC=8C?= =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=BC=80=E5=85=B3=E5=88=87=E6=8D=A2=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E9=94=99=E8=AF=AF=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- features/fm/alarm.go | 4 +- src/framework/cmd/check.go | 44 +++++ src/framework/cmd/cmd.go | 168 ++++++++++++++++++ src/framework/resp/code.go | 78 ++++++++ src/framework/socket/tcp_client.go | 2 +- src/framework/socket/tcp_server.go | 8 +- src/framework/socket/udp_client.go | 2 +- src/framework/socket/udp_server.go | 7 + .../network_element/service/ne_info.go | 22 +++ src/modules/trace/controller/trace_task.go | 19 +- src/modules/trace/repository/trace_task.go | 4 +- src/modules/trace/service/trace_task.go | 44 +++-- .../trace/service/trace_task_udp_data.go | 5 +- src/modules/trace/trace.go | 2 +- 14 files changed, 370 insertions(+), 39 deletions(-) create mode 100644 src/framework/cmd/check.go create mode 100644 src/framework/cmd/cmd.go create mode 100644 src/framework/resp/code.go diff --git a/features/fm/alarm.go b/features/fm/alarm.go index 447666f9..a05ca54b 100644 --- a/features/fm/alarm.go +++ b/features/fm/alarm.go @@ -496,7 +496,7 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) { } // 网元重启后,有跟踪任务的需要重新补发启动任务 if alarmData.AlarmCode == constants.ALARM_EVENT_REBOOT { - traceService.NewTraceTask.RunUnstopped() + traceService.NewTraceTask.RunUnstopped(alarmData.NeType, alarmData.NeId) } } @@ -825,7 +825,7 @@ func GetAlarmFromNF(w http.ResponseWriter, r *http.Request) { } // 网元重启后,有跟踪任务的需要重新补发启动任务 if alarmData.AlarmCode == constants.ALARM_EVENT_REBOOT { - traceService.NewTraceTask.RunUnstopped() + traceService.NewTraceTask.RunUnstopped(alarmData.NeType, alarmData.NeId) } } log.Warn("Failed to insert alarm data:", err) diff --git a/src/framework/cmd/check.go b/src/framework/cmd/check.go new file mode 100644 index 00000000..b3632529 --- /dev/null +++ b/src/framework/cmd/check.go @@ -0,0 +1,44 @@ +package cmd + +import ( + "os/exec" + "strings" +) + +// CheckIllegal 检查传入的字符串参数中是否包含一些特殊字符 +func CheckIllegal(args ...string) bool { + if args == nil { + return false + } + illegalChars := []string{"&", "|", ";", "$", "'", "`", "(", ")", "\""} + for _, arg := range args { + for _, char := range illegalChars { + if strings.Contains(arg, char) { + return true + } + } + } + return false +} + +// HasNoPasswordSudo 检查当前用户是否拥有sudo权限 +func HasNoPasswordSudo() bool { + cmd2 := exec.Command("sudo", "-n", "uname") + err2 := cmd2.Run() + return err2 == nil +} + +// SudoHandleCmd 是否拥有sudo权限并拼接 +func SudoHandleCmd() string { + cmd := exec.Command("sudo", "-n", "uname") + if err := cmd.Run(); err == nil { + return "sudo " + } + return "" +} + +// Which 可执行文件是否在系统的PATH环境变量中 +func Which(name string) bool { + _, err := exec.LookPath(name) + return err == nil +} diff --git a/src/framework/cmd/cmd.go b/src/framework/cmd/cmd.go new file mode 100644 index 00000000..e22ffe08 --- /dev/null +++ b/src/framework/cmd/cmd.go @@ -0,0 +1,168 @@ +package cmd + +import ( + "bytes" + "context" + "fmt" + "os/exec" + "time" +) + +// Exec 本地执行命令 列如:("ls -ls") +func Exec(cmdStr string) (string, error) { + cmd := exec.Command("bash", "-c", cmdStr) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + errMsg := "" + if len(stderr.String()) != 0 { + errMsg = fmt.Sprintf("stderr: %s", stderr.String()) + } + if len(stdout.String()) != 0 { + if len(errMsg) != 0 { + errMsg = fmt.Sprintf("%s; stdout: %s", errMsg, stdout.String()) + } else { + errMsg = fmt.Sprintf("stdout: %s", stdout.String()) + } + } + return errMsg, err + } + return stdout.String(), nil +} + +// Execf 本地执行命令 列如:("ssh %s@%s", "user", "localhost") +func Execf(cmdStr string, a ...any) (string, error) { + cmd := exec.Command("bash", "-c", fmt.Sprintf(cmdStr, a...)) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + errMsg := "" + if len(stderr.String()) != 0 { + errMsg = fmt.Sprintf("stderr: %s", stderr.String()) + } + if len(stdout.String()) != 0 { + if len(errMsg) != 0 { + errMsg = fmt.Sprintf("%s; stdout: %s", errMsg, stdout.String()) + } else { + errMsg = fmt.Sprintf("stdout: %s", stdout.String()) + } + } + return errMsg, err + } + return stdout.String(), nil +} + +// ExecWithTimeOut 本地执行命令超时退出 列如:("ssh user@localhost", 20*time.Second) +func ExecWithTimeOut(cmdStr string, timeout time.Duration) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + cmd := exec.Command("bash", "-c", cmdStr) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if ctx.Err() == context.DeadlineExceeded { + return "", fmt.Errorf("errCmdTimeout %v", err) + } + if err != nil { + errMsg := "" + if len(stderr.String()) != 0 { + errMsg = fmt.Sprintf("stderr: %s", stderr.String()) + } + if len(stdout.String()) != 0 { + if len(errMsg) != 0 { + errMsg = fmt.Sprintf("%s; stdout: %s", errMsg, stdout.String()) + } else { + errMsg = fmt.Sprintf("stdout: %s", stdout.String()) + } + } + return errMsg, err + } + return stdout.String(), nil +} + +// ExecDirWithTimeOut 指定目录本地执行命令超时退出 列如:("ssh user@localhost", 20*time.Second) +func ExecDirWithTimeOut(workdir string, cmdStr string, timeout time.Duration) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + cmd := exec.Command("bash", "-c", cmdStr) + cmd.Dir = workdir + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if ctx.Err() == context.DeadlineExceeded { + return "", fmt.Errorf("errCmdTimeout %v", err) + } + + errMsg := "" + if len(stderr.String()) != 0 { + errMsg = fmt.Sprintf("stderr:\n %s", stderr.String()) + } + if len(stdout.String()) != 0 { + if len(errMsg) != 0 { + errMsg = fmt.Sprintf("%s \n\n; stdout:\n %s", errMsg, stdout.String()) + } else { + errMsg = fmt.Sprintf("stdout:\n %s", stdout.String()) + } + } + return errMsg, err +} + +// ExecDirScript 指定目录本地执行脚本文件, 默认超时10分钟 列如:("/tmp", "setup.sh") +func ExecDirScript(workDir, scriptPath string) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + cmd := exec.Command("bash", scriptPath) + cmd.Dir = workDir + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if ctx.Err() == context.DeadlineExceeded { + return "", fmt.Errorf("errCmdTimeout %v", err) + } + if err != nil { + errMsg := "" + if len(stderr.String()) != 0 { + errMsg = fmt.Sprintf("stderr: %s", stderr.String()) + } + if len(stdout.String()) != 0 { + if len(errMsg) != 0 { + errMsg = fmt.Sprintf("%s; stdout: %s", errMsg, stdout.String()) + } else { + errMsg = fmt.Sprintf("stdout: %s", stdout.String()) + } + } + return errMsg, err + } + return stdout.String(), nil +} + +// ExecCommand 执行命令程序带参数 例如:("ls", "-r", "-l", "-s") +func ExecCommand(name string, a ...string) (string, error) { + cmd := exec.Command(name, a...) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + errMsg := "" + if len(stderr.String()) != 0 { + errMsg = fmt.Sprintf("stderr: %s", stderr.String()) + } + if len(stdout.String()) != 0 { + if len(errMsg) != 0 { + errMsg = fmt.Sprintf("%s; stdout: %s", errMsg, stdout.String()) + } else { + errMsg = fmt.Sprintf("stdout: %s", stdout.String()) + } + } + return errMsg, err + } + return stdout.String(), nil +} diff --git a/src/framework/resp/code.go b/src/framework/resp/code.go new file mode 100644 index 00000000..36eba106 --- /dev/null +++ b/src/framework/resp/code.go @@ -0,0 +1,78 @@ +package resp + +// |HTTP|状态码|描述|排查建议| +// |----|----|----|----| +// |500 |500001 |internal error|服务内部错误| +// |200 |200999 |encrypt|正常请求加密数据| +// |200 |200001 |request success|正常请求成功| +// |200 |400001 |exist error|正常请求错误信息| +// |200 |400002 |ratelimit over|请求限流| +// |401 |401001 |authentication error|身份认证失败或者过期| +// |401 |401002 |authentication invalid error|无效身份信息| +// |401 |401003 |authorization token error|令牌字符为空| +// |401 |401004 |device fingerprint mismatch|设备指纹信息不匹配| +// |403 |403001 |permission error|权限未分配| +// |422 |422001 |params error|参数接收解析错误| +// |422 |422002 |params error|参数属性传入错误| + +// ====== 500 ====== +const ( + // CODE_ERROR_INTERNAL 响应-code服务内部错误 + CODE_INTERNAL = 500001 + // MSG_ERROR_INTERNAL 响应-msg服务内部错误 + MSG_INTERNAL = "internal error" +) + +// ====== 200 ====== +const ( + // CODE_ENCRYPT 响应-code加密数据 + // CODE_ENCRYPT = 200999 + // MSG_ENCRYPT 响应-msg加密数据 + // MSG_ENCRYPT = "encrypt" + + // CODE_SUCCESS 响应-code正常成功 + // CODE_SUCCESS = 200001 + // MSG_SUCCCESS 响应-msg正常成功 + // MSG_SUCCCESS = "success" + + // CODE_ERROR 响应-code错误失败 + // CODE_ERROR = 400001 + // MSG_ERROR 响应-msg错误失败 + // MSG_ERROR = "error" + + // CODE_RATELIMIT 响应-code错误失败 + CODE_RATELIMIT = 400002 + // MSG_RATELIMIT 响应-msg错误失败 + MSG_RATELIMIT = "access too often, please try again later" +) + +// ====== 401 ====== +const ( + // CODE_ERROR 响应-code身份认证失败或者过期 + CODE_AUTH = 401001 + + // CODE_AUTH_INVALID 响应-code无效身份信息 + CODE_AUTH_INVALID = 401002 + + // CODE_AUTH_NOTOKEN 响应-code令牌字符为空 + CODE_AUTH_NOTOKEN = 401003 + + // CODE_AUTH_DEVICE 响应-code设备指纹信息不匹配 + CODE_AUTH_DEVICE = 401004 + // MSG_AUTH_DEVICE 响应-msg设备指纹信息不匹配 + MSG_AUTH_DEVICE = "device fingerprint mismatch" +) + +// ====== 403 ====== +const ( + // CODE_PERMISSION 响应-code权限未分配 + CODE_PERMISSION = 403001 +) + +// ====== 422 ====== +const ( + // CODE_PARAM_PARSER 响应-code参数接收解析错误 + CODE_PARAM_PARSER = 422001 + // CODE_PARAM_CHEACK 响应-code参数属性传入错误 + CODE_PARAM_CHEACK = 422002 +) diff --git a/src/framework/socket/tcp_client.go b/src/framework/socket/tcp_client.go index fb8899b5..05935f29 100644 --- a/src/framework/socket/tcp_client.go +++ b/src/framework/socket/tcp_client.go @@ -27,7 +27,7 @@ func (c *ConnTCP) New() (*ConnTCP, error) { proto = "tcp6" c.Addr = fmt.Sprintf("[%s]", c.Addr) } - address := fmt.Sprintf("%s:%d", c.Addr, c.Port) + address := net.JoinHostPort(c.Addr, fmt.Sprint(c.Port)) // 默认等待5s if c.DialTimeOut == 0 { diff --git a/src/framework/socket/tcp_server.go b/src/framework/socket/tcp_server.go index 0957438f..755b41c3 100644 --- a/src/framework/socket/tcp_server.go +++ b/src/framework/socket/tcp_server.go @@ -57,8 +57,14 @@ func (s *SocketTCP) Resolve(callback func(conn *net.Conn, err error)) { callback(nil, fmt.Errorf("tcp service not created")) return } - listener := *s.Listener + defer func() { + if err := recover(); err != nil { + callback(nil, fmt.Errorf("tcp service panic err")) + } + }() + + listener := *s.Listener for { select { case <-s.StopChan: diff --git a/src/framework/socket/udp_client.go b/src/framework/socket/udp_client.go index ecaa4864..9c77aade 100644 --- a/src/framework/socket/udp_client.go +++ b/src/framework/socket/udp_client.go @@ -27,7 +27,7 @@ func (c *ConnUDP) New() (*ConnUDP, error) { proto = "udp6" c.Addr = fmt.Sprintf("[%s]", c.Addr) } - address := fmt.Sprintf("%s:%d", c.Addr, c.Port) + address := net.JoinHostPort(c.Addr, fmt.Sprint(c.Port)) // 默认等待5s if c.DialTimeOut == 0 { diff --git a/src/framework/socket/udp_server.go b/src/framework/socket/udp_server.go index d93001b0..1c9f8eaa 100644 --- a/src/framework/socket/udp_server.go +++ b/src/framework/socket/udp_server.go @@ -56,10 +56,17 @@ func (s *SocketUDP) Resolve(callback func(*net.UDPConn, error)) { return } + defer func() { + if err := recover(); err != nil { + callback(nil, fmt.Errorf("udp service panic err")) + } + }() + for { select { case <-s.StopChan: callback(nil, fmt.Errorf("udp service not created")) + return default: callback(s.Conn, nil) } diff --git a/src/modules/network_element/service/ne_info.go b/src/modules/network_element/service/ne_info.go index a81fc39b..03c6bd53 100644 --- a/src/modules/network_element/service/ne_info.go +++ b/src/modules/network_element/service/ne_info.go @@ -8,6 +8,7 @@ import ( "runtime" "strings" + "be.ems/src/framework/constants" "be.ems/src/framework/constants/cachekey" "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" @@ -53,6 +54,27 @@ func (r *NeInfo) SelectNeInfoByNeTypeAndNeID(neType, neID string) model.NeInfo { return neInfo } +// FindByNeTypeAndNeID 通过ne_type和ne_id查询网元信息 +func (r NeInfo) FindByNeTypeAndNeID(neType, neID string) model.NeInfo { + var neInfo model.NeInfo + key := fmt.Sprintf("%s:%s:%s", constants.CACHE_NE_INFO, strings.ToUpper(neType), neID) + jsonStr, _ := redis.Get("", key) + if len(jsonStr) > 7 { + err := json.Unmarshal([]byte(jsonStr), &neInfo) + if err != nil { + neInfo = model.NeInfo{} + } + } else { + neInfo = r.neInfoRepository.SelectNeInfoByNeTypeAndNeID(neType, neID) + if neInfo.ID != "" && neInfo.NeId == neID { + redis.Del("", key) + values, _ := json.Marshal(neInfo) + redis.Set("", key, string(values)) + } + } + return neInfo +} + // RefreshByNeTypeAndNeID 通过ne_type和ne_id刷新redis中的缓存 func (r *NeInfo) RefreshByNeTypeAndNeID(neType, neID string) model.NeInfo { var neInfo model.NeInfo diff --git a/src/modules/trace/controller/trace_task.go b/src/modules/trace/controller/trace_task.go index abb5dfd6..75a4db94 100644 --- a/src/modules/trace/controller/trace_task.go +++ b/src/modules/trace/controller/trace_task.go @@ -43,7 +43,7 @@ func (s *TraceTaskController) List(c *gin.Context) { func (s *TraceTaskController) Info(c *gin.Context) { id := parse.Number(c.Param("id")) if id <= 0 { - c.JSON(400, resp.CodeMsg(40010, "bind err: id is empty")) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "bind err: id is empty")) return } @@ -61,19 +61,18 @@ func (s *TraceTaskController) Info(c *gin.Context) { func (s *TraceTaskController) Add(c *gin.Context) { language := reqctx.AcceptLanguage(c) var body model.TraceTask - err := c.ShouldBindBodyWithJSON(&body) - if err != nil { + if err := c.ShouldBindBodyWithJSON(&body); err != nil { errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err)) - c.JSON(422, resp.CodeMsg(40422, errMsgs)) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs)) return } if body.ID > 0 { - c.JSON(400, resp.CodeMsg(40010, "bind err: id not is empty")) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "bind err: id not is empty")) return } body.CreateBy = reqctx.LoginUserToUserName(c) - if err = s.traceTaskService.Insert(body); err != nil { + if err := s.traceTaskService.Insert(body); err != nil { c.JSON(200, resp.ErrMsg(i18n.TKey(language, err.Error()))) return } @@ -87,7 +86,7 @@ func (s *TraceTaskController) Remove(c *gin.Context) { language := reqctx.AcceptLanguage(c) id := c.Param("id") if id == "" { - c.JSON(400, resp.CodeMsg(40010, "bind err: id is empty")) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "bind err: id is empty")) return } @@ -112,17 +111,17 @@ func (s *TraceTaskController) Remove(c *gin.Context) { // // GET /filePull func (s *TraceTaskController) FilePull(c *gin.Context) { - language := reqctx.AcceptLanguage(c) var querys struct { TraceId string `form:"traceId" binding:"required"` } if err := c.ShouldBindQuery(&querys); err != nil { - c.JSON(400, resp.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err)) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs)) return } fileName := fmt.Sprintf("task_%s.pcap", querys.TraceId) - localFilePath := filepath.Join("/tmp/omc/trace", fileName) + localFilePath := filepath.Join("/usr/local/omc/trace", fileName) if runtime.GOOS == "windows" { localFilePath = fmt.Sprintf("C:%s", localFilePath) } diff --git a/src/modules/trace/repository/trace_task.go b/src/modules/trace/repository/trace_task.go index 6a842431..0894c171 100644 --- a/src/modules/trace/repository/trace_task.go +++ b/src/modules/trace/repository/trace_task.go @@ -116,11 +116,11 @@ func (r TraceTask) SelectByIds(ids []int64) []model.TraceTask { } // SelectByUnstopped 查询未停止的任务补发 -func (r TraceTask) SelectByUnstopped() []model.TraceTask { +func (r TraceTask) SelectByUnstopped(neStr string) []model.TraceTask { rows := []model.TraceTask{} tx := db.DB("").Model(&model.TraceTask{}) // 构建查询条件 - tx = tx.Where("end_time > ?", time.Now().UnixMilli()) + tx = tx.Where("end_time > ? and ne_list like ?", time.Now().UnixMilli(), fmt.Sprintf("%%%s%%", neStr)) // 查询数据 if err := tx.Find(&rows).Error; err != nil { logger.Errorf("query find err => %v", err.Error()) diff --git a/src/modules/trace/service/trace_task.go b/src/modules/trace/service/trace_task.go index e63b5588..2085f282 100644 --- a/src/modules/trace/service/trace_task.go +++ b/src/modules/trace/service/trace_task.go @@ -22,22 +22,25 @@ import ( // 实例化数据层 TraceTask 结构体 var NewTraceTask = &TraceTask{ udpService: socket.SocketUDP{}, + tcpService: socket.SocketTCP{}, traceTaskRepository: repository.NewTraceTask, traceDataRepository: repository.NewTraceData, } // TraceTask 跟踪任务 服务层处理 type TraceTask struct { - // UDP服务对象 - udpService socket.SocketUDP - // 跟踪_任务数据信息 - traceTaskRepository *repository.TraceTask - // 跟踪_数据信息 - traceDataRepository *repository.TraceData + udpService socket.SocketUDP // UDP服务对象 + tcpService socket.SocketTCP // 测试用,后续调整TODO + traceTaskRepository *repository.TraceTask // 跟踪_任务数据信息 + traceDataRepository *repository.TraceData // 跟踪_数据信息 } // CreateUDP 创建UDP数据通道 -func (r TraceTask) CreateUDP() error { +func (r *TraceTask) CreateUDP(reload bool) error { + if reload { + r.CloseUDP() // 关闭之前的UDP服务 + } + // 跟踪配置是否开启 host, port, err := r.traceNotify() if err != nil { @@ -84,12 +87,12 @@ func (r TraceTask) CreateUDP() error { // ============ 本地测试接收网元UDP发过来的数据 后续调整TODO if config.Env() == "local" { // 初始化TCP服务 - tcpService := socket.SocketTCP{Addr: host, Port: port + 1} - if _, err := tcpService.New(); err != nil { + r.tcpService = socket.SocketTCP{Addr: host, Port: port + 1} + if _, err := r.tcpService.New(); err != nil { return err } // 接收处理TCP数据 - go tcpService.Resolve(func(conn *net.Conn, err error) { + go r.tcpService.Resolve(func(conn *net.Conn, err error) { if err != nil { logger.Errorf("TCP Resolve %s", err.Error()) return @@ -125,7 +128,7 @@ func (r TraceTask) CreateUDP() error { } // pasreUDPData 解析数据 -func (r TraceTask) pasreUDPData(buf []byte) error { +func (r *TraceTask) pasreUDPData(buf []byte) error { data, err := traceHandler(buf) if err != nil { logger.Errorf("UDP Resolve UDPDataHandler Error: %s", err.Error()) @@ -157,8 +160,9 @@ func (r TraceTask) pasreUDPData(buf []byte) error { } // CloseUDP 关闭UDP数据通道 -func (r TraceTask) CloseUDP() { +func (r *TraceTask) CloseUDP() { r.udpService.Close() + r.tcpService.Close() } // FindByPage 根据条件分页查询 @@ -202,8 +206,9 @@ func (r TraceTask) createTaskToNe(task *model.TraceTask, ignoreErr bool) error { return fmt.Errorf("ne list is empty") } // 生成任务ID - traceId := r.traceTaskRepository.LastID() + 1 // 生成任务ID < 65535 - task.TraceId = fmt.Sprint(traceId) + if task.TraceId == "" { + task.TraceId = fmt.Sprint(r.traceTaskRepository.LastID() + 1) // 生成任务ID < 65535 + } // 发送任务给网元 errNe := []string{} @@ -213,7 +218,7 @@ func (r TraceTask) createTaskToNe(task *model.TraceTask, ignoreErr bool) error { logger.Warnf("ne type id is error") continue } - neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1]) + neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1]) if neInfo.NeType != neTypeIDArr[0] || neInfo.IP == "" { logger.Warnf("ne type id is not exist") continue @@ -233,7 +238,7 @@ func (r TraceTask) createTaskToNe(task *model.TraceTask, ignoreErr bool) error { if len(neTypeIDArr) != 2 { continue } - neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1]) + neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1]) if neInfo.NeType != neTypeIDArr[0] || neInfo.IP == "" { continue } @@ -315,7 +320,7 @@ func (r TraceTask) DeleteByIds(ids []int64) (int64, error) { if len(neTypeIDArr) != 2 { continue } - neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1]) + neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1]) if neInfo.NeType != neTypeIDArr[0] || neInfo.IP == "" { continue } @@ -331,8 +336,9 @@ func (r TraceTask) DeleteByIds(ids []int64) (int64, error) { } // RunUnstopped 启动跟踪未停止的任务 -func (r TraceTask) RunUnstopped() { - tasks := r.traceTaskRepository.SelectByUnstopped() +func (r TraceTask) RunUnstopped(neType string, neId string) { + neStr := fmt.Sprintf("%s_%s", neType, neId) + tasks := r.traceTaskRepository.SelectByUnstopped(neStr) for _, task := range tasks { r.createTaskToNe(&task, true) } diff --git a/src/modules/trace/service/trace_task_udp_data.go b/src/modules/trace/service/trace_task_udp_data.go index 2722566c..7bd8c6f6 100644 --- a/src/modules/trace/service/trace_task_udp_data.go +++ b/src/modules/trace/service/trace_task_udp_data.go @@ -27,14 +27,14 @@ func traceHandler(data []byte) (*TraceMsgToOamTraceData, error) { if err != nil { return decodeData, err } - fmt.Printf("TraceHandler get oamData: %s,%+v\n, payload=len(%d,%d)", decodeData.TimestampStr, decodeData.NfTraceMsg, decodeData.TracePayloadLen, len(decodeData.TracePayload)) + // fmt.Printf("TraceHandler get oamData: %s,%+v\n, payload=len(%d,%d)", decodeData.TimestampStr, decodeData.NfTraceMsg, decodeData.TracePayloadLen, len(decodeData.TracePayload)) // Return parsed message and payload if len(decodeData.TracePayload) != int(decodeData.TracePayloadLen) { return decodeData, fmt.Errorf("trace payload is bad, len=%d, shall be:%d", len(decodeData.TracePayload), int(decodeData.TracePayloadLen)) } // 输出到文件 - filePath := fmt.Sprintf("/tmp/omc/trace/task_%d.pcap", decodeData.NfTraceMsg.TraceId) + filePath := fmt.Sprintf("/usr/local/omc/trace/task_%d.pcap", decodeData.NfTraceMsg.TraceId) if runtime.GOOS == "windows" { filePath = fmt.Sprintf("C:%s", filePath) } @@ -106,6 +106,7 @@ func decodeTraceData(data []byte) (*TraceMsgToOamTraceData, error) { return nil, err } msg.SrcIpStr = net.IP(srcIp).String() + // 7. Parse DstIp (4 bytes IPv4) dstIp := make([]byte, 4) if flag&0x20 != 0 { diff --git a/src/modules/trace/trace.go b/src/modules/trace/trace.go index 273a2c4a..074668f1 100644 --- a/src/modules/trace/trace.go +++ b/src/modules/trace/trace.go @@ -141,7 +141,7 @@ func Setup(router *gin.Engine) { // InitLoad 初始参数 func InitLoad() { // 创建跟踪任务信令数据通道UDP - if err := service.NewTraceTask.CreateUDP(); err != nil { + if err := service.NewTraceTask.CreateUDP(false); err != nil { logger.Errorf("socket udp init fail: %s", err.Error()) } }