From 150551accea053c8fa5a8d05be59e87d1044daa2 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Tue, 15 Jul 2025 15:16:44 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app.go | 6 + src/framework/telnet/telnet.go | 25 +++- src/framework/utils/date/date.go | 4 +- src/framework/utils/expr/expr.go | 44 +++++++ .../backup_export_table.go | 75 +++++------ src/modules/monitor/service/monitor_test.go | 119 +++++++++--------- src/modules/monitor/service/sys_job.go | 5 +- .../network_element/service/ne_license.go | 5 +- .../network_element/service/ne_version.go | 7 +- src/modules/trace/service/trace_task.go | 24 ++-- src/modules/ws/service/ws_receive.go | 14 +-- src/modules/ws/service/ws_send.go | 4 + 12 files changed, 202 insertions(+), 130 deletions(-) create mode 100644 src/framework/utils/expr/expr.go diff --git a/src/app.go b/src/app.go index e7168a73..da30bf86 100644 --- a/src/app.go +++ b/src/app.go @@ -15,6 +15,8 @@ import ( "be.ems/src/modules/monitor" networkdata "be.ems/src/modules/network_data" networkelement "be.ems/src/modules/network_element" + "be.ems/src/modules/notification" + "be.ems/src/modules/oam" "be.ems/src/modules/oauth2" "be.ems/src/modules/system" "be.ems/src/modules/tool" @@ -84,6 +86,8 @@ func ModulesRoute(app *gin.Engine) { auth.Setup(app) // 开放客户端模块 oauth2.Setup(app) + // 网元OAM对接 + oam.Setup(app) // 通用模块 common.Setup(app) @@ -92,6 +96,8 @@ func ModulesRoute(app *gin.Engine) { // 网元数据模块 networkdata.Setup(app) + // 通知模块 + notification.Setup(app) // 跟踪模块 trace.Setup(app) // 图表模块 diff --git a/src/framework/telnet/telnet.go b/src/framework/telnet/telnet.go index c72a46bd..478c301a 100644 --- a/src/framework/telnet/telnet.go +++ b/src/framework/telnet/telnet.go @@ -42,10 +42,14 @@ func (c *ConnTelnet) NewClient() (*ConnTelnet, error) { } // 进行登录 - time.Sleep(100 * time.Millisecond) - client.Write([]byte(c.User + "\r\n")) - time.Sleep(100 * time.Millisecond) - client.Write([]byte(c.Password + "\r\n")) + if c.User != "" { + time.Sleep(100 * time.Millisecond) + client.Write([]byte(c.User + "\r\n")) + } + if c.Password != "" { + time.Sleep(100 * time.Millisecond) + client.Write([]byte(c.Password + "\r\n")) + } // fmt.Fprintln(client, c.User) // fmt.Fprintln(client, c.Password) @@ -103,6 +107,19 @@ func (c *ConnTelnet) RunCMD(cmd string) (string, error) { return c.LastResult, nil } +// WindowChange informs the remote host about a terminal window dimension change to h rows and w columns. +func (s *ConnTelnet) WindowChange(h, w int) error { + if s.Client == nil { + return fmt.Errorf("client is nil to content write failed") + } + conn := *s.Client + + // 需要确保接收方理解并正确处理发送窗口大小设置命令 + conn.Write([]byte{255, 251, 31}) + conn.Write([]byte{255, 250, 31, byte(w >> 8), byte(w & 0xFF), byte(h >> 8), byte(h & 0xFF), 255, 240}) + return nil +} + // NewClient 创建Telnet客户端会话对象 func (c *ConnTelnet) NewClientSession(cols, rows int) (*TelnetClientSession, error) { if c.Client == nil { diff --git a/src/framework/utils/date/date.go b/src/framework/utils/date/date.go index a1006bbe..7e9d3abd 100644 --- a/src/framework/utils/date/date.go +++ b/src/framework/utils/date/date.go @@ -54,9 +54,9 @@ func ParseDateToStr(date any, formatStr string) string { if v == 0 { return "" } - if v > 9999999999 { + if v > 1e12 { t = time.UnixMilli(v) - } else if v > 999999999 { + } else if v > 1e9 { t = time.Unix(v, 0) } else { logger.Infof("utils ParseDateToStr err %v", "Invalid timestamp") diff --git a/src/framework/utils/expr/expr.go b/src/framework/utils/expr/expr.go new file mode 100644 index 00000000..cd7ba420 --- /dev/null +++ b/src/framework/utils/expr/expr.go @@ -0,0 +1,44 @@ +package expr + +import ( + "fmt" + "regexp" + "strings" + + "github.com/expr-lang/expr" +) + +// Eval 计算表达式返回结果 +func Eval(exprStr string, env map[string]any) (any, error) { + return expr.Eval(exprStr, env) +} + +// ParseExprEnv 解析表达式环境变量 +// 比如 "('SMF.03'/'SMF.04')*100" +// 变量传入"SMF.03": 3 +func ParseExprEnv(exprStr string, env map[string]any) (string, map[string]any) { + // 使用正则表达式匹配带单引号的变量名 + re := regexp.MustCompile(`'([^']+)'`) + tempEnv := make(map[string]any) + tempExpr := exprStr + varCount := 0 + matches := re.FindAllStringSubmatch(exprStr, -1) + for _, match := range matches { + paramName := match[1] + tempVarName := fmt.Sprintf("var%d", varCount) + tempEnv[tempVarName] = env[paramName] + tempExpr = strings.Replace(tempExpr, match[0], tempVarName, 1) + varCount++ + } + + // 合并临时环境变量和原环境变量 + combinedEnv := make(map[string]any) + for k, v := range env { + combinedEnv[k] = v + } + for k, v := range tempEnv { + combinedEnv[k] = v + } + + return tempExpr, combinedEnv +} diff --git a/src/modules/crontask/processor/backup_export_table/backup_export_table.go b/src/modules/crontask/processor/backup_export_table/backup_export_table.go index 554eb4dd..d540038f 100644 --- a/src/modules/crontask/processor/backup_export_table/backup_export_table.go +++ b/src/modules/crontask/processor/backup_export_table/backup_export_table.go @@ -15,21 +15,22 @@ import ( "be.ems/src/framework/utils/date" "be.ems/src/framework/utils/file" "be.ems/src/framework/utils/parse" - neDataModel "be.ems/src/modules/network_data/model" neDataService "be.ems/src/modules/network_data/service" systemModel "be.ems/src/modules/system/model" systemService "be.ems/src/modules/system/service" ) var NewProcessor = &BackupExportTableProcessor{ - backupService: neDataService.NewBackup, - count: 0, + backupService: neDataService.NewBackup, + cdrEventService: neDataService.NewCDREvent, + count: 0, } // BackupExportTable 备份导出数据表 type BackupExportTableProcessor struct { - backupService *neDataService.Backup // 备份相关服务 - count int // 执行次数 + backupService *neDataService.Backup // 备份相关服务 + cdrEventService *neDataService.CDREvent // CDR会话事件服务 + count int // 执行次数 } func (s *BackupExportTableProcessor) Execute(data any) (any, error) { @@ -197,13 +198,13 @@ func (s BackupExportTableProcessor) exportSMF(hour int, columns []string, filePa start := end.Add(-time.Duration(hour) * time.Hour) // 查询数据 - rows := []neDataModel.CDREventSMF{} - tx := db.DB("").Model(&neDataModel.CDREventSMF{}) - tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli()) - if err := tx.Find(&rows).Error; err != nil { - return 0, err - } - if len(rows) <= 0 { + rows, total := s.cdrEventService.FindByPage("SMF", map[string]string{ + "sortField": "created_at", + "sortOrder": "asc", + "beginTime": fmt.Sprint(start.UnixMilli()), + "endTime": fmt.Sprint(end.UnixMilli()), + }) + if total <= 0 { return 0, nil } @@ -417,7 +418,7 @@ func (s BackupExportTableProcessor) exportSMF(hour int, columns []string, filePa err := file.WriterFileCSV(data, filePath) - return tx.RowsAffected, err + return total, err } // exportIMS 导出csv @@ -428,13 +429,13 @@ func (s BackupExportTableProcessor) exportIMS(hour int, columns []string, filePa start := end.Add(-time.Duration(hour) * time.Hour) // 查询数据 - rows := []neDataModel.CDREventIMS{} - tx := db.DB("").Model(&neDataModel.CDREventIMS{}) - tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli()) - if err := tx.Find(&rows).Error; err != nil { - return 0, err - } - if len(rows) <= 0 { + rows, total := s.cdrEventService.FindByPage("IMS", map[string]string{ + "sortField": "created_at", + "sortOrder": "asc", + "beginTime": fmt.Sprint(start.UnixMilli()), + "endTime": fmt.Sprint(end.UnixMilli()), + }) + if total <= 0 { return 0, nil } @@ -561,7 +562,7 @@ func (s BackupExportTableProcessor) exportIMS(hour int, columns []string, filePa err := file.WriterFileCSV(data, filePath) - return tx.RowsAffected, err + return total, err } // exportSMSC 导出csv @@ -572,13 +573,13 @@ func (s BackupExportTableProcessor) exportSMSC(hour int, columns []string, fileP start := end.Add(-time.Duration(hour) * time.Hour) // 查询数据 - rows := []neDataModel.CDREventSMSC{} - tx := db.DB("").Model(&neDataModel.CDREventSMSC{}) - tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli()) - if err := tx.Find(&rows).Error; err != nil { - return 0, err - } - if len(rows) <= 0 { + rows, total := s.cdrEventService.FindByPage("SMS", map[string]string{ + "sortField": "created_at", + "sortOrder": "asc", + "beginTime": fmt.Sprint(start.UnixMilli()), + "endTime": fmt.Sprint(end.UnixMilli()), + }) + if total <= 0 { return 0, nil } @@ -686,7 +687,7 @@ func (s BackupExportTableProcessor) exportSMSC(hour int, columns []string, fileP err := file.WriterFileCSV(data, filePath) - return tx.RowsAffected, err + return total, err } // exportSGWC 导出csv @@ -697,13 +698,13 @@ func (s BackupExportTableProcessor) exportSGWC(hour int, columns []string, fileP start := end.Add(-time.Duration(hour) * time.Hour) // 查询数据 - rows := []neDataModel.CDREventSMSC{} - tx := db.DB("").Model(&neDataModel.CDREventSMSC{}) - tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli()) - if err := tx.Find(&rows).Error; err != nil { - return 0, err - } - if len(rows) <= 0 { + rows, total := s.cdrEventService.FindByPage("SGWC", map[string]string{ + "sortField": "created_at", + "sortOrder": "asc", + "beginTime": fmt.Sprint(start.UnixMilli()), + "endTime": fmt.Sprint(end.UnixMilli()), + }) + if total <= 0 { return 0, nil } @@ -923,5 +924,5 @@ func (s BackupExportTableProcessor) exportSGWC(hour int, columns []string, fileP err := file.WriterFileCSV(data, filePath) - return tx.RowsAffected, err + return total, err } diff --git a/src/modules/monitor/service/monitor_test.go b/src/modules/monitor/service/monitor_test.go index 44dd2a72..cd64a3d8 100644 --- a/src/modules/monitor/service/monitor_test.go +++ b/src/modules/monitor/service/monitor_test.go @@ -2,6 +2,7 @@ package service import ( "fmt" + "sync" "testing" "time" @@ -14,7 +15,7 @@ import ( func TestInfo(t *testing.T) { s := MonitorInfo{} - s.load(0.5) // 0.5 半分钟 + s.Load(5 * time.Second) // 0.5 半分钟 fmt.Println(s) select {} @@ -28,34 +29,22 @@ type MonitorInfo struct { } // load 执行资源获取 -func (m *MonitorInfo) load(interval float64) { - var itemBase MonitorBase - itemBase.CreateTime = time.Now().UnixMilli() - - loadInfo, _ := load.Avg() - itemBase.CPULoad1 = loadInfo.Load1 - itemBase.CPULoad5 = loadInfo.Load5 - itemBase.CPULoad15 = loadInfo.Load15 - - totalPercent, _ := cpu.Percent(3*time.Second, false) - if len(totalPercent) > 0 { - itemBase.CPU = totalPercent[0] - } - cpuCount, _ := cpu.Counts(false) - cpuAvg := (float64(cpuCount*2) * 0.75) * 100 - itemBase.LoadUsage = 0 - if cpuAvg > 0 { - itemBase.LoadUsage = loadInfo.Load1 / cpuAvg - } - - memoryInfo, _ := mem.VirtualMemory() - itemBase.Memory = memoryInfo.UsedPercent - - m.MonitorBase = itemBase - - // 求平均 - m.MonitorIO = loadDiskIO(interval) - m.MonitorNetwork = loadNetIO(interval) +func (m *MonitorInfo) Load(duration time.Duration) { + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + m.MonitorBase = loadCPUMem(duration) + }() + go func() { + defer wg.Done() + m.MonitorIO = loadDiskIO(duration) + }() + go func() { + defer wg.Done() + m.MonitorNetwork = loadNetIO(duration) + }() + wg.Wait() } // MonitorBase 监控_基本信息 monitor_base @@ -73,25 +62,49 @@ type MonitorBase struct { type MonitorIO struct { CreateTime int64 `json:"createTime"` // 创建时间 Name string `json:"name"` // 磁盘名 - Read int64 `json:"read"` // 读取K - Write int64 `json:"write"` // 写入K - Count int64 `json:"count"` // 次数 - Time int64 `json:"time"` // 耗时 + Read uint64 `json:"read"` // 读取 Bytes + Write uint64 `json:"write"` // 写入 Bytes + Count uint64 `json:"count"` // 次数 + Time uint64 `json:"time"` // 耗时 } // MonitorNetwork 监控_网络IO monitor_network type MonitorNetwork struct { - CreateTime int64 `json:"createTime"` // 创建时间 - Name string `json:"name"` // 网卡名 - Up float64 `json:"up"` // 上行 - Down float64 `json:"down"` // 下行 + CreateTime int64 `json:"createTime"` // 创建时间 + Name string `json:"name"` // 网卡名 + Up uint64 `json:"up"` // 上行 bytes + Down uint64 `json:"down"` // 下行 bytes +} + +// loadCPUMem CPU内存使用率,interval表示采集的平均值(分钟) +func loadCPUMem(duration time.Duration) MonitorBase { + var itemBase MonitorBase + itemBase.CreateTime = time.Now().UnixMilli() + + loadInfo, _ := load.Avg() + itemBase.CPULoad1 = loadInfo.Load1 + itemBase.CPULoad5 = loadInfo.Load5 + itemBase.CPULoad15 = loadInfo.Load15 + totalPercent, _ := cpu.Percent(duration, false) + if len(totalPercent) > 0 { + itemBase.CPU = totalPercent[0] + } + if cpuCount, _ := cpu.Counts(false); cpuCount > 0 { + itemBase.LoadUsage = loadInfo.Load1 / float64(cpuCount) + } else { + itemBase.LoadUsage = 0 + } + + memoryInfo, _ := mem.VirtualMemory() + itemBase.Memory = memoryInfo.UsedPercent + return itemBase } // loadDiskIO 磁盘读写,interval表示采集的平均值(分钟) -func loadDiskIO(interval float64) []MonitorIO { +func loadDiskIO(duration time.Duration) []MonitorIO { ioStat, _ := disk.IOCounters() - time.Sleep(time.Duration(interval) * time.Minute) + time.Sleep(duration) ioStat2, _ := disk.IOCounters() var ioList []MonitorIO @@ -104,32 +117,24 @@ func loadDiskIO(interval float64) []MonitorIO { itemIO.Name = io1.Name if io2.ReadBytes != 0 && io1.ReadBytes != 0 && io2.ReadBytes > io1.ReadBytes { - itemIO.Read = int64(float64(io2.ReadBytes-io1.ReadBytes) / interval / 60) + itemIO.Read = io2.ReadBytes - io1.ReadBytes } if io2.WriteBytes != 0 && io1.WriteBytes != 0 && io2.WriteBytes > io1.WriteBytes { - itemIO.Write = int64(float64(io2.WriteBytes-io1.WriteBytes) / interval / 60) + itemIO.Write = io2.WriteBytes - io1.WriteBytes } if io2.ReadCount != 0 && io1.ReadCount != 0 && io2.ReadCount > io1.ReadCount { - itemIO.Count = int64(float64(io2.ReadCount-io1.ReadCount) / interval / 60) + itemIO.Count = io2.ReadCount - io1.ReadCount } - writeCount := int64(0) if io2.WriteCount != 0 && io1.WriteCount != 0 && io2.WriteCount > io1.WriteCount { - writeCount = int64(float64(io2.WriteCount-io1.WriteCount) / interval * 60) - } - if writeCount > itemIO.Count { - itemIO.Count = writeCount + itemIO.Count += io2.WriteCount - io1.WriteCount } if io2.ReadTime != 0 && io1.ReadTime != 0 && io2.ReadTime > io1.ReadTime { - itemIO.Time = int64(float64(io2.ReadTime-io1.ReadTime) / interval / 60) + itemIO.Time = io2.ReadTime - io1.ReadTime } - writeTime := int64(0) if io2.WriteTime != 0 && io1.WriteTime != 0 && io2.WriteTime > io1.WriteTime { - writeTime = int64(float64(io2.WriteTime-io1.WriteTime) / interval / 60) - } - if writeTime > itemIO.Time { - itemIO.Time = writeTime + itemIO.Time += io2.WriteTime - io1.WriteTime } ioList = append(ioList, itemIO) break @@ -140,7 +145,7 @@ func loadDiskIO(interval float64) []MonitorIO { } // loadNetIO 网络接口(包括虚拟接口),interval表示采集的平均值(分钟) -func loadNetIO(interval float64) []MonitorNetwork { +func loadNetIO(duration time.Duration) []MonitorNetwork { // 获取当前时刻 netStat, _ := net.IOCounters(true) netStatAll, _ := net.IOCounters(false) @@ -148,7 +153,7 @@ func loadNetIO(interval float64) []MonitorNetwork { netStatList = append(netStatList, netStat...) netStatList = append(netStatList, netStatAll...) - time.Sleep(time.Duration(interval) * time.Minute) + time.Sleep(duration) // 获取结束时刻 netStat2, _ := net.IOCounters(true) @@ -168,10 +173,10 @@ func loadNetIO(interval float64) []MonitorNetwork { // 如果结束时刻发送字节数和当前时刻发送字节数都不为零,并且结束时刻发送字节数大于当前时刻发送字节数 if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent { - itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / interval / 60 + itemNet.Up = net2.BytesSent - net1.BytesSent } if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv { - itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / interval / 60 + itemNet.Down = net2.BytesRecv - net1.BytesRecv } netList = append(netList, itemNet) break diff --git a/src/modules/monitor/service/sys_job.go b/src/modules/monitor/service/sys_job.go index 3ddb6a1d..7f1cb62b 100644 --- a/src/modules/monitor/service/sys_job.go +++ b/src/modules/monitor/service/sys_job.go @@ -199,9 +199,10 @@ func (s SysJob) ExportData(rows []model.SysJob, fileName string) (string, error) } } misfirePolicy := "放弃执行" - if row.MisfirePolicy == "1" { + switch row.MisfirePolicy { + case "1": misfirePolicy = "立即执行" - } else if row.MisfirePolicy == "2" { + case "2": misfirePolicy = "执行一次" } concurrent := "禁止" diff --git a/src/modules/network_element/service/ne_license.go b/src/modules/network_element/service/ne_license.go index 59db136f..4daea13b 100644 --- a/src/modules/network_element/service/ne_license.go +++ b/src/modules/network_element/service/ne_license.go @@ -185,9 +185,10 @@ func (r *NeLicense) UploadLicense(neLicense model.NeLicense) error { // 重启服务 if neLicense.Reload { cmdStr := fmt.Sprintf("sudo systemctl restart %s", neTypeLower) - if neTypeLower == "ims" { + switch neTypeLower { + case "ims": cmdStr = "ims-stop || true && ims-start" - } else if neTypeLower == "omc" { + case "omc": cmdStr = "sudo systemctl restart omc" } sshClient.RunCMD(cmdStr) diff --git a/src/modules/network_element/service/ne_version.go b/src/modules/network_element/service/ne_version.go index 793e6663..a8662650 100644 --- a/src/modules/network_element/service/ne_version.go +++ b/src/modules/network_element/service/ne_version.go @@ -245,7 +245,8 @@ func (r NeVersion) operateCommand(action, neType string, neFilePaths []string) ( // 组合命令输入 cmdStrArr := []string{} - if neType == "OMC" { + switch neType { + case "OMC": omcStrArr := []string{} if action == "install" { // 安装软件包 @@ -270,7 +271,7 @@ func (r NeVersion) operateCommand(action, neType string, neFilePaths []string) ( cmdStrArr = append(cmdStrArr, fmt.Sprintf("nohup sh -c \"sleep 2s && %s\" > /tmp/operate_run_%s_omc.out 2>&1 & \n", strings.Join(omcStrArr, " && "), action)) cmdStrArr = append(cmdStrArr, fmt.Sprintf("echo '%s' \n", okFlagStr)) return okFlagStr, cmdStrArr, nil - } else if neType == "IMS" { + case "IMS": if action == "install" { para5GData := r.neInfoService.Para5GData cmdStrArr = append(cmdStrArr, pkgCmdStr+" \n") @@ -312,7 +313,7 @@ func (r NeVersion) operateCommand(action, neType string, neFilePaths []string) ( cmdStrArr = append(cmdStrArr, pkgCmdStr+" \n") cmdStrArr = append(cmdStrArr, "ims-start \n") } - } else { + default: if action == "install" { para5GData := r.neInfoService.Para5GData cmdStrArr = append(cmdStrArr, pkgCmdStr+" \n") diff --git a/src/modules/trace/service/trace_task.go b/src/modules/trace/service/trace_task.go index 2085f282..be335142 100644 --- a/src/modules/trace/service/trace_task.go +++ b/src/modules/trace/service/trace_task.go @@ -6,9 +6,10 @@ import ( "net" "strings" + "github.com/tsmask/go-oam/src/framework/socket" + "be.ems/src/framework/config" "be.ems/src/framework/logger" - "be.ems/src/framework/socket" "be.ems/src/framework/utils/date" "be.ems/src/framework/utils/parse" neFetchlink "be.ems/src/modules/network_element/fetch_link" @@ -21,16 +22,14 @@ import ( // 实例化数据层 TraceTask 结构体 var NewTraceTask = &TraceTask{ - udpService: socket.SocketUDP{}, - tcpService: socket.SocketTCP{}, traceTaskRepository: repository.NewTraceTask, traceDataRepository: repository.NewTraceData, } // TraceTask 跟踪任务 服务层处理 type TraceTask struct { - udpService socket.SocketUDP // UDP服务对象 - tcpService socket.SocketTCP // 测试用,后续调整TODO + udpService socket.ServerUDP // UDP服务对象 + tcpService socket.ServerTCP // 测试用,后续调整TODO traceTaskRepository *repository.TraceTask // 跟踪_任务数据信息 traceDataRepository *repository.TraceData // 跟踪_数据信息 } @@ -48,8 +47,8 @@ func (r *TraceTask) CreateUDP(reload bool) error { } // 初始化UDP服务 - r.udpService = socket.SocketUDP{Addr: host, Port: port} - if _, err := r.udpService.New(); err != nil { + r.udpService = socket.ServerUDP{Addr: host, Port: fmt.Sprint(port)} + if err := r.udpService.Listen(); err != nil { return err } @@ -87,21 +86,20 @@ func (r *TraceTask) CreateUDP(reload bool) error { // ============ 本地测试接收网元UDP发过来的数据 后续调整TODO if config.Env() == "local" { // 初始化TCP服务 - r.tcpService = socket.SocketTCP{Addr: host, Port: port + 1} - if _, err := r.tcpService.New(); err != nil { + r.tcpService = socket.ServerTCP{Addr: host, Port: fmt.Sprint(port + 1)} + if err := r.tcpService.Listen(); err != nil { return err } // 接收处理TCP数据 - go r.tcpService.Resolve(func(conn *net.Conn, err error) { + go r.tcpService.Resolve(func(conn net.Conn, err error) { if err != nil { logger.Errorf("TCP Resolve %s", err.Error()) return } - c := (*conn) // 读取数据 buf := make([]byte, 2048) - n, err := c.Read(buf) + n, err := conn.Read(buf) if err != nil { logger.Errorf("TCP Resolve Read Error: %s", err.Error()) return @@ -117,7 +115,7 @@ func (r *TraceTask) CreateUDP(reload bool) error { } // 发送响应 - if _, err = c.Write([]byte("tcp>")); err != nil { + if _, err = conn.Write([]byte("tcp>")); err != nil { logger.Errorf("TCP Resolve Write Error: %s", err.Error()) } buf = nil diff --git a/src/modules/ws/service/ws_receive.go b/src/modules/ws/service/ws_receive.go index 4a1720f1..ad224f35 100644 --- a/src/modules/ws/service/ws_receive.go +++ b/src/modules/ws/service/ws_receive.go @@ -63,16 +63,10 @@ func (s *WSReceive) Commont(client *model.WSClient, reqMsg model.WSRequest) { resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data) case "net": resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data) - case "ims_cdr": - resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data) - case "smf_cdr": - resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data) - case "smsc_cdr": - resByte, err = processor.GetCDRConnectBySMSC(reqMsg.RequestID, reqMsg.Data) - case "amf_ue": - resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data) - case "mme_ue": - resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data) + case "ims_cdr", "smf_cdr", "smsc_cdr", "sgwc_cdr": + resByte, err = processor.GetCDRConnect(reqMsg.RequestID, reqMsg.Data) + case "amf_ue", "mme_ue": + resByte, err = processor.GetUEConnect(reqMsg.RequestID, reqMsg.Data) case "upf_tf": resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data) case "ne_state": diff --git a/src/modules/ws/service/ws_send.go b/src/modules/ws/service/ws_send.go index adcccaef..b419e1fa 100644 --- a/src/modules/ws/service/ws_send.go +++ b/src/modules/ws/service/ws_send.go @@ -34,6 +34,10 @@ const ( GROUP_AMF_UE = "1010" // 组号-MME_UE会话事件 1011_neId GROUP_MME_UE = "1011" + // 组号-AMF_NB状态事件 1014_neId + GROUP_AMF_NB = "1014" + // 组号-MME_NB状态事件 1015_neId + GROUP_MME_NB = "1015" // 组号-告警 2000_neType_neId GROUP_ALARM = "2000" // 组号-告警事件 2002_neType_neId