From 872567b28a062fb8d374c3c8eb50f8396ef28607 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Mon, 12 May 2025 18:19:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=B7=9F=E8=B8=AA=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=A0=87=E9=A2=98=EF=BC=8C=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E7=A7=BB=E9=99=A4=E5=B7=B2=E5=8F=91=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build/database/lite/install/trace_task.sql | 1 + .../database/lite/upgrade/upg_trace_task.sql | 30 +++++++++++ build/database/std/install/trace_task.sql | 1 + build/database/std/upgrade/upg_trace_task.sql | 28 ++++++++++ src/framework/database/db/expand.go | 12 ++++- src/modules/trace/model/trace_task.go | 1 + src/modules/trace/repository/trace_task.go | 17 ++++++ src/modules/trace/service/trace_task.go | 54 +++++++++++++++---- 8 files changed, 131 insertions(+), 13 deletions(-) create mode 100644 build/database/lite/upgrade/upg_trace_task.sql create mode 100644 build/database/std/upgrade/upg_trace_task.sql diff --git a/build/database/lite/install/trace_task.sql b/build/database/lite/install/trace_task.sql index 9a715d7b..b6d4dbaf 100644 --- a/build/database/lite/install/trace_task.sql +++ b/build/database/lite/install/trace_task.sql @@ -15,6 +15,7 @@ CREATE TABLE "trace_task" ( "dst_ip" text(128), "create_by" text(50), "create_time" integer, + "title" text(255), "remark" text(500), "ne_list" text(32) NOT NULL, "notify_url" text(128) NOT NULL, diff --git a/build/database/lite/upgrade/upg_trace_task.sql b/build/database/lite/upgrade/upg_trace_task.sql new file mode 100644 index 00000000..7eb078c6 --- /dev/null +++ b/build/database/lite/upgrade/upg_trace_task.sql @@ -0,0 +1,30 @@ +-- ---------------------------- +-- Table structure for trace_task +-- ---------------------------- +CREATE TABLE IF NOT EXISTS "trace_task" ( + "id" integer NOT NULL, + "trace_id" text(16) NOT NULL, + "trace_type" text(2) NOT NULL, + "start_time" integer, + "end_time" integer, + "interfaces" text(255), + "imsi" text(16), + "msisdn" text(16), + "src_ip" text(128), + "dst_ip" text(128), + "create_by" text(50), + "create_time" integer, + "title" text(255), + "remark" text(500), + "ne_list" text(32) NOT NULL, + "notify_url" text(128) NOT NULL, + PRIMARY KEY ("id") +); + +-- ---------------------------- +-- Indexes structure for table trace_task +-- ---------------------------- + + +-- ADD COLUMN +ALTER TABLE trace_task ADD COLUMN title text(255); diff --git a/build/database/std/install/trace_task.sql b/build/database/std/install/trace_task.sql index ea5ca574..58a5af84 100644 --- a/build/database/std/install/trace_task.sql +++ b/build/database/std/install/trace_task.sql @@ -16,6 +16,7 @@ CREATE TABLE `trace_task` ( `dst_ip` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '' COMMENT '目标地址IP', `create_by` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '' COMMENT '创建者', `create_time` bigint DEFAULT '0' COMMENT '创建时间', + `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '任务标题', `remark` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '备注', `ne_list` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '网元列表 neType_neId 例如 UDM_001,AMF_001', `notify_url` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '信息数据通知回调地址UDP 例如udp:192.168.5.58:33033', diff --git a/build/database/std/upgrade/upg_trace_task.sql b/build/database/std/upgrade/upg_trace_task.sql new file mode 100644 index 00000000..3c3ce993 --- /dev/null +++ b/build/database/std/upgrade/upg_trace_task.sql @@ -0,0 +1,28 @@ +-- +-- Table structure for table `trace_task` +-- + +CREATE TABLE IF NOT EXISTS `trace_task` ( + `id` int NOT NULL AUTO_INCREMENT COMMENT 'ID', + `trace_id` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '任务编号', + `trace_type` varchar(2) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '1-Interface,2-Device,3-UE', + `start_time` bigint DEFAULT '0' COMMENT '开始时间 毫秒', + `end_time` bigint DEFAULT '0' COMMENT '结束时间 毫秒', + `interfaces` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '接口跟踪必须 例如 N8,N10', + `imsi` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '' COMMENT '用户跟踪必须', + `msisdn` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '' COMMENT '用户跟踪可选', + `src_ip` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '' COMMENT '源地址IP', + `dst_ip` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '' COMMENT '目标地址IP', + `create_by` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '' COMMENT '创建者', + `create_time` bigint DEFAULT '0' COMMENT '创建时间', + `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '任务标题', + `remark` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT '' COMMENT '备注', + `ne_list` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '网元列表 neType_neId 例如 UDM_001,AMF_001', + `notify_url` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '信息数据通知回调地址UDP 例如udp:192.168.5.58:33033', + PRIMARY KEY (`id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='跟踪_任务'; + +-- Dump completed on 2025-04-14 14:26:59 + +-- ADD COLUMN +ALTER TABLE `trace_task` ADD COLUMN `title` varchar(255) NULL COMMENT '任务标题'; diff --git a/src/framework/database/db/expand.go b/src/framework/database/db/expand.go index 7212e4ca..86af2cef 100644 --- a/src/framework/database/db/expand.go +++ b/src/framework/database/db/expand.go @@ -93,8 +93,16 @@ func processSQLFile(db *gorm.DB, filePath string) { if strings.HasSuffix(line, ";") { // 执行 SQL 语句 if err := db.Exec(sqlBuilder.String()).Error; err != nil { - log.Fatalln(err.Error()) - return + errorStr := err.Error() + if strings.Contains(strings.ToLower(errorStr), "duplicate column") { + // 重复字段错误忽略 + // log.Printf("Exec SQL: %s\n", line) + // log.Println(err.Error()) + } else { + // 其他错误终止程序 + log.Fatalln(errorStr) + return + } } sqlBuilder.Reset() diff --git a/src/modules/trace/model/trace_task.go b/src/modules/trace/model/trace_task.go index 29f2ce20..4a2ff489 100644 --- a/src/modules/trace/model/trace_task.go +++ b/src/modules/trace/model/trace_task.go @@ -14,6 +14,7 @@ type TraceTask struct { DstIp string `json:"dstIp" gorm:"column:dst_ip"` // 目标地址IP CreateBy string `json:"createBy" gorm:"column:create_by"` // 创建者 CreateTime int64 `json:"createTime" gorm:"column:create_time"` // 创建时间 + Title string `json:"title" gorm:"column:title" binding:"required"` // 任务标题 Remark string `json:"remark" gorm:"column:remark"` // 备注 NeList string `json:"neList" gorm:"column:ne_list" binding:"required"` // 网元列表 neType_neId 例如 UDM_001,AMF_001 NotifyUrl string `json:"notifyUrl" gorm:"column:notify_url"` // 信息数据通知回调地址UDP 例如udp:192.168.5.58:33033 diff --git a/src/modules/trace/repository/trace_task.go b/src/modules/trace/repository/trace_task.go index 2233420e..6a842431 100644 --- a/src/modules/trace/repository/trace_task.go +++ b/src/modules/trace/repository/trace_task.go @@ -29,6 +29,9 @@ func (r TraceTask) SelectByPage(query map[string]string) ([]model.TraceTask, int if v, ok := query["msisdn"]; ok && v != "" { tx = tx.Where("msisdn like ?", fmt.Sprintf("%s%%", v)) } + if v, ok := query["title"]; ok && v != "" { + tx = tx.Where("title like ?", fmt.Sprintf("%s%%", v)) + } if v, ok := query["startTime"]; ok && v != "" { if len(v) == 10 { v = fmt.Sprintf("%s000", v) @@ -112,6 +115,20 @@ func (r TraceTask) SelectByIds(ids []int64) []model.TraceTask { return rows } +// SelectByUnstopped 查询未停止的任务补发 +func (r TraceTask) SelectByUnstopped() []model.TraceTask { + rows := []model.TraceTask{} + tx := db.DB("").Model(&model.TraceTask{}) + // 构建查询条件 + tx = tx.Where("end_time > ?", time.Now().UnixMilli()) + // 查询数据 + if err := tx.Find(&rows).Error; err != nil { + logger.Errorf("query find err => %v", err.Error()) + return rows + } + return rows +} + // Insert 新增信息 返回新增数据ID func (r TraceTask) Insert(param model.TraceTask) int64 { if param.CreateBy != "" { diff --git a/src/modules/trace/service/trace_task.go b/src/modules/trace/service/trace_task.go index dfd2643c..f47cd4ae 100644 --- a/src/modules/trace/service/trace_task.go +++ b/src/modules/trace/service/trace_task.go @@ -177,6 +177,19 @@ func (r TraceTask) FindById(id int64) model.TraceTask { // Insert 新增信息 func (r TraceTask) Insert(task model.TraceTask) error { + if err := r.createTaskToNe(&task, false); err != nil { + return err + } + // 插入数据库 + insertId := r.traceTaskRepository.Insert(task) + if insertId <= 0 { + return fmt.Errorf("insert task error") + } + return nil +} + +// CreateTaskToNe 创建任务到网元 +func (r TraceTask) createTaskToNe(task *model.TraceTask, ignoreErr bool) error { // 跟踪配置是否开启 host, port, err := r.traceNotify() if err != nil { @@ -193,6 +206,7 @@ func (r TraceTask) Insert(task model.TraceTask) error { task.TraceId = fmt.Sprint(traceId) // 发送任务给网元 + errNe := []string{} for _, neTypeID := range neList { neTypeIDArr := strings.Split(neTypeID, "_") if len(neTypeIDArr) != 2 { @@ -204,18 +218,28 @@ func (r TraceTask) Insert(task model.TraceTask) error { logger.Warnf("ne type id is not exist") continue } - if err := r.createTaskToNe(neInfo, task); err != nil { - logger.Errorf("task to %s error: %s", neTypeID, err.Error()) - return fmt.Errorf("task to %s error: %s", neTypeID, err.Error()) + if err := r.traceNeTask(neInfo, *task); err != nil { + logger.Errorf("ne type id is %s to %s error: %s", task.TraceId, neTypeID, err.Error()) + errNe = append(errNe, neTypeID) + continue } } - - // 插入数据库 - insertId := r.traceTaskRepository.Insert(task) - if insertId <= 0 { - return fmt.Errorf("insert task error") + if len(errNe) <= 0 && !ignoreErr { + return nil } - return nil + // 移除任务 + for _, neTypeID := range neList { + neTypeIDArr := strings.Split(neTypeID, "_") + if len(neTypeIDArr) != 2 { + continue + } + neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1]) + if neInfo.NeType != neTypeIDArr[0] || neInfo.IP == "" { + continue + } + neFetchlink.NeTraceDelete(neInfo, task.TraceId) + } + return fmt.Errorf("task to ne error: %s", strings.Join(errNe, ",")) } // traceNotify 网元通知地址 @@ -234,8 +258,8 @@ func (r TraceTask) traceNotify() (string, int64, error) { return host, port, nil } -// createTaskToNe 网元创建任务 -func (r TraceTask) createTaskToNe(neInfo neModel.NeInfo, task model.TraceTask) error { +// traceNeTask 网元创建任务 +func (r TraceTask) traceNeTask(neInfo neModel.NeInfo, task model.TraceTask) error { data := model.TraceReq{ TraceId: parse.Number(task.TraceId), NotifyUrl: task.NotifyUrl, @@ -305,3 +329,11 @@ func (r TraceTask) DeleteByIds(ids []int64) (int64, error) { // 删除信息失败! return 0, fmt.Errorf("delete fail") } + +// RunUnstopped 启动跟踪未停止的任务 +func (r TraceTask) RunUnstopped() { + tasks := r.traceTaskRepository.SelectByUnstopped() + for _, task := range tasks { + r.createTaskToNe(&task, true) + } +}