diff --git a/src/modules/trace/controller/trace_data.go b/src/modules/trace/controller/trace_data.go index 1d032d64..cc011ffa 100644 --- a/src/modules/trace/controller/trace_data.go +++ b/src/modules/trace/controller/trace_data.go @@ -31,6 +31,24 @@ func (s *TraceDataController) List(c *gin.Context) { c.JSON(200, resp.OkData(map[string]any{"rows": rows, "total": total})) } +// 跟踪任务数据信息 +// +// GET /:id +func (s *TraceDataController) Info(c *gin.Context) { + id := parse.Number(c.Param("id")) + if id <= 0 { + c.JSON(400, resp.CodeMsg(40010, "bind err: id is empty")) + return + } + + data := s.traceDataService.FindById(id) + if data.ID == id { + c.JSON(200, resp.OkData(data)) + return + } + c.JSON(200, resp.Err(nil)) +} + // 跟踪任务数据删除 // // DELETE /:id diff --git a/src/modules/trace/controller/trace_task.go b/src/modules/trace/controller/trace_task.go index f37cb523..abb5dfd6 100644 --- a/src/modules/trace/controller/trace_task.go +++ b/src/modules/trace/controller/trace_task.go @@ -80,39 +80,6 @@ func (s *TraceTaskController) Add(c *gin.Context) { c.JSON(200, resp.Ok(nil)) } -// 跟踪任务修改 -// -// PUT / -func (s *TraceTaskController) Edit(c *gin.Context) { - language := reqctx.AcceptLanguage(c) - var body model.TraceTask - err := c.ShouldBindBodyWithJSON(&body) - if err != nil { - errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err)) - c.JSON(422, resp.CodeMsg(40422, errMsgs)) - return - } - if body.ID == 0 { - c.JSON(400, resp.CodeMsg(40010, "bind err: id is empty")) - return - } - - // 检查是否存在 - taskInfo := s.traceTaskService.FindById(body.ID) - if taskInfo.ID != body.ID { - // 没有可访问任务信息数据! - c.JSON(200, resp.ErrMsg(i18n.TKey(language, "task.noData"))) - return - } - - body.UpdateBy = reqctx.LoginUserToUserName(c) - if err = s.traceTaskService.Update(body); err != nil { - c.JSON(200, resp.ErrMsg(i18n.TKey(language, err.Error()))) - return - } - c.JSON(200, resp.Ok(nil)) -} - // 跟踪任务删除 // // DELETE /:id diff --git a/src/modules/trace/model/trace_data.go b/src/modules/trace/model/trace_data.go index f21f0859..3669199d 100644 --- a/src/modules/trace/model/trace_data.go +++ b/src/modules/trace/model/trace_data.go @@ -3,18 +3,19 @@ package model // TraceData 跟踪_数据 trace_data type TraceData struct { ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"` - TaskId int64 `json:"taskId" gorm:"column:task_id"` // 任务ID - IMSI string `json:"imsi" gorm:"column:imsi"` - MSISDN string `json:"msisdn" gorm:"column:msisdn"` // 可能存在 - SrcAddr string `json:"srcAddr" gorm:"column:src_addr"` // 源地址带端口 - DstAddr string `json:"dstAddr" gorm:"column:dst_addr"` // 目标地址带端口 - IfType int64 `json:"ifType" gorm:"column:if_type"` // 接口类型,未分类 - MsgType int64 `json:"msgType" gorm:"column:msg_type"` - MsgDirect int64 `json:"msgDirect" gorm:"column:msg_direct"` - Length int64 `json:"length" gorm:"column:length"` // 去除头后的原始数据byte长度 - Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` // 毫秒 - RawMsg string `json:"rawMsg" gorm:"column:raw_msg"` // 去除头后的原始数据byteBase64 - DecMsg string `json:"decMsg" gorm:"column:dec_msg"` // TCP内容消息 + TraceId int64 `json:"traceId" gorm:"column:trace_id"` // 跟踪任务ID + IMSI string `json:"imsi" gorm:"column:imsi"` // UE类型 IMSI + MSISDN string `json:"msisdn" gorm:"column:msisdn"` // UE类型 可能存在 + SrcAddr string `json:"srcAddr" gorm:"column:src_addr"` // 源地址 + DstAddr string `json:"dstAddr" gorm:"column:dst_addr"` // 目标地址 + IfType string `json:"ifType" gorm:"column:if_type"` // 接口类型,未分类 + MsgType int64 `json:"msgType" gorm:"column:msg_type"` // 消息类型,0-req, 1-rsp + MsgDirect int64 `json:"msgDirect" gorm:"column:msg_direct"` // 消息方向,0-recv,1-send + MsgNe string `json:"msgNe" gorm:"column:msg_ne"` // 消息网元 + MsgEvent string `json:"msgEvent" gorm:"column:msg_event"` // 消息事件,e.g: CreateContextReq + Length int64 `json:"length" gorm:"column:length"` // 原始数据byte长度 + Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` // 纳秒时间戳 + RawMsg string `json:"rawMsg" gorm:"column:raw_msg"` // 原始数据byteBase64 } // TableName 表名称 diff --git a/src/modules/trace/model/trace_task.go b/src/modules/trace/model/trace_task.go index c581afa2..29f2ce20 100644 --- a/src/modules/trace/model/trace_task.go +++ b/src/modules/trace/model/trace_task.go @@ -2,27 +2,21 @@ package model // TraceTask 跟踪_任务 type TraceTask struct { - ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"` // 跟踪任务ID - TraceId string `json:"traceId" gorm:"column:trace_id"` // 任务编号 - TraceType string `json:"traceType" gorm:"column:trace_type"` // 1-Interface,2-Device,3-User - StartTime int64 `json:"startTime" gorm:"column:start_time"` // 开始时间 毫秒 - EndTime int64 `json:"endTime" gorm:"column:end_time"` // 结束时间 毫秒 - Interfaces string `json:"interfaces" gorm:"column:interfaces"` // 接口跟踪必须 例如 N8,N10 - IMSI string `json:"imsi" gorm:"column:imsi"` // 用户跟踪必须 - MSISDN string `json:"msisdn" gorm:"column:msisdn"` // 用户跟踪可选 - UeIp string `json:"ueIp" gorm:"column:ue_ip"` // 设备跟踪必须 IP - SrcIp string `json:"srcIp" gorm:"column:src_ip"` // 源地址IP - DstIp string `json:"dstIp" gorm:"column:dst_ip"` // 目标地址IP - SignalPort int64 `json:"signalPort" gorm:"column:signal_port"` // 地址IP端口 - CreateBy string `json:"createBy" gorm:"column:create_by"` // 创建者 - CreateTime int64 `json:"createTime" gorm:"column:create_time"` // 创建时间 - UpdateBy string `json:"updateBy" gorm:"column:update_by"` // 更新者 - UpdateTime int64 `json:"updateTime" gorm:"column:update_time"` // 更新时间 - Remark string `json:"remark" gorm:"column:remark"` // 备注 - NeType string `json:"neType" gorm:"column:ne_type"` // 网元类型 - NeId string `json:"neId" gorm:"column:ne_id"` // 网元ID - NotifyUrl string `json:"notifyUrl" gorm:"column:notify_url"` // 信息数据通知回调地址UDP 例如udp:192.168.5.58:29500 - FetchMsg string `json:"fetchMsg" gorm:"column:fetch_msg"` // 任务下发请求响应消息 + ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"` // 跟踪任务ID + TraceId string `json:"traceId" gorm:"column:trace_id"` // 任务编号 + TraceType string `json:"traceType" gorm:"column:trace_type" binding:"required"` // 1-Interface,2-Device,3-UE + StartTime int64 `json:"startTime" gorm:"column:start_time"` // 开始时间 毫秒 + EndTime int64 `json:"endTime" gorm:"column:end_time" binding:"required"` // 结束时间 毫秒 + Interfaces string `json:"interfaces" gorm:"column:interfaces"` // 接口跟踪必须 例如 N8,N10 + IMSI string `json:"imsi" gorm:"column:imsi"` // 用户跟踪必须 + MSISDN string `json:"msisdn" gorm:"column:msisdn"` // 用户跟踪可选 + SrcIp string `json:"srcIp" gorm:"column:src_ip"` // 源地址IP + DstIp string `json:"dstIp" gorm:"column:dst_ip"` // 目标地址IP + CreateBy string `json:"createBy" gorm:"column:create_by"` // 创建者 + CreateTime int64 `json:"createTime" gorm:"column:create_time"` // 创建时间 + Remark string `json:"remark" gorm:"column:remark"` // 备注 + NeList string `json:"neList" gorm:"column:ne_list" binding:"required"` // 网元列表 neType_neId 例如 UDM_001,AMF_001 + NotifyUrl string `json:"notifyUrl" gorm:"column:notify_url"` // 信息数据通知回调地址UDP 例如udp:192.168.5.58:33033 } // TableName 表名称 diff --git a/src/modules/trace/model/trace_task_udp_msg.go b/src/modules/trace/model/trace_task_udp_msg.go new file mode 100644 index 00000000..99019c77 --- /dev/null +++ b/src/modules/trace/model/trace_task_udp_msg.go @@ -0,0 +1,42 @@ +package model + +type TraceDefType = int64 + +const ( + MSG_TYPE_REQ TraceDefType = 0 + MSG_TYPE_RSP TraceDefType = 1 + MSG_DIRECT_RECV TraceDefType = 0 + MSG_DIRECT_SEND TraceDefType = 1 + // NF_TYPE_AMF = "AMF" + // NF_TYPE_SMF = "SMF" + // NF_TYPE_AUSF = "AUSF" + // NF_TYPE_UDM = "UDM" + // NF_TYPE_PCF = "PCF" +) + +// TraceMsg 跟踪_消息 +type TraceMsg struct { + TraceId int + MsgType TraceDefType //0-req, 1-rsp + MsgDirect TraceDefType //0-recv,1-send + NfName string + MsgEvent string //e.g: CreateContextReq + IMSI string + IfType string //interface type: N2,N8... + SrcIpStr string + DstIpStr string + Timestamp int64 +} + +// TraceReq 跟踪_请求 +type TraceReq struct { + TraceId int64 `json:"traceId"` // Unique identifier for trace operations:delete... + TraceType string `json:"traceType"` // Type of trace: Interface;UE; Device + StartTime string `json:"startTime"` // Start time of the trace: e.g., "2025-04-12 11:24:59" + EndTime string `json:"endTime"` // End time of the trace: e.g., "2025-04-12 14:24:59" + Interfaces []string `json:"interfaces,omitempty"` // List of interfaces: e.g., ["N8", "N10", "N1/N2"] + IMSI string `json:"imsi,omitempty"` // International Mobile Subscriber Identity + SrcIp string `json:"srcIp,omitempty"` // Source IP address (used for device-level filtering) + DstIp string `json:"dstIp,omitempty"` // Destination IP address (used for device-level filtering) + NotifyUrl string `json:"notifyUrl,omitempty"` // Notification URL: e.g., "udp:192.168.5.58:33033" +} diff --git a/src/modules/trace/repository/trace_data.go b/src/modules/trace/repository/trace_data.go index 442002af..bd65b76f 100644 --- a/src/modules/trace/repository/trace_data.go +++ b/src/modules/trace/repository/trace_data.go @@ -2,6 +2,7 @@ package repository import ( "fmt" + "strings" "be.ems/src/framework/database/db" "be.ems/src/framework/logger" @@ -18,11 +19,8 @@ type TraceData struct{} func (r TraceData) SelectByPage(query map[string]string) ([]model.TraceData, int64) { tx := db.DB("").Model(&model.TraceData{}) // 查询条件拼接 - if v, ok := query["imsi"]; ok && v != "" { - tx = tx.Where("imsi like ?", fmt.Sprintf("%s%%", v)) - } - if v, ok := query["msisdn"]; ok && v != "" { - tx = tx.Where("msisdn like ?", fmt.Sprintf("%s%%", v)) + if v, ok := query["traceId"]; ok && v != "" { + tx = tx.Where("trace_id = ?", v) } if v, ok := query["startTime"]; ok && v != "" { if len(v) == 10 { @@ -53,7 +51,33 @@ func (r TraceData) SelectByPage(query map[string]string) ([]model.TraceData, int // 查询数据分页 pageNum, pageSize := db.PageNumSize(query["pageNum"], query["pageSize"]) tx = tx.Limit(pageSize).Offset(pageSize * pageNum) - err := tx.Find(&rows).Error + // 排序 + sortByStr, sortOk := query["sortBy"] + sortOrderStr, orderOk := query["sortOrder"] + if sortOk && sortByStr != "" && orderOk && sortOrderStr != "" { + sortByArr := strings.Split(sortByStr, ",") + sortOrderArr := strings.Split(sortOrderStr, ",") + for i := range sortByArr { + sortBy := sortByArr[i] + sortOrder := sortOrderArr[i] + // 排序字段 + sort := "id" + if sortBy == "timestamp" { + sort = "timestamp" + } + // 排序方式 + order := "ASC" + if strings.HasPrefix(sortOrder, "asc") { + order = "ASC" + } else if strings.HasPrefix(sortOrder, "desc") { + order = "DESC" + } + tx = tx.Order(fmt.Sprintf("%s %s", sort, order)) + } + } else { + tx = tx.Order("id desc") + } + err := tx.Omit("raw_msg").Find(&rows).Error if err != nil { logger.Errorf("query find err => %v", err.Error()) return rows, total @@ -100,3 +124,16 @@ func (r TraceData) DeleteByIds(ids []int64) int64 { } return tx.RowsAffected } + +// DeleteByTraceId 删除跟踪任务数据 返回受影响行数 +func (r TraceData) DeleteByTraceId(traceId string) int64 { + if traceId == "" || traceId == "" { + return 0 + } + tx := db.DB("").Where("trace_id = ?", traceId) + if err := tx.Delete(&model.TraceData{}).Error; err != nil { + logger.Errorf("delete err => %v", err.Error()) + return 0 + } + return tx.RowsAffected +} diff --git a/src/modules/trace/repository/trace_task.go b/src/modules/trace/repository/trace_task.go index 6cff2ff8..2233420e 100644 --- a/src/modules/trace/repository/trace_task.go +++ b/src/modules/trace/repository/trace_task.go @@ -2,6 +2,7 @@ package repository import ( "fmt" + "strings" "time" "be.ems/src/framework/database/db" @@ -19,8 +20,8 @@ type TraceTask struct{} func (r TraceTask) SelectByPage(query map[string]string) ([]model.TraceTask, int64) { tx := db.DB("").Model(&model.TraceTask{}) // 查询条件拼接 - if v, ok := query["neType"]; ok && v != "" { - tx = tx.Where("ne_type = ?", v) + if v, ok := query["traceType"]; ok && v != "" { + tx = tx.Where("trace_type = ?", v) } if v, ok := query["imsi"]; ok && v != "" { tx = tx.Where("imsi like ?", fmt.Sprintf("%s%%", v)) @@ -29,10 +30,20 @@ func (r TraceTask) SelectByPage(query map[string]string) ([]model.TraceTask, int tx = tx.Where("msisdn like ?", fmt.Sprintf("%s%%", v)) } if v, ok := query["startTime"]; ok && v != "" { - tx = tx.Where("start_time >= ?", v) + if len(v) == 10 { + v = fmt.Sprintf("%s000", v) + tx = tx.Where("start_time >= ?", v) + } else if len(v) == 13 { + tx = tx.Where("start_time >= ?", v) + } } if v, ok := query["endTime"]; ok && v != "" { - tx = tx.Where("end_time >= ?", v) + if len(v) == 10 { + v = fmt.Sprintf("%s999", v) + tx = tx.Where("end_time <= ?", v) + } else if len(v) == 13 { + tx = tx.Where("end_time <= ?", v) + } } // 查询结果 @@ -47,18 +58,33 @@ func (r TraceTask) SelectByPage(query map[string]string) ([]model.TraceTask, int // 查询数据分页 pageNum, pageSize := db.PageNumSize(query["pageNum"], query["pageSize"]) tx = tx.Limit(pageSize).Offset(pageSize * pageNum) - // 排序 - if v, ok := query["sortField"]; ok && v != "" { - sortSql := v - if v, ok := query["sortOrder"]; ok && v != "" { - if v == "desc" { - sortSql += " desc " - } else { - sortSql += " asc " + sortByStr, sortOk := query["sortBy"] + sortOrderStr, orderOk := query["sortOrder"] + if sortOk && sortByStr != "" && orderOk && sortOrderStr != "" { + sortByArr := strings.Split(sortByStr, ",") + sortOrderArr := strings.Split(sortOrderStr, ",") + for i := range sortByArr { + sortBy := sortByArr[i] + sortOrder := sortOrderArr[i] + // 排序字段 + sort := "id" + if sortBy == "operaBy" { + sort = "opera_by" + } else if sortBy == "endTime" { + sort = "end_time" } + // 排序方式 + order := "ASC" + if strings.HasPrefix(sortOrder, "asc") { + order = "ASC" + } else if strings.HasPrefix(sortOrder, "desc") { + order = "DESC" + } + tx = tx.Order(fmt.Sprintf("%s %s", sort, order)) } - tx.Order(sortSql) + } else { + tx = tx.Order("id desc") } err := tx.Find(&rows).Error @@ -90,8 +116,6 @@ func (r TraceTask) SelectByIds(ids []int64) []model.TraceTask { func (r TraceTask) Insert(param model.TraceTask) int64 { if param.CreateBy != "" { ms := time.Now().UnixMilli() - param.UpdateBy = param.CreateBy - param.UpdateTime = ms param.CreateTime = ms } // 执行插入 @@ -102,26 +126,6 @@ func (r TraceTask) Insert(param model.TraceTask) int64 { return param.ID } -// Update 修改信息 返回受影响行数 -func (r TraceTask) Update(param model.TraceTask) int64 { - if param.ID <= 0 { - return 0 - } - if param.UpdateBy != "" { - param.UpdateTime = time.Now().UnixMilli() - } - tx := db.DB("").Model(&model.TraceTask{}) - // 构建查询条件 - tx = tx.Where("id = ?", param.ID) - tx = tx.Omit("id", "create_by", "create_time") - // 执行更新 - if err := tx.Updates(param).Error; err != nil { - logger.Errorf("update err => %v", err.Error()) - return 0 - } - return tx.RowsAffected -} - // DeleteByIds 批量删除信息 返回受影响行数 func (r TraceTask) DeleteByIds(ids []int64) int64 { if len(ids) <= 0 { diff --git a/src/modules/trace/service/trace_task.go b/src/modules/trace/service/trace_task.go index 03948e2e..dfd2643c 100644 --- a/src/modules/trace/service/trace_task.go +++ b/src/modules/trace/service/trace_task.go @@ -2,7 +2,6 @@ package service import ( "encoding/base64" - "encoding/json" "fmt" "net" "strings" @@ -13,6 +12,7 @@ import ( "be.ems/src/framework/utils/date" "be.ems/src/framework/utils/parse" neFetchlink "be.ems/src/modules/network_element/fetch_link" + neModel "be.ems/src/modules/network_element/model" neService "be.ems/src/modules/network_element/service" "be.ems/src/modules/trace/model" "be.ems/src/modules/trace/repository" @@ -37,20 +37,11 @@ type TraceTask struct { } // CreateUDP 创建UDP数据通道 -func (r *TraceTask) CreateUDP() error { +func (r TraceTask) CreateUDP() error { // 跟踪配置是否开启 - if v := config.Get("trace.enabled"); v != nil { - if !v.(bool) { - return nil - } - } - host := "127.0.0.1" - if v := config.Get("trace.host"); v != nil { - host = v.(string) - } - var port int64 = 33033 - if v := config.Get("trace.port"); v != nil { - port = parse.Number(v) + host, port, err := r.traceNotify() + if err != nil { + return err } // 初始化UDP服务 @@ -67,122 +58,116 @@ func (r *TraceTask) CreateUDP() error { } // 读取数据 - buf := make([]byte, 2048) + buf := make([]byte, 10*1048) n, addr, err := conn.ReadFromUDPAddrPort(buf) if err != nil { logger.Errorf("UDP Resolve ReadFromUDPAddrPort Error: %s", err.Error()) return } - logger.Infof("socket UDP: %s", string(buf[:n])) - // logger.Infof("socket UDP Base64: %s", base64.StdEncoding.EncodeToString(buf[:n])) - mData, err := UDPDataHandler(buf, n) - if err != nil { - logger.Errorf("UDP Resolve UDPDataHandler Error: %s", err.Error()) + // logger.Infof("socket UDP: %s", string(buf[:n])) + logger.Infof("socket UDP Base64 Encode: %s", base64.StdEncoding.EncodeToString(buf[:n])) + + // 解析数据 + if err := r.pasreUDPData(buf[:n]); err != nil { + logger.Errorf("UDP Resolve UDPData Error: %s", err.Error()) return } - taskId := parse.Number(mData["taskId"]) - - // 插入数据库做记录 - r.traceDataRepository.Insert(model.TraceData{ - TaskId: taskId, - IMSI: mData["imsi"].(string), - SrcAddr: mData["srcAddr"].(string), - DstAddr: mData["dstAddr"].(string), - IfType: parse.Number(mData["ifType"]), - MsgType: parse.Number(mData["msgType"]), - MsgDirect: parse.Number(mData["msgDirect"]), - Length: parse.Number(mData["dataLen"]), - RawMsg: mData["dataInfo"].(string), - Timestamp: parse.Number(mData["timestamp"]), - DecMsg: mData["decMsg"].(string), - }) - - // 推送文件 - if v, ok := mData["pcapFile"]; ok && v != "" { - logger.Infof("pcapFile: %s", v) - wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%d", wsService.GROUP_TRACE_NE, taskId), taskId) - } // 发送响应 if _, err := conn.WriteToUDPAddrPort([]byte("udp>"), addr); err != nil { logger.Errorf("UDP Resolve WriteToUDPAddrPort Error: %s", err.Error()) } + buf = nil }) - // ============ 测试接收网元UDP发过来的数据 - // 初始化TCP服务 后续调整TODO - tcpService := socket.SocketTCP{Addr: host, Port: port + 1} - if _, err := tcpService.New(); err != nil { - return err - } - // 接收处理TCP数据 - go tcpService.Resolve(func(conn *net.Conn, err error) { - if err != nil { - logger.Errorf("TCP Resolve %s", err.Error()) - return + // ============ 本地测试接收网元UDP发过来的数据 后续调整TODO + if config.Env() == "local" { + // 初始化TCP服务 + tcpService := socket.SocketTCP{Addr: host, Port: port + 1} + if _, err := tcpService.New(); err != nil { + return err } + // 接收处理TCP数据 + go tcpService.Resolve(func(conn *net.Conn, err error) { + if err != nil { + logger.Errorf("TCP Resolve %s", err.Error()) + return + } - c := (*conn) - // 读取数据 - buf := make([]byte, 2048) - n, err := c.Read(buf) - if err != nil { - logger.Errorf("TCP Resolve Read Error: %s", err.Error()) - return - } + c := (*conn) + // 读取数据 + buf := make([]byte, 2048) + n, err := c.Read(buf) + if err != nil { + logger.Errorf("TCP Resolve Read Error: %s", err.Error()) + return + } - logger.Infof("socket TCP: %s", string(buf[:n])) - deData, _ := base64.StdEncoding.DecodeString(string(buf[:n])) - logger.Infof("socket TCP Base64: %s", deData) - mData, err := UDPDataHandler(deData, len(deData)) - if err != nil { - logger.Errorf("TCP Resolve UDPDataHandler Error: %s", err.Error()) - return - } - taskId := parse.Number(mData["taskId"]) + // logger.Infof("socket TCP: %s", string(buf[:n])) + deData, _ := base64.StdEncoding.DecodeString(string(buf[:n])) + // logger.Infof("socket TCP Base64 Decode: %s", deData) - // 插入数据库做记录 - r.traceDataRepository.Insert(model.TraceData{ - TaskId: taskId, - IMSI: mData["imsi"].(string), - SrcAddr: mData["srcAddr"].(string), - DstAddr: mData["dstAddr"].(string), - IfType: parse.Number(mData["ifType"]), - MsgType: parse.Number(mData["msgType"]), - MsgDirect: parse.Number(mData["msgDirect"]), - Length: parse.Number(mData["dataLen"]), - RawMsg: mData["dataInfo"].(string), - Timestamp: parse.Number(mData["timestamp"]), - DecMsg: mData["decMsg"].(string), + if err := r.pasreUDPData(deData); err != nil { + logger.Errorf("TCP Resolve UDPData Error: %s", err.Error()) + return + } + + // 发送响应 + if _, err = c.Write([]byte("tcp>")); err != nil { + logger.Errorf("TCP Resolve Write Error: %s", err.Error()) + } + buf = nil }) + } - // 推送文件 - if v, ok := mData["pcapFile"]; ok && v != "" { - logger.Infof("pcapFile: %s", v) - wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%d", wsService.GROUP_TRACE_NE, taskId), taskId) - } - - // 发送响应 - if _, err = c.Write([]byte("tcp>")); err != nil { - logger.Errorf("TCP Resolve Write Error: %s", err.Error()) - } - }) return nil } +// pasreUDPData 解析数据 +func (r TraceTask) pasreUDPData(buf []byte) error { + data, err := traceHandler(buf) + if err != nil { + logger.Errorf("UDP Resolve UDPDataHandler Error: %s", err.Error()) + return err + } + taskId := parse.Number(data.NfTraceMsg.TraceId) + + // 插入数据库做记录 + item := model.TraceData{ + TraceId: taskId, + IMSI: data.NfTraceMsg.IMSI, + SrcAddr: data.NfTraceMsg.SrcIpStr, + DstAddr: data.NfTraceMsg.DstIpStr, + IfType: data.NfTraceMsg.IfType, + MsgType: data.NfTraceMsg.MsgType, + MsgDirect: data.NfTraceMsg.MsgDirect, + MsgNe: data.NfTraceMsg.NfName, + MsgEvent: data.NfTraceMsg.MsgEvent, + Length: int64(data.TracePayloadLen), + RawMsg: base64.StdEncoding.EncodeToString(data.TracePayload), + Timestamp: data.NfTraceMsg.Timestamp, + } + item.ID = r.traceDataRepository.Insert(item) + + // 推送到ws订阅组 + item.RawMsg = "" // 不推送原始数据 + wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%d", wsService.GROUP_TRACE_NE, taskId), item) + return err +} + // CloseUDP 关闭UDP数据通道 -func (r *TraceTask) CloseUDP() { +func (r TraceTask) CloseUDP() { r.udpService.Close() } // FindByPage 根据条件分页查询 -func (r *TraceTask) FindByPage(query map[string]string) ([]model.TraceTask, int64) { +func (r TraceTask) FindByPage(query map[string]string) ([]model.TraceTask, int64) { return r.traceTaskRepository.SelectByPage(query) } // FindById 通过ID查询 -func (r *TraceTask) FindById(id int64) model.TraceTask { +func (r TraceTask) FindById(id int64) model.TraceTask { tasks := r.traceTaskRepository.SelectByIds([]int64{id}) if len(tasks) > 0 { return tasks[0] @@ -191,12 +176,52 @@ func (r *TraceTask) FindById(id int64) model.TraceTask { } // Insert 新增信息 -func (r *TraceTask) Insert(task model.TraceTask) error { +func (r TraceTask) Insert(task model.TraceTask) error { // 跟踪配置是否开启 - if v := config.Get("trace.enabled"); v != nil { - if !v.(bool) { - return fmt.Errorf("tracking is not enabled") + host, port, err := r.traceNotify() + if err != nil { + return err + } + task.NotifyUrl = fmt.Sprintf("udp:%s:%d", host, port) + // 网元列表 + neList := strings.Split(task.NeList, ",") + if len(neList) <= 0 { + return fmt.Errorf("ne list is empty") + } + // 生成任务ID + traceId := r.traceTaskRepository.LastID() + 1 // 生成任务ID < 65535 + task.TraceId = fmt.Sprint(traceId) + + // 发送任务给网元 + for _, neTypeID := range neList { + neTypeIDArr := strings.Split(neTypeID, "_") + if len(neTypeIDArr) != 2 { + logger.Warnf("ne type id is error") + continue } + neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1]) + if neInfo.NeType != neTypeIDArr[0] || neInfo.IP == "" { + logger.Warnf("ne type id is not exist") + continue + } + if err := r.createTaskToNe(neInfo, task); err != nil { + logger.Errorf("task to %s error: %s", neTypeID, err.Error()) + return fmt.Errorf("task to %s error: %s", neTypeID, err.Error()) + } + } + + // 插入数据库 + insertId := r.traceTaskRepository.Insert(task) + if insertId <= 0 { + return fmt.Errorf("insert task error") + } + return nil +} + +// traceNotify 网元通知地址 +func (r TraceTask) traceNotify() (string, int64, error) { + if v := config.Get("trace.enabled"); !parse.Boolean(v) { + return "", 0, fmt.Errorf("trace is not enabled") } host := "127.0.0.1" if v := config.Get("trace.host"); v != nil { @@ -206,120 +231,43 @@ func (r *TraceTask) Insert(task model.TraceTask) error { if v := config.Get("trace.port"); v != nil { port = parse.Number(v) } - task.NotifyUrl = fmt.Sprintf("udp:%s:%d", host, port) + return host, port, nil +} - // 查询网元获取IP - neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(task.NeType, task.NeId) - if neInfo.NeId != task.NeId || neInfo.IP == "" { - return fmt.Errorf("app.common.noNEInfo") - } - traceId := r.traceTaskRepository.LastID() + 1 // 生成任务ID < 65535 - task.TraceId = fmt.Sprint(traceId) - - // 发送任务给网元 - data := map[string]any{ - "neType": neInfo.NeType, - "neId": neInfo.NeId, - "notifyUrl": task.NotifyUrl, - "id": traceId, - "startTime": date.ParseDateToStr(task.StartTime, date.YYYY_MM_DD_HH_MM_SS), - "endTime": date.ParseDateToStr(task.EndTime, date.YYYY_MM_DD_HH_MM_SS), +// createTaskToNe 网元创建任务 +func (r TraceTask) createTaskToNe(neInfo neModel.NeInfo, task model.TraceTask) error { + data := model.TraceReq{ + TraceId: parse.Number(task.TraceId), + NotifyUrl: task.NotifyUrl, + StartTime: date.ParseDateToStr(task.StartTime, date.YYYY_MM_DD_HH_MM_SS), + EndTime: date.ParseDateToStr(task.EndTime, date.YYYY_MM_DD_HH_MM_SS), } switch task.TraceType { case "1": // Interface - data["traceType"] = "Interface" - data["interfaces"] = strings.Split(task.Interfaces, ",") + data.TraceType = "Interface" + data.Interfaces = strings.Split(task.Interfaces, ",") case "2": // Device - data["traceType"] = "Device" - data["ueIp"] = task.UeIp - data["srcIp"] = task.SrcIp - data["dstIp"] = task.DstIp - data["signalPort"] = task.SignalPort - task.UeIp = neInfo.IP + data.TraceType = "Device" + data.SrcIp = task.SrcIp + data.DstIp = task.DstIp case "3": // UE - data["traceType"] = "UE" - data["imsi"] = task.IMSI - data["msisdn"] = task.MSISDN + data.TraceType = "UE" + data.IMSI = fmt.Sprintf("imsi-%s", task.IMSI) default: - return fmt.Errorf("trace type is not disabled") + return fmt.Errorf("trace type is not support") } msg, err := neFetchlink.NeTraceAdd(neInfo, data) if err != nil { return err } - s, _ := json.Marshal(msg) - task.FetchMsg = string(s) - - // 插入数据库 - r.traceTaskRepository.Insert(task) - return nil -} - -// Update 修改信息 -func (r *TraceTask) Update(task model.TraceTask) error { - // 跟踪配置是否开启 - if v := config.Get("trace.enabled"); v != nil { - if !v.(bool) { - return fmt.Errorf("tracking is not enabled") - } + if v, ok := msg["cause"]; ok { + return fmt.Errorf("trace task add failed, %v", v) } - host := "127.0.0.1" - if v := config.Get("trace.host"); v != nil { - host = v.(string) - } - var port int64 = 33033 - if v := config.Get("trace.port"); v != nil { - port = parse.Number(v) - } - task.NotifyUrl = fmt.Sprintf("udp:%s:%d", host, port) - - // 查询网元获取IP - neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(task.NeType, task.NeId) - if neInfo.NeId != task.NeId || neInfo.IP == "" { - return fmt.Errorf("app.common.noNEInfo") - } - - // 查询网元任务信息 - if msg, err := neFetchlink.NeTraceInfo(neInfo, task.TraceId); err == nil { - s, _ := json.Marshal(msg) - task.FetchMsg = string(s) - // 修改任务信息 - data := map[string]any{ - "neType": neInfo.NeType, - "neId": neInfo.NeId, - "notifyUrl": task.NotifyUrl, - "id": parse.Number(task.TraceId), - "startTime": date.ParseDateToStr(task.StartTime, date.YYYY_MM_DD_HH_MM_SS), - "endTime": date.ParseDateToStr(task.EndTime, date.YYYY_MM_DD_HH_MM_SS), - } - switch task.TraceType { - case "1": // Interface - data["traceType"] = "Interface" - data["interfaces"] = strings.Split(task.Interfaces, ",") - case "2": // Device - task.UeIp = neInfo.IP - data["traceType"] = "Device" - data["ueIp"] = task.UeIp - data["srcIp"] = task.SrcIp - data["dstIp"] = task.DstIp - data["signalPort"] = task.SignalPort - case "3": // UE - data["traceType"] = "UE" - data["imsi"] = task.IMSI - data["msisdn"] = task.MSISDN - default: - return fmt.Errorf("trace type is not disabled") - } - neFetchlink.NeTraceEdit(neInfo, data) - } - - // 更新数据库 - r.traceTaskRepository.Update(task) return nil } // DeleteByIds 批量删除信息 -func (r *TraceTask) DeleteByIds(ids []int64) (int64, error) { +func (r TraceTask) DeleteByIds(ids []int64) (int64, error) { // 检查是否存在 rows := r.traceTaskRepository.SelectByIds(ids) if len(rows) <= 0 { @@ -327,14 +275,30 @@ func (r *TraceTask) DeleteByIds(ids []int64) (int64, error) { } if len(rows) == len(ids) { - // 停止任务 + // 删除数据同时给网元发送停止任务 for _, v := range rows { - neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(v.NeType, v.NeId) - if neInfo.NeId != v.NeId || neInfo.IP == "" { + // 删除数据 + r.traceDataRepository.DeleteByTraceId(v.TraceId) + + // 网元列表 + neList := strings.Split(v.NeList, ",") + if len(neList) <= 0 { continue } - neFetchlink.NeTraceDelete(neInfo, v.TraceId) + // 停止任务 + for _, neTypeID := range neList { + neTypeIDArr := strings.Split(neTypeID, "_") + if len(neTypeIDArr) != 2 { + continue + } + neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1]) + if neInfo.NeType != neTypeIDArr[0] || neInfo.IP == "" { + continue + } + neFetchlink.NeTraceDelete(neInfo, v.TraceId) + } } + num := r.traceTaskRepository.DeleteByIds(ids) return num, nil } diff --git a/src/modules/trace/service/trace_task_udp_data.go b/src/modules/trace/service/trace_task_udp_data.go index 2f007f38..2722566c 100644 --- a/src/modules/trace/service/trace_task_udp_data.go +++ b/src/modules/trace/service/trace_task_udp_data.go @@ -1,334 +1,271 @@ package service import ( - "encoding/base64" + "bytes" "encoding/binary" "fmt" + "net" "os" "path/filepath" "runtime" - "strings" "time" - "golang.org/x/net/http/httpguts" - "golang.org/x/net/http2/hpack" + "be.ems/src/modules/trace/model" ) -const ( - GTPU_V1_VERSION = 1 << 5 - GTPU_VER_MASK = 7 << 5 - GTPU_PT_GTP = 1 << 4 - GTPU_HEADER_LEN = 12 - GTPU_E_S_PB_BIT = 7 - GTPU_E_BI = 1 << 2 -) - -const ( - GTPU_HEADER_VERSION_INDEX = 0 - GTPU_HEADER_MSG_TYPE_INDEX = 1 - GTPU_HEADER_LENGTH_INDEX = 2 - GTPU_HEADER_TEID_INDEX = 4 -) - -type ExtHeader struct { - TaskId uint32 - IMSI string - IfType int - MsgType int - MsgDirect int // 0-recv,1-send - TimeStamp int64 - SrcIP string - DstIP string - SrcPort uint16 - DstPort uint16 - Proto int // Protocol - PPI int // only for SCTP - DataLen uint16 - DataInfo []byte +// TraceMsgToOamTraceData 结构体定义 +type TraceMsgToOamTraceData struct { + NfTraceMsg *model.TraceMsg + TimestampStr string + TracePayloadLen uint16 + TracePayload []byte } -// parseUDPData 解析UDP数据 -func parseUDPData(rvMsg []byte, rvLen int) (ExtHeader, error) { - var extHdr ExtHeader - // var tr dborm.TraceData - var off int - msg := rvMsg - - verFlags := msg[GTPU_HEADER_VERSION_INDEX] - - gtpuHdrLen := GTPU_HEADER_LEN - - localTeid := binary.BigEndian.Uint32(msg[GTPU_HEADER_TEID_INDEX:]) - - extHdr.TaskId = localTeid - - if (verFlags & GTPU_E_S_PB_BIT) != 0 { - if (verFlags & GTPU_E_BI) != 0 { - extTypeIndex := GTPU_HEADER_LEN - 1 - - extType := msg[extTypeIndex] - - if extType == 0xFE { - extHdr.IMSI = string(msg[extTypeIndex+2 : extTypeIndex+17]) - extHdr.IfType = int(msg[extTypeIndex+17]) - extHdr.MsgType = int(msg[extTypeIndex+18]) - extHdr.MsgDirect = int(msg[extTypeIndex+19]) - - extHdr.TimeStamp = time.Now().UTC().UnixMilli() - // extHdr.TimeStamp = int64(binary.BigEndian.Uint64(msg[extTypeIndex+19:])) - // fmt.Printf("ext info %v %s %d %d %d \n", msg[(extTypeIndex+2):(extTypeIndex+20)], extHdr.IMSI, extHdr.IfType, extHdr.MsgType, extHdr.MsgDirect) - // set offset of IP Packet - off = 40 + 4 - //src ip: msg+40+12 - extHdr.SrcIP = fmt.Sprintf("%d.%d.%d.%d", msg[off+12], msg[off+13], msg[off+14], msg[off+15]) - //dst ip: msg+40+12+4 - extHdr.DstIP = fmt.Sprintf("%d.%d.%d.%d", msg[off+16], msg[off+17], msg[off+18], msg[off+19]) - extHdr.SrcPort = uint16(binary.BigEndian.Uint16(msg[off+20:])) - extHdr.DstPort = uint16(binary.BigEndian.Uint16(msg[off+22:])) - // fmt.Printf("info %s:%d %s:%d \n", extHdr.SrcIP, extHdr.SrcPort, extHdr.DstIP, extHdr.DstPort) - // ip header start msg+40 - extHdr.DataLen = uint16(rvLen - off) - extHdr.DataInfo = make([]byte, int(rvLen-off)) - copy(extHdr.DataInfo, []byte(msg[off:])) - - // 132 SCTP - // 6 TCP - // 17 UDP - extHdr.Proto = int(msg[off+9]) - if extHdr.Proto == 132 { - extHdr.PPI = int(msg[off+47]) - extHdr.DataLen = uint16(binary.BigEndian.Uint16(msg[(off+34):]) - 16) - // fmt.Printf("dat len %d %d \n", extHdr.DataLen, extHdr.PPI) - } - } - - for extType != 0 && extTypeIndex < rvLen { - extLen := msg[extTypeIndex+1] << 2 - if extLen == 0 { - return extHdr, fmt.Errorf("error, extLen is zero") - } - - gtpuHdrLen += int(extLen) - extTypeIndex += int(extLen) - extType = msg[extTypeIndex] - } - } - } else { - gtpuHdrLen -= 4 - } - return extHdr, nil -} - -// UDPDataHandler UDP数据处理 -func UDPDataHandler(data []byte, n int) (map[string]any, error) { - extHdr, err := parseUDPData(data, n) +// traceHandler 处理跟踪数据 +func traceHandler(data []byte) (*TraceMsgToOamTraceData, error) { + decodeData, err := decodeTraceData(data) if err != nil { - return nil, err + return decodeData, err } - if extHdr.TaskId == 0 || extHdr.DataLen < 1 { - return nil, fmt.Errorf("data error") + fmt.Printf("TraceHandler get oamData: %s,%+v\n, payload=len(%d,%d)", decodeData.TimestampStr, decodeData.NfTraceMsg, decodeData.TracePayloadLen, len(decodeData.TracePayload)) + // Return parsed message and payload + if len(decodeData.TracePayload) != int(decodeData.TracePayloadLen) { + return decodeData, fmt.Errorf("trace payload is bad, len=%d, shall be:%d", len(decodeData.TracePayload), int(decodeData.TracePayloadLen)) } - m := map[string]any{ - "taskId": extHdr.TaskId, - "imsi": extHdr.IMSI, - "ifType": extHdr.IfType, - "srcAddr": fmt.Sprintf("%s:%d", extHdr.SrcIP, extHdr.SrcPort), - "dstAddr": fmt.Sprintf("%s:%d", extHdr.DstIP, extHdr.DstPort), - "msgType": extHdr.MsgType, - "msgDirect": extHdr.MsgDirect, - "timestamp": extHdr.TimeStamp, - "dataLen": extHdr.DataLen, - // "dataInfo": extHdr.DataInfo, - "decMsg": "", - } - // Base64 编码 - m["dataInfo"] = base64.StdEncoding.EncodeToString(extHdr.DataInfo) - - if extHdr.Proto == 6 { // TCP - // 取响应数据 - iplen := uint16(binary.BigEndian.Uint16(extHdr.DataInfo[2:])) - tcplen := uint16(iplen - 32 - 20) - hdrlen := uint16(binary.BigEndian.Uint16(extHdr.DataInfo[20+32+1:])) - offset := uint16(52) - // fmt.Printf("HTTP %d %d %d \n", iplen, tcplen, hdrlen) - if tcplen > (hdrlen + 9) { // has data - doffset := uint16(offset + hdrlen + 9) - datlen := uint16(binary.BigEndian.Uint16(extHdr.DataInfo[doffset+1:])) - // fmt.Printf("HTTP datlen %d \n", datlen) - m["decMsg"], _ = httpDataMsg(extHdr.DataInfo[offset+9:offset+9+hdrlen], extHdr.DataInfo[doffset+9:doffset+datlen+9]) - } else { - m["decMsg"], _ = httpDataMsg(extHdr.DataInfo[offset+9:hdrlen], nil) - } - } - - // pcap文件 - m["pcapFile"] = writePcap(extHdr) - return m, nil -} - -// =========== TCP协议Body =========== - -// httpDataMsg Http数据信息处理 -func httpDataMsg(header []byte, data []byte) (string, error) { - var remainSize = uint32(16 << 20) - var sawRegular bool - var invalid bool // pseudo header field errors - var Fields []hpack.HeaderField - - invalid = false - hdec := hpack.NewDecoder(4096, nil) - hdec.SetEmitEnabled(true) - hdec.SetMaxStringLength(int(16 << 20)) - hdec.SetEmitFunc(func(hf hpack.HeaderField) { - if !httpguts.ValidHeaderFieldValue(hf.Value) { - // Don't include the value in the error, because it may be sensitive. - invalid = true - } - isPseudo := strings.HasPrefix(hf.Name, ":") - if isPseudo { - if sawRegular { - invalid = true - } - } else { - sawRegular = true - if !validWireHeaderFieldName(hf.Name) { - invalid = true - } - } - - if invalid { - hdec.SetEmitEnabled(false) - return - } - - size := hf.Size() - if size > remainSize { - hdec.SetEmitEnabled(false) - //mh.Truncated = true - return - } - remainSize -= size - - Fields = append(Fields, hf) - }) - - // defer hdec.SetEmitFunc(func(hf hpack.HeaderField) {}) - - frag := header - if _, err := hdec.Write(frag); err != nil { - return "", err - } - - if err := hdec.Close(); err != nil { - return "", err - } - - // hdec.SetEmitFunc(func(hf hpack.HeaderField) {}) - - var headers []byte - var line string - for i := range Fields { - line = fmt.Sprintf("\"%s\":\"%s\",", Fields[i].Name, Fields[i].Value) - headers = append(headers, []byte(line)...) - } - - if len(data) > 0 { - return fmt.Sprintf("{ %s \"content\":%s }", string(headers), string(data)), nil - } else { - return fmt.Sprintf("{ %s }", string(headers)), nil - } -} - -// validWireHeaderFieldName 校验报文头字段名称 -func validWireHeaderFieldName(v string) bool { - if len(v) == 0 { - return false - } - for _, r := range v { - if !httpguts.IsTokenRune(r) { - return false - } - if 'A' <= r && r <= 'Z' { - return false - } - } - return true -} - -// =========== writePcap 写Pcap文件 =========== - -const magicMicroseconds = 0xA1B2C3D4 -const versionMajor = 2 -const versionMinor = 4 - -func writeEmptyPcap(filename string, timeStamp int64, length int, data []byte) error { - var err error - var file *os.File - if err := os.MkdirAll(filepath.Dir(filename), 0775); err != nil { - return err - } - if _, err = os.Stat(filename); os.IsNotExist(err) { - file, err = os.Create(filename) - // File Header - var fileHeaderBuf [24]byte - binary.LittleEndian.PutUint32(fileHeaderBuf[0:4], magicMicroseconds) - binary.LittleEndian.PutUint16(fileHeaderBuf[4:6], versionMajor) - binary.LittleEndian.PutUint16(fileHeaderBuf[6:8], versionMinor) - // bytes 8:12 stay 0 (timezone = UTC) - // bytes 12:16 stay 0 (sigfigs is always set to zero, according to - // http://wiki.wireshark.org/Development/LibpcapFileFormat - binary.LittleEndian.PutUint32(fileHeaderBuf[16:20], 0x00040000) - binary.LittleEndian.PutUint32(fileHeaderBuf[20:24], 0x00000071) - if _, err := file.Write(fileHeaderBuf[:]); err != nil { - return err - } - } else { - file, err = os.OpenFile(filename, os.O_WRONLY|os.O_APPEND, 0666) - } - if err != nil { - return err - } - defer file.Close() - - // Packet Header - var packetHeaderBuf [24]byte - t := time.UnixMilli(timeStamp) - if t.IsZero() { - t = time.Now() - } - secs := t.Unix() - usecs := t.Nanosecond() / 1000 - binary.LittleEndian.PutUint32(packetHeaderBuf[0:4], uint32(secs)) - binary.LittleEndian.PutUint32(packetHeaderBuf[4:8], uint32(usecs)) - binary.LittleEndian.PutUint32(packetHeaderBuf[8:12], uint32(length+16)) - binary.LittleEndian.PutUint32(packetHeaderBuf[12:16], uint32(length+16)) - if _, err := file.Write(packetHeaderBuf[:]); err != nil { - return err - } - - // 数据包内容的定义 - cooked := [...]byte{0x00, 0x00, 0x03, 0x04, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00} - if _, err := file.Write(cooked[:]); err != nil { - return err - } - - // Packet Data - if _, err := file.Write(data); err != nil { - return err - } - return nil -} - -// writePcap 写Pcap文件并返回文件路径 -func writePcap(extHdr ExtHeader) string { - filePath := fmt.Sprintf("/tmp/omc/trace/task_%d.pcap", extHdr.TaskId) + // 输出到文件 + filePath := fmt.Sprintf("/tmp/omc/trace/task_%d.pcap", decodeData.NfTraceMsg.TraceId) if runtime.GOOS == "windows" { filePath = fmt.Sprintf("C:%s", filePath) } - err := writeEmptyPcap(filePath, extHdr.TimeStamp, int(extHdr.DataLen), extHdr.DataInfo) - if err != nil { - return "" - } - return filePath + err = writePCAP(filePath, decodeData.NfTraceMsg.Timestamp, decodeData.TracePayload) + + return decodeData, err +} + +// decodeTraceData 解析跟踪数据 +func decodeTraceData(data []byte) (*TraceMsgToOamTraceData, error) { + buf := bytes.NewBuffer(data) + oamData := new(TraceMsgToOamTraceData) + msg := new(model.TraceMsg) + + // 1. Parse Flag (1 byte) + var flag uint8 + if err := binary.Read(buf, binary.BigEndian, &flag); err != nil { + return nil, err + } + + // Parse message type (bits 1-2) + switch flag & 0x3 { + case 0x1: + msg.MsgType = model.MSG_TYPE_REQ + case 0x2: + msg.MsgType = model.MSG_TYPE_RSP + } + + // Parse message direction (bits 3-4) + switch flag & 0xc { + case 0x4: + msg.MsgDirect = model.MSG_DIRECT_SEND + case 0x8: + msg.MsgDirect = model.MSG_DIRECT_RECV + } + + // 2. Parse TraceId (4 bytes) + var traceId uint32 + if err := binary.Read(buf, binary.BigEndian, &traceId); err != nil { + return nil, err + } + msg.TraceId = int(traceId) + + // 3. Parse IMSI (15 bytes fixed length) + imsi := make([]byte, 15) + if _, err := buf.Read(imsi); err != nil { + return nil, err + } + msg.IMSI = string(imsi) + + // 4. Parse IfType (1 byte) + ifTypeByte, err := buf.ReadByte() + if err != nil { + return nil, err + } + msg.IfType = byte2Iftype(ifTypeByte) + + // 5. Parse Timestamp (8 bytes) + if err := binary.Read(buf, binary.BigEndian, &msg.Timestamp); err != nil { + return nil, err + } + + // 6. Parse SrcIp (4 bytes IPv4) + srcIp := make([]byte, 4) + if flag&0x20 != 0 { + srcIp = make([]byte, 16) + } + if _, err := buf.Read(srcIp); err != nil { + return nil, err + } + msg.SrcIpStr = net.IP(srcIp).String() + // 7. Parse DstIp (4 bytes IPv4) + dstIp := make([]byte, 4) + if flag&0x20 != 0 { + dstIp = make([]byte, 16) + } + if _, err := buf.Read(dstIp); err != nil { + return nil, err + } + msg.DstIpStr = net.IP(dstIp).String() + + // 8. Parse NfName (variable length) + nfNameLen, err := buf.ReadByte() + if err != nil { + return nil, err + } + nfName := make([]byte, nfNameLen) + if _, err := buf.Read(nfName); err != nil { + return nil, err + } + msg.NfName = string(nfName) + + // 9. Parse MsgEvent (variable length) + msgEventLen, err := buf.ReadByte() + if err != nil { + return nil, err + } + msgEvent := make([]byte, msgEventLen) + if _, err := buf.Read(msgEvent); err != nil { + return nil, err + } + msg.MsgEvent = string(msgEvent) + + // 10. Parse ExtenBuf (variable length) + extenBufLen, err := buf.ReadByte() + if err != nil { + return nil, err + } + if extenBufLen > 0 { + extenBuf := make([]byte, extenBufLen) + if _, err := buf.Read(extenBuf); err != nil { + return nil, err + } + // TODO: Parse extended fields according to actual protocol + } + // 11. len of payload(2 byte) + var len uint16 + if err := binary.Read(buf, binary.BigEndian, &len); err != nil { + return nil, err + } + + oamData.NfTraceMsg = msg + oamData.TimestampStr = time.Unix(0, msg.Timestamp).Format("2006-01-02 15:04:05.999999999") + oamData.TracePayloadLen = len + oamData.TracePayload = buf.Bytes() + return oamData, nil +} + +// byte2Iftype 将字节转换为接口类型字符串 +// 参考:3GPP TS 32.422 V17.0.0 (2022-06) 5G; 3GPP TS 29.244 V17.0.0 (2022-06) 5G +func byte2Iftype(val uint8) string { + ret := "" + switch val { + case 1: + ret = "N1" + case 2: + ret = "N2" + case 3: + ret = "N1/N2" + case 4: + ret = "N4" + case 7: + ret = "N7" + case 8: + ret = "N8" + case 10: + ret = "N10" + case 11: + ret = "N11" + case 12: + ret = "N12" + case 13: + ret = "N13" + case 15: + ret = "N15" + } + return ret +} + +// writePCAP 写入PCAP文件 +func writePCAP(filename string, timeStamp int64, data []byte) error { + var err error + var file *os.File + + // 1. 创建目录(含权限控制) + if err := os.MkdirAll(filepath.Dir(filename), 0775); err != nil { + return err + } + + // 2. 智能打开文件(原子操作避免竞态) + if _, err = os.Stat(filename); os.IsNotExist(err) { + file, err = os.Create(filename) + if err != nil { + return err + } + // 写入PCAP全局头(严格兼容Wireshark) + fileHeader := [24]byte{ + 0xD4, 0xC3, 0xB2, 0xA1, // magic_number (微秒级) + 0x02, 0x00, 0x04, 0x00, // version_major(2) + version_minor(4) + 0x00, 0x00, 0x00, 0x00, // thiszone (UTC) + 0x00, 0x00, 0x00, 0x00, // sigfigs (固定0) + 0x00, 0x00, 0x04, 0x00, // snaplen (1024) + 0x71, 0x00, 0x00, 0x00, // network (LINKTYPE_LINUX_SLL) + } + if _, err := file.Write(fileHeader[:]); err != nil { + return err + } + } else { + // 追加模式打开(避免截断已有内容) + file, err = os.OpenFile(filename, os.O_WRONLY|os.O_APPEND|os.O_SYNC, 0666) + if err != nil { + return err + } + } + defer file.Close() + + // 3. 构造Linux cooked头(RFC 3580规范) + linuxSLLHeaderLen := 16 // Linux cooked模式头长度 + cookedHeader := make([]byte, linuxSLLHeaderLen) + binary.BigEndian.PutUint16(cookedHeader[0:2], 0x0000) // 数据包类型(主机→网络) + binary.BigEndian.PutUint16(cookedHeader[2:4], 0x0304) // 地址类型(ARPHRD_ETHER) + binary.BigEndian.PutUint16(cookedHeader[4:6], 0x0008) // 协议类型(ETH_P_IP) + binary.BigEndian.PutUint16(cookedHeader[14:16], 0x0800) // 数据包类型(PACKET_HOST) + + // 4. 合并链路层头与数据(兼容IPv4选项) + fullData := append(cookedHeader, data...) + pktLen := len(fullData) + + // 5. 生成精确时间戳(处理闰秒和时区) + t := time.Unix(0, timeStamp).UTC() + if t.IsZero() { + t = time.Now().UTC() + } + secs := t.Unix() + usecs := t.Nanosecond() / 1000 // 微秒级时间戳 + + // 6. 构造PCAP报文头(16字节) + var packetHeader [16]byte + binary.LittleEndian.PutUint32(packetHeader[0:4], uint32(secs)) // 时间戳秒 + binary.LittleEndian.PutUint32(packetHeader[4:8], uint32(usecs)) // 时间戳微秒 + binary.LittleEndian.PutUint32(packetHeader[8:12], uint32(pktLen)) // 捕获长度 + binary.LittleEndian.PutUint32(packetHeader[12:16], uint32(pktLen)) // 原始长度 + + // 7. 原子写入操作(避免部分写入) + buf := new(bytes.Buffer) + buf.Write(packetHeader[:]) + buf.Write(fullData) + if _, err := file.Write(buf.Bytes()); err != nil { + return err + } + + // 8. 强制刷盘并验证写入 + return file.Sync() }