diff --git a/src/framework/socket/tcp_server.go b/src/framework/socket/tcp_server.go index 52a9b960..857af870 100644 --- a/src/framework/socket/tcp_server.go +++ b/src/framework/socket/tcp_server.go @@ -10,10 +10,10 @@ import ( // SocketTCP TCP服务端 type SocketTCP struct { - Addr string `json:"addr"` // 主机地址 - Port int64 `json:"port"` // 端口 - Listen *net.Listener `json:"listen"` - StopChan chan struct{} `json:"stop"` // 停止信号 + Addr string `json:"addr"` // 主机地址 + Port int64 `json:"port"` // 端口 + Listener *net.TCPListener `json:"listener"` + StopChan chan struct{} `json:"stop"` // 停止信号 } // New 创建TCP服务端 @@ -26,53 +26,52 @@ func (s *SocketTCP) New() (*SocketTCP, error) { } address := fmt.Sprintf("%s:%d", s.Addr, s.Port) - ln, err := net.Listen(proto, address) + // 解析 TCP 地址 + tcpAddr, err := net.ResolveTCPAddr(proto, address) if err != nil { return nil, err } - s.Listen = &ln + // 监听 TCP 地址 + listener, err := net.ListenTCP(proto, tcpAddr) + if err != nil { + return nil, err + } + + s.Listener = listener s.StopChan = make(chan struct{}, 1) return s, nil } // Close 关闭当前TCP服务端 func (s *SocketTCP) Close() { - if s.Listen != nil { + if s.Listener != nil { s.StopChan <- struct{}{} - (*s.Listen).Close() + (*s.Listener).Close() } } // Resolve 处理消息 -func (s *SocketTCP) Resolve(bufferSize int, callback func([]byte, int)) error { - if s.Listen == nil { +func (s *SocketTCP) Resolve(callback func(conn *net.Conn)) error { + if s.Listener == nil { return fmt.Errorf("tcp service not created") } - - ln := *s.Listen - buffer := make([]byte, bufferSize) + listener := *s.Listener for { select { case <-s.StopChan: return fmt.Errorf("udp service stop") default: - conn, err := ln.Accept() + conn, err := listener.Accept() if err != nil { logger.Errorf("Error accepting connection: %v ", err) continue } defer conn.Close() - // 读取数据 - n, err := conn.Read(buffer) - if err != nil { - fmt.Println("Error reading from TCP connection:", err) - continue - } - - callback(buffer, n) + // 处理连接 + callback(&conn) // 发送响应 if _, err = conn.Write([]byte("tcp>")); err != nil { diff --git a/src/framework/socket/udp_server.go b/src/framework/socket/udp_server.go index d54b957a..33a45a60 100644 --- a/src/framework/socket/udp_server.go +++ b/src/framework/socket/udp_server.go @@ -50,29 +50,20 @@ func (s *SocketUDP) Close() { } // Resolve 处理消息 -func (s *SocketUDP) Resolve(bufferSize int, callback func([]byte, int)) error { +func (s *SocketUDP) Resolve(callback func(*net.UDPConn)) error { if s.Conn == nil { return fmt.Errorf("udp service not created") } - buffer := make([]byte, bufferSize) - for { select { case <-s.StopChan: return fmt.Errorf("udp service stop") default: - // 读取数据 - n, addr, err := s.Conn.ReadFromUDP(buffer) - if err != nil { - fmt.Println("Error reading from UDP connection:", err) - continue - } - - callback(buffer, n) + callback(s.Conn) // 发送响应 - if _, err = s.Conn.WriteToUDP([]byte("udp>"), addr); err != nil { + if _, err := s.Conn.WriteTo([]byte("udp>"), s.Conn.RemoteAddr()); err != nil { fmt.Println("Error sending response:", err) } } diff --git a/src/modules/trace/service/trace_task.go b/src/modules/trace/service/trace_task.go index e9891b28..071f8b28 100644 --- a/src/modules/trace/service/trace_task.go +++ b/src/modules/trace/service/trace_task.go @@ -1,8 +1,10 @@ package service import ( + "encoding/base64" "encoding/json" "fmt" + "net" "strings" "be.ems/src/framework/config" @@ -14,6 +16,7 @@ import ( neService "be.ems/src/modules/network_element/service" "be.ems/src/modules/trace/model" "be.ems/src/modules/trace/repository" + wsService "be.ems/src/modules/ws/service" ) // 实例化数据层 TraceTask 结构体 @@ -57,16 +60,27 @@ func (r *TraceTask) CreateUDP() error { } // 接收处理UDP数据 - go r.udpService.Resolve(2048, func(data []byte, n int) { - logger.Infof("socket UDP: %s", string(data)) - mData, err := UDPDataHandler(data, n) + go r.udpService.Resolve(func(conn *net.UDPConn) { + // 读取数据 + buf := make([]byte, 2048) + n, err := conn.Read(buf) + if err != nil { + logger.Errorf("error reading from UDP connection: %s", err.Error()) + return + } + + logger.Infof("socket UDP: %s", string(buf[:n])) + // logger.Infof("socket UDP Base64: %s", base64.StdEncoding.EncodeToString(buf[:n])) + mData, err := UDPDataHandler(buf, n) if err != nil { logger.Errorf("udp resolve data fail: %s", err.Error()) return } + taskId := parse.Number(mData["taskId"]) + // 插入数据库做记录 r.traceDataRepository.Insert(model.TraceData{ - TaskId: parse.Number(mData["taskId"]), + TaskId: taskId, IMSI: mData["imsi"].(string), SrcAddr: mData["srcAddr"].(string), DstAddr: mData["dstAddr"].(string), @@ -82,6 +96,7 @@ func (r *TraceTask) CreateUDP() error { // 推送文件 if v, ok := mData["pcapFile"]; ok && v != "" { logger.Infof("pcapFile: %s", v) + wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId) } }) @@ -92,16 +107,29 @@ func (r *TraceTask) CreateUDP() error { return err } // 接收处理TCP数据 - go tcpService.Resolve(1024, func(data []byte, n int) { - logger.Infof("socket TCP: %s", string(data)) - mData, err := UDPDataHandler(data, n) + go tcpService.Resolve(func(conn *net.Conn) { + c := (*conn) + // 读取数据 + buf := make([]byte, 2048) + n, err := c.Read(buf) + if err != nil { + logger.Errorf("error reading from TCP connection: %s", err.Error()) + return + } + + logger.Infof("socket TCP: %s", string(buf[:n])) + deData, _ := base64.StdEncoding.DecodeString(string(buf[:n])) + logger.Infof("socket TCP Base64: %s", deData) + mData, err := UDPDataHandler(deData, len(deData)) if err != nil { logger.Errorf("tcp resolve data fail: %s", err.Error()) return } + taskId := parse.Number(mData["taskId"]) + // 插入数据库做记录 r.traceDataRepository.Insert(model.TraceData{ - TaskId: parse.Number(mData["taskId"]), + TaskId: taskId, IMSI: mData["imsi"].(string), SrcAddr: mData["srcAddr"].(string), DstAddr: mData["dstAddr"].(string), @@ -117,6 +145,7 @@ func (r *TraceTask) CreateUDP() error { // 推送文件 if v, ok := mData["pcapFile"]; ok && v != "" { logger.Infof("pcapFile: %s", v) + wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId) } }) return nil diff --git a/src/modules/trace/service/trace_task_udp_data.go b/src/modules/trace/service/trace_task_udp_data.go index 057ddf41..2f007f38 100644 --- a/src/modules/trace/service/trace_task_udp_data.go +++ b/src/modules/trace/service/trace_task_udp_data.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "os" + "path/filepath" "runtime" "strings" "time" @@ -264,6 +265,9 @@ const versionMinor = 4 func writeEmptyPcap(filename string, timeStamp int64, length int, data []byte) error { var err error var file *os.File + if err := os.MkdirAll(filepath.Dir(filename), 0775); err != nil { + return err + } if _, err = os.Stat(filename); os.IsNotExist(err) { file, err = os.Create(filename) // File Header @@ -318,7 +322,7 @@ func writeEmptyPcap(filename string, timeStamp int64, length int, data []byte) e // writePcap 写Pcap文件并返回文件路径 func writePcap(extHdr ExtHeader) string { - filePath := fmt.Sprintf("/tmp/trace_%d .pcap", extHdr.TaskId) + filePath := fmt.Sprintf("/tmp/omc/trace/task_%d.pcap", extHdr.TaskId) if runtime.GOOS == "windows" { filePath = fmt.Sprintf("C:%s", filePath) } diff --git a/src/modules/ws/service/ws_send.impl.go b/src/modules/ws/service/ws_send.impl.go index ee440800..969b2688 100644 --- a/src/modules/ws/service/ws_send.impl.go +++ b/src/modules/ws/service/ws_send.impl.go @@ -12,6 +12,8 @@ import ( const ( // 组号-其他 GROUP_OTHER = "0" + // 组号-跟踪任务数据变更 2_traceId + GROUP_TRACE = "2_" // 组号-指标通用 10_neType_neId GROUP_KPI = "10_" // 组号-指标UPF 12_neId