feat: 网元信令跟踪功能

This commit is contained in:
TsMask
2025-04-22 11:38:19 +08:00
parent 4914eccd79
commit fbcae7ac5a
37 changed files with 1712 additions and 3078 deletions

View File

@@ -2,7 +2,6 @@ package service
import (
"encoding/base64"
"encoding/json"
"fmt"
"net"
"strings"
@@ -13,6 +12,7 @@ import (
"be.ems/src/framework/utils/date"
"be.ems/src/framework/utils/parse"
neFetchlink "be.ems/src/modules/network_element/fetch_link"
neModel "be.ems/src/modules/network_element/model"
neService "be.ems/src/modules/network_element/service"
"be.ems/src/modules/trace/model"
"be.ems/src/modules/trace/repository"
@@ -37,20 +37,11 @@ type TraceTask struct {
}
// CreateUDP 创建UDP数据通道
func (r *TraceTask) CreateUDP() error {
func (r TraceTask) CreateUDP() error {
// 跟踪配置是否开启
if v := config.Get("trace.enabled"); v != nil {
if !v.(bool) {
return nil
}
}
host := "127.0.0.1"
if v := config.Get("trace.host"); v != nil {
host = v.(string)
}
var port int64 = 33033
if v := config.Get("trace.port"); v != nil {
port = parse.Number(v)
host, port, err := r.traceNotify()
if err != nil {
return err
}
// 初始化UDP服务
@@ -67,123 +58,117 @@ func (r *TraceTask) CreateUDP() error {
}
// 读取数据
buf := make([]byte, 2048)
buf := make([]byte, 10*1048)
n, addr, err := conn.ReadFromUDPAddrPort(buf)
if err != nil {
logger.Errorf("UDP Resolve ReadFromUDPAddrPort Error: %s", err.Error())
return
}
logger.Infof("socket UDP: %s", string(buf[:n]))
// logger.Infof("socket UDP Base64: %s", base64.StdEncoding.EncodeToString(buf[:n]))
mData, err := UDPDataHandler(buf, n)
if err != nil {
logger.Errorf("UDP Resolve UDPDataHandler Error: %s", err.Error())
// logger.Infof("socket UDP: %s", string(buf[:n]))
logger.Infof("socket UDP Base64 Encode: %s", base64.StdEncoding.EncodeToString(buf[:n]))
// 解析数据
if err := r.pasreUDPData(buf[:n]); err != nil {
logger.Errorf("UDP Resolve UDPData Error: %s", err.Error())
return
}
taskId := parse.Number(mData["taskId"])
// 插入数据库做记录
r.traceDataRepository.Insert(model.TraceData{
TaskId: taskId,
IMSI: mData["imsi"].(string),
SrcAddr: mData["srcAddr"].(string),
DstAddr: mData["dstAddr"].(string),
IfType: parse.Number(mData["ifType"]),
MsgType: parse.Number(mData["msgType"]),
MsgDirect: parse.Number(mData["msgDirect"]),
Length: parse.Number(mData["dataLen"]),
RawMsg: mData["dataInfo"].(string),
Timestamp: parse.Number(mData["timestamp"]),
DecMsg: mData["decMsg"].(string),
})
// 推送文件
if v, ok := mData["pcapFile"]; ok && v != "" {
logger.Infof("pcapFile: %s", v)
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE_NE, taskId), taskId)
}
// 发送响应
if _, err := conn.WriteToUDPAddrPort([]byte("udp>"), addr); err != nil {
logger.Errorf("UDP Resolve WriteToUDPAddrPort Error: %s", err.Error())
}
buf = nil
})
// ============ 测试接收网元UDP发过来的数据
// 初始化TCP服务 后续调整TODO
tcpService := socket.SocketTCP{Addr: host, Port: port + 1}
if _, err := tcpService.New(); err != nil {
return err
}
// 接收处理TCP数据
go tcpService.Resolve(func(conn *net.Conn, err error) {
if err != nil {
logger.Errorf("TCP Resolve %s", err.Error())
return
// ============ 本地测试接收网元UDP发过来的数据 后续调整TODO
if config.Env() == "local" {
// 初始化TCP服务
tcpService := socket.SocketTCP{Addr: host, Port: port + 1}
if _, err := tcpService.New(); err != nil {
return err
}
// 接收处理TCP数据
go tcpService.Resolve(func(conn *net.Conn, err error) {
if err != nil {
logger.Errorf("TCP Resolve %s", err.Error())
return
}
c := (*conn)
// 读取数据
buf := make([]byte, 2048)
n, err := c.Read(buf)
if err != nil {
logger.Errorf("TCP Resolve Read Error: %s", err.Error())
return
}
c := (*conn)
// 读取数据
buf := make([]byte, 2048)
n, err := c.Read(buf)
if err != nil {
logger.Errorf("TCP Resolve Read Error: %s", err.Error())
return
}
logger.Infof("socket TCP: %s", string(buf[:n]))
deData, _ := base64.StdEncoding.DecodeString(string(buf[:n]))
logger.Infof("socket TCP Base64: %s", deData)
mData, err := UDPDataHandler(deData, len(deData))
if err != nil {
logger.Errorf("TCP Resolve UDPDataHandler Error: %s", err.Error())
return
}
taskId := parse.Number(mData["taskId"])
// logger.Infof("socket TCP: %s", string(buf[:n]))
deData, _ := base64.StdEncoding.DecodeString(string(buf[:n]))
// logger.Infof("socket TCP Base64 Decode: %s", deData)
// 插入数据库做记录
r.traceDataRepository.Insert(model.TraceData{
TaskId: taskId,
IMSI: mData["imsi"].(string),
SrcAddr: mData["srcAddr"].(string),
DstAddr: mData["dstAddr"].(string),
IfType: parse.Number(mData["ifType"]),
MsgType: parse.Number(mData["msgType"]),
MsgDirect: parse.Number(mData["msgDirect"]),
Length: parse.Number(mData["dataLen"]),
RawMsg: mData["dataInfo"].(string),
Timestamp: parse.Number(mData["timestamp"]),
DecMsg: mData["decMsg"].(string),
if err := r.pasreUDPData(deData); err != nil {
logger.Errorf("TCP Resolve UDPData Error: %s", err.Error())
return
}
// 发送响应
if _, err = c.Write([]byte("tcp>")); err != nil {
logger.Errorf("TCP Resolve Write Error: %s", err.Error())
}
buf = nil
})
}
// 推送文件
if v, ok := mData["pcapFile"]; ok && v != "" {
logger.Infof("pcapFile: %s", v)
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE_NE, taskId), taskId)
}
// 发送响应
if _, err = c.Write([]byte("tcp>")); err != nil {
logger.Errorf("TCP Resolve Write Error: %s", err.Error())
}
})
return nil
}
// pasreUDPData 解析数据
func (r TraceTask) pasreUDPData(buf []byte) error {
data, err := traceHandler(buf)
if err != nil {
logger.Errorf("UDP Resolve UDPDataHandler Error: %s", err.Error())
return err
}
taskId := parse.Number(data.NfTraceMsg.TraceId)
// 插入数据库做记录
item := model.TraceData{
TraceId: taskId,
IMSI: data.NfTraceMsg.IMSI,
SrcAddr: data.NfTraceMsg.SrcIpStr,
DstAddr: data.NfTraceMsg.DstIpStr,
IfType: data.NfTraceMsg.IfType,
MsgType: data.NfTraceMsg.MsgType,
MsgDirect: data.NfTraceMsg.MsgDirect,
MsgNe: data.NfTraceMsg.NfName,
MsgEvent: data.NfTraceMsg.MsgEvent,
Length: int64(data.TracePayloadLen),
RawMsg: base64.StdEncoding.EncodeToString(data.TracePayload),
Timestamp: data.NfTraceMsg.Timestamp,
}
item.ID = r.traceDataRepository.Insert(item)
// 推送到ws订阅组
item.RawMsg = "" // 不推送原始数据
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%d", wsService.GROUP_TRACE_NE, taskId), item)
return err
}
// CloseUDP 关闭UDP数据通道
func (r *TraceTask) CloseUDP() {
func (r TraceTask) CloseUDP() {
r.udpService.Close()
}
// SelectPage 根据条件分页查询
func (r *TraceTask) SelectPage(query map[string]any) map[string]any {
return r.traceTaskRepository.SelectPage(query)
// FindByPage 根据条件分页查询
func (r TraceTask) FindByPage(query map[string]string) ([]model.TraceTask, int64) {
return r.traceTaskRepository.SelectByPage(query)
}
// SelectById 通过ID查询
func (r *TraceTask) SelectById(id string) model.TraceTask {
tasks := r.traceTaskRepository.SelectByIds([]string{id})
// FindById 通过ID查询
func (r TraceTask) FindById(id int64) model.TraceTask {
tasks := r.traceTaskRepository.SelectByIds([]int64{id})
if len(tasks) > 0 {
return tasks[0]
}
@@ -191,12 +176,52 @@ func (r *TraceTask) SelectById(id string) model.TraceTask {
}
// Insert 新增信息
func (r *TraceTask) Insert(task model.TraceTask) error {
func (r TraceTask) Insert(task model.TraceTask) error {
// 跟踪配置是否开启
if v := config.Get("trace.enabled"); v != nil {
if !v.(bool) {
return fmt.Errorf("tracking is not enabled")
host, port, err := r.traceNotify()
if err != nil {
return err
}
task.NotifyUrl = fmt.Sprintf("udp:%s:%d", host, port)
// 网元列表
neList := strings.Split(task.NeList, ",")
if len(neList) <= 0 {
return fmt.Errorf("ne list is empty")
}
// 生成任务ID
traceId := r.traceTaskRepository.LastID() + 1 // 生成任务ID < 65535
task.TraceId = fmt.Sprint(traceId)
// 发送任务给网元
for _, neTypeID := range neList {
neTypeIDArr := strings.Split(neTypeID, "_")
if len(neTypeIDArr) != 2 {
logger.Warnf("ne type id is error")
continue
}
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1])
if neInfo.NeType != neTypeIDArr[0] || neInfo.IP == "" {
logger.Warnf("ne type id is not exist")
continue
}
if err := r.createTaskToNe(neInfo, task); err != nil {
logger.Errorf("task to %s error: %s", neTypeID, err.Error())
return fmt.Errorf("task to %s error: %s", neTypeID, err.Error())
}
}
// 插入数据库
insertId := r.traceTaskRepository.Insert(task)
if insertId <= 0 {
return fmt.Errorf("insert task error")
}
return nil
}
// traceNotify 网元通知地址
func (r TraceTask) traceNotify() (string, int64, error) {
if v := config.Get("trace.enabled"); !parse.Boolean(v) {
return "", 0, fmt.Errorf("trace is not enabled")
}
host := "127.0.0.1"
if v := config.Get("trace.host"); v != nil {
@@ -206,120 +231,43 @@ func (r *TraceTask) Insert(task model.TraceTask) error {
if v := config.Get("trace.port"); v != nil {
port = parse.Number(v)
}
task.NotifyUrl = fmt.Sprintf("udp:%s:%d", host, port)
return host, port, nil
}
// 查询网元获取IP
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(task.NeType, task.NeId)
if neInfo.NeId != task.NeId || neInfo.IP == "" {
return fmt.Errorf("app.common.noNEInfo")
}
traceId := r.traceTaskRepository.LastID() + 1 // 生成任务ID < 65535
task.TraceId = fmt.Sprint(traceId)
// 发送任务给网元
data := map[string]any{
"neType": neInfo.NeType,
"neId": neInfo.NeId,
"notifyUrl": task.NotifyUrl,
"id": traceId,
"startTime": date.ParseDateToStr(task.StartTime, date.YYYY_MM_DD_HH_MM_SS),
"endTime": date.ParseDateToStr(task.EndTime, date.YYYY_MM_DD_HH_MM_SS),
// createTaskToNe 网元创建任务
func (r TraceTask) createTaskToNe(neInfo neModel.NeInfo, task model.TraceTask) error {
data := model.TraceReq{
TraceId: parse.Number(task.TraceId),
NotifyUrl: task.NotifyUrl,
StartTime: date.ParseDateToStr(task.StartTime, date.YYYY_MM_DD_HH_MM_SS),
EndTime: date.ParseDateToStr(task.EndTime, date.YYYY_MM_DD_HH_MM_SS),
}
switch task.TraceType {
case "1": // Interface
data["traceType"] = "Interface"
data["interfaces"] = strings.Split(task.Interfaces, ",")
data.TraceType = "Interface"
data.Interfaces = strings.Split(task.Interfaces, ",")
case "2": // Device
data["traceType"] = "Device"
data["ueIp"] = task.UeIp
data["srcIp"] = task.SrcIp
data["dstIp"] = task.DstIp
data["signalPort"] = task.SignalPort
task.UeIp = neInfo.IP
data.TraceType = "Device"
data.SrcIp = task.SrcIp
data.DstIp = task.DstIp
case "3": // UE
data["traceType"] = "UE"
data["imsi"] = task.IMSI
data["msisdn"] = task.MSISDN
data.TraceType = "UE"
data.IMSI = fmt.Sprintf("imsi-%s", task.IMSI)
default:
return fmt.Errorf("trace type is not disabled")
return fmt.Errorf("trace type is not support")
}
msg, err := neFetchlink.NeTraceAdd(neInfo, data)
if err != nil {
return err
}
s, _ := json.Marshal(msg)
task.FetchMsg = string(s)
// 插入数据库
r.traceTaskRepository.Insert(task)
return nil
}
// Update 修改信息
func (r *TraceTask) Update(task model.TraceTask) error {
// 跟踪配置是否开启
if v := config.Get("trace.enabled"); v != nil {
if !v.(bool) {
return fmt.Errorf("tracking is not enabled")
}
if v, ok := msg["cause"]; ok {
return fmt.Errorf("trace task add failed, %v", v)
}
host := "127.0.0.1"
if v := config.Get("trace.host"); v != nil {
host = v.(string)
}
var port int64 = 33033
if v := config.Get("trace.port"); v != nil {
port = parse.Number(v)
}
task.NotifyUrl = fmt.Sprintf("udp:%s:%d", host, port)
// 查询网元获取IP
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(task.NeType, task.NeId)
if neInfo.NeId != task.NeId || neInfo.IP == "" {
return fmt.Errorf("app.common.noNEInfo")
}
// 查询网元任务信息
if msg, err := neFetchlink.NeTraceInfo(neInfo, task.TraceId); err == nil {
s, _ := json.Marshal(msg)
task.FetchMsg = string(s)
// 修改任务信息
data := map[string]any{
"neType": neInfo.NeType,
"neId": neInfo.NeId,
"notifyUrl": task.NotifyUrl,
"id": parse.Number(task.TraceId),
"startTime": date.ParseDateToStr(task.StartTime, date.YYYY_MM_DD_HH_MM_SS),
"endTime": date.ParseDateToStr(task.EndTime, date.YYYY_MM_DD_HH_MM_SS),
}
switch task.TraceType {
case "1": // Interface
data["traceType"] = "Interface"
data["interfaces"] = strings.Split(task.Interfaces, ",")
case "2": // Device
task.UeIp = neInfo.IP
data["traceType"] = "Device"
data["ueIp"] = task.UeIp
data["srcIp"] = task.SrcIp
data["dstIp"] = task.DstIp
data["signalPort"] = task.SignalPort
case "3": // UE
data["traceType"] = "UE"
data["imsi"] = task.IMSI
data["msisdn"] = task.MSISDN
default:
return fmt.Errorf("trace type is not disabled")
}
neFetchlink.NeTraceEdit(neInfo, data)
}
// 更新数据库
r.traceTaskRepository.Update(task)
return nil
}
// DeleteByIds 批量删除信息
func (r *TraceTask) DeleteByIds(ids []string) (int64, error) {
func (r TraceTask) DeleteByIds(ids []int64) (int64, error) {
// 检查是否存在
rows := r.traceTaskRepository.SelectByIds(ids)
if len(rows) <= 0 {
@@ -327,14 +275,30 @@ func (r *TraceTask) DeleteByIds(ids []string) (int64, error) {
}
if len(rows) == len(ids) {
// 停止任务
// 删除数据同时给网元发送停止任务
for _, v := range rows {
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(v.NeType, v.NeId)
if neInfo.NeId != v.NeId || neInfo.IP == "" {
// 删除数据
r.traceDataRepository.DeleteByTraceId(v.TraceId)
// 网元列表
neList := strings.Split(v.NeList, ",")
if len(neList) <= 0 {
continue
}
neFetchlink.NeTraceDelete(neInfo, v.TraceId)
// 停止任务
for _, neTypeID := range neList {
neTypeIDArr := strings.Split(neTypeID, "_")
if len(neTypeIDArr) != 2 {
continue
}
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(neTypeIDArr[0], neTypeIDArr[1])
if neInfo.NeType != neTypeIDArr[0] || neInfo.IP == "" {
continue
}
neFetchlink.NeTraceDelete(neInfo, v.TraceId)
}
}
num := r.traceTaskRepository.DeleteByIds(ids)
return num, nil
}