perf: 优化socket消息接收,跟踪任务推送ws

This commit is contained in:
TsMask
2024-09-23 17:33:42 +08:00
parent b7db915859
commit d69a7c9e6f
5 changed files with 68 additions and 43 deletions

View File

@@ -10,10 +10,10 @@ import (
// SocketTCP TCP服务端 // SocketTCP TCP服务端
type SocketTCP struct { type SocketTCP struct {
Addr string `json:"addr"` // 主机地址 Addr string `json:"addr"` // 主机地址
Port int64 `json:"port"` // 端口 Port int64 `json:"port"` // 端口
Listen *net.Listener `json:"listen"` Listener *net.TCPListener `json:"listener"`
StopChan chan struct{} `json:"stop"` // 停止信号 StopChan chan struct{} `json:"stop"` // 停止信号
} }
// New 创建TCP服务端 // New 创建TCP服务端
@@ -26,53 +26,52 @@ func (s *SocketTCP) New() (*SocketTCP, error) {
} }
address := fmt.Sprintf("%s:%d", s.Addr, s.Port) address := fmt.Sprintf("%s:%d", s.Addr, s.Port)
ln, err := net.Listen(proto, address) // 解析 TCP 地址
tcpAddr, err := net.ResolveTCPAddr(proto, address)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.Listen = &ln // 监听 TCP 地址
listener, err := net.ListenTCP(proto, tcpAddr)
if err != nil {
return nil, err
}
s.Listener = listener
s.StopChan = make(chan struct{}, 1) s.StopChan = make(chan struct{}, 1)
return s, nil return s, nil
} }
// Close 关闭当前TCP服务端 // Close 关闭当前TCP服务端
func (s *SocketTCP) Close() { func (s *SocketTCP) Close() {
if s.Listen != nil { if s.Listener != nil {
s.StopChan <- struct{}{} s.StopChan <- struct{}{}
(*s.Listen).Close() (*s.Listener).Close()
} }
} }
// Resolve 处理消息 // Resolve 处理消息
func (s *SocketTCP) Resolve(bufferSize int, callback func([]byte, int)) error { func (s *SocketTCP) Resolve(callback func(conn *net.Conn)) error {
if s.Listen == nil { if s.Listener == nil {
return fmt.Errorf("tcp service not created") return fmt.Errorf("tcp service not created")
} }
listener := *s.Listener
ln := *s.Listen
buffer := make([]byte, bufferSize)
for { for {
select { select {
case <-s.StopChan: case <-s.StopChan:
return fmt.Errorf("udp service stop") return fmt.Errorf("udp service stop")
default: default:
conn, err := ln.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
logger.Errorf("Error accepting connection: %v ", err) logger.Errorf("Error accepting connection: %v ", err)
continue continue
} }
defer conn.Close() defer conn.Close()
// 读取数据 // 处理连接
n, err := conn.Read(buffer) callback(&conn)
if err != nil {
fmt.Println("Error reading from TCP connection:", err)
continue
}
callback(buffer, n)
// 发送响应 // 发送响应
if _, err = conn.Write([]byte("tcp>")); err != nil { if _, err = conn.Write([]byte("tcp>")); err != nil {

View File

@@ -50,29 +50,20 @@ func (s *SocketUDP) Close() {
} }
// Resolve 处理消息 // Resolve 处理消息
func (s *SocketUDP) Resolve(bufferSize int, callback func([]byte, int)) error { func (s *SocketUDP) Resolve(callback func(*net.UDPConn)) error {
if s.Conn == nil { if s.Conn == nil {
return fmt.Errorf("udp service not created") return fmt.Errorf("udp service not created")
} }
buffer := make([]byte, bufferSize)
for { for {
select { select {
case <-s.StopChan: case <-s.StopChan:
return fmt.Errorf("udp service stop") return fmt.Errorf("udp service stop")
default: default:
// 读取数据 callback(s.Conn)
n, addr, err := s.Conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading from UDP connection:", err)
continue
}
callback(buffer, n)
// 发送响应 // 发送响应
if _, err = s.Conn.WriteToUDP([]byte("udp>"), addr); err != nil { if _, err := s.Conn.WriteTo([]byte("udp>"), s.Conn.RemoteAddr()); err != nil {
fmt.Println("Error sending response:", err) fmt.Println("Error sending response:", err)
} }
} }

View File

@@ -1,8 +1,10 @@
package service package service
import ( import (
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"strings" "strings"
"be.ems/src/framework/config" "be.ems/src/framework/config"
@@ -14,6 +16,7 @@ import (
neService "be.ems/src/modules/network_element/service" neService "be.ems/src/modules/network_element/service"
"be.ems/src/modules/trace/model" "be.ems/src/modules/trace/model"
"be.ems/src/modules/trace/repository" "be.ems/src/modules/trace/repository"
wsService "be.ems/src/modules/ws/service"
) )
// 实例化数据层 TraceTask 结构体 // 实例化数据层 TraceTask 结构体
@@ -57,16 +60,27 @@ func (r *TraceTask) CreateUDP() error {
} }
// 接收处理UDP数据 // 接收处理UDP数据
go r.udpService.Resolve(2048, func(data []byte, n int) { go r.udpService.Resolve(func(conn *net.UDPConn) {
logger.Infof("socket UDP: %s", string(data)) // 读取数据
mData, err := UDPDataHandler(data, n) buf := make([]byte, 2048)
n, err := conn.Read(buf)
if err != nil {
logger.Errorf("error reading from UDP connection: %s", err.Error())
return
}
logger.Infof("socket UDP: %s", string(buf[:n]))
// logger.Infof("socket UDP Base64: %s", base64.StdEncoding.EncodeToString(buf[:n]))
mData, err := UDPDataHandler(buf, n)
if err != nil { if err != nil {
logger.Errorf("udp resolve data fail: %s", err.Error()) logger.Errorf("udp resolve data fail: %s", err.Error())
return return
} }
taskId := parse.Number(mData["taskId"])
// 插入数据库做记录 // 插入数据库做记录
r.traceDataRepository.Insert(model.TraceData{ r.traceDataRepository.Insert(model.TraceData{
TaskId: parse.Number(mData["taskId"]), TaskId: taskId,
IMSI: mData["imsi"].(string), IMSI: mData["imsi"].(string),
SrcAddr: mData["srcAddr"].(string), SrcAddr: mData["srcAddr"].(string),
DstAddr: mData["dstAddr"].(string), DstAddr: mData["dstAddr"].(string),
@@ -82,6 +96,7 @@ func (r *TraceTask) CreateUDP() error {
// 推送文件 // 推送文件
if v, ok := mData["pcapFile"]; ok && v != "" { if v, ok := mData["pcapFile"]; ok && v != "" {
logger.Infof("pcapFile: %s", v) logger.Infof("pcapFile: %s", v)
wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId)
} }
}) })
@@ -92,16 +107,29 @@ func (r *TraceTask) CreateUDP() error {
return err return err
} }
// 接收处理TCP数据 // 接收处理TCP数据
go tcpService.Resolve(1024, func(data []byte, n int) { go tcpService.Resolve(func(conn *net.Conn) {
logger.Infof("socket TCP: %s", string(data)) c := (*conn)
mData, err := UDPDataHandler(data, n) // 读取数据
buf := make([]byte, 2048)
n, err := c.Read(buf)
if err != nil {
logger.Errorf("error reading from TCP connection: %s", err.Error())
return
}
logger.Infof("socket TCP: %s", string(buf[:n]))
deData, _ := base64.StdEncoding.DecodeString(string(buf[:n]))
logger.Infof("socket TCP Base64: %s", deData)
mData, err := UDPDataHandler(deData, len(deData))
if err != nil { if err != nil {
logger.Errorf("tcp resolve data fail: %s", err.Error()) logger.Errorf("tcp resolve data fail: %s", err.Error())
return return
} }
taskId := parse.Number(mData["taskId"])
// 插入数据库做记录 // 插入数据库做记录
r.traceDataRepository.Insert(model.TraceData{ r.traceDataRepository.Insert(model.TraceData{
TaskId: parse.Number(mData["taskId"]), TaskId: taskId,
IMSI: mData["imsi"].(string), IMSI: mData["imsi"].(string),
SrcAddr: mData["srcAddr"].(string), SrcAddr: mData["srcAddr"].(string),
DstAddr: mData["dstAddr"].(string), DstAddr: mData["dstAddr"].(string),
@@ -117,6 +145,7 @@ func (r *TraceTask) CreateUDP() error {
// 推送文件 // 推送文件
if v, ok := mData["pcapFile"]; ok && v != "" { if v, ok := mData["pcapFile"]; ok && v != "" {
logger.Infof("pcapFile: %s", v) logger.Infof("pcapFile: %s", v)
wsService.NewWSSendImpl.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId)
} }
}) })
return nil return nil

View File

@@ -5,6 +5,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"os" "os"
"path/filepath"
"runtime" "runtime"
"strings" "strings"
"time" "time"
@@ -264,6 +265,9 @@ const versionMinor = 4
func writeEmptyPcap(filename string, timeStamp int64, length int, data []byte) error { func writeEmptyPcap(filename string, timeStamp int64, length int, data []byte) error {
var err error var err error
var file *os.File var file *os.File
if err := os.MkdirAll(filepath.Dir(filename), 0775); err != nil {
return err
}
if _, err = os.Stat(filename); os.IsNotExist(err) { if _, err = os.Stat(filename); os.IsNotExist(err) {
file, err = os.Create(filename) file, err = os.Create(filename)
// File Header // File Header
@@ -318,7 +322,7 @@ func writeEmptyPcap(filename string, timeStamp int64, length int, data []byte) e
// writePcap 写Pcap文件并返回文件路径 // writePcap 写Pcap文件并返回文件路径
func writePcap(extHdr ExtHeader) string { func writePcap(extHdr ExtHeader) string {
filePath := fmt.Sprintf("/tmp/trace_%d .pcap", extHdr.TaskId) filePath := fmt.Sprintf("/tmp/omc/trace/task_%d.pcap", extHdr.TaskId)
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
filePath = fmt.Sprintf("C:%s", filePath) filePath = fmt.Sprintf("C:%s", filePath)
} }

View File

@@ -12,6 +12,8 @@ import (
const ( const (
// 组号-其他 // 组号-其他
GROUP_OTHER = "0" GROUP_OTHER = "0"
// 组号-跟踪任务数据变更 2_traceId
GROUP_TRACE = "2_"
// 组号-指标通用 10_neType_neId // 组号-指标通用 10_neType_neId
GROUP_KPI = "10_" GROUP_KPI = "10_"
// 组号-指标UPF 12_neId // 组号-指标UPF 12_neId