fix: 代码优化

This commit is contained in:
TsMask
2025-07-15 15:16:44 +08:00
parent 9f6a7f3bb7
commit 150551acce
12 changed files with 202 additions and 130 deletions

View File

@@ -15,21 +15,22 @@ import (
"be.ems/src/framework/utils/date"
"be.ems/src/framework/utils/file"
"be.ems/src/framework/utils/parse"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
systemModel "be.ems/src/modules/system/model"
systemService "be.ems/src/modules/system/service"
)
var NewProcessor = &BackupExportTableProcessor{
backupService: neDataService.NewBackup,
count: 0,
backupService: neDataService.NewBackup,
cdrEventService: neDataService.NewCDREvent,
count: 0,
}
// BackupExportTable 备份导出数据表
type BackupExportTableProcessor struct {
backupService *neDataService.Backup // 备份相关服务
count int // 执行次数
backupService *neDataService.Backup // 备份相关服务
cdrEventService *neDataService.CDREvent // CDR会话事件服务
count int // 执行次数
}
func (s *BackupExportTableProcessor) Execute(data any) (any, error) {
@@ -197,13 +198,13 @@ func (s BackupExportTableProcessor) exportSMF(hour int, columns []string, filePa
start := end.Add(-time.Duration(hour) * time.Hour)
// 查询数据
rows := []neDataModel.CDREventSMF{}
tx := db.DB("").Model(&neDataModel.CDREventSMF{})
tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli())
if err := tx.Find(&rows).Error; err != nil {
return 0, err
}
if len(rows) <= 0 {
rows, total := s.cdrEventService.FindByPage("SMF", map[string]string{
"sortField": "created_at",
"sortOrder": "asc",
"beginTime": fmt.Sprint(start.UnixMilli()),
"endTime": fmt.Sprint(end.UnixMilli()),
})
if total <= 0 {
return 0, nil
}
@@ -417,7 +418,7 @@ func (s BackupExportTableProcessor) exportSMF(hour int, columns []string, filePa
err := file.WriterFileCSV(data, filePath)
return tx.RowsAffected, err
return total, err
}
// exportIMS 导出csv
@@ -428,13 +429,13 @@ func (s BackupExportTableProcessor) exportIMS(hour int, columns []string, filePa
start := end.Add(-time.Duration(hour) * time.Hour)
// 查询数据
rows := []neDataModel.CDREventIMS{}
tx := db.DB("").Model(&neDataModel.CDREventIMS{})
tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli())
if err := tx.Find(&rows).Error; err != nil {
return 0, err
}
if len(rows) <= 0 {
rows, total := s.cdrEventService.FindByPage("IMS", map[string]string{
"sortField": "created_at",
"sortOrder": "asc",
"beginTime": fmt.Sprint(start.UnixMilli()),
"endTime": fmt.Sprint(end.UnixMilli()),
})
if total <= 0 {
return 0, nil
}
@@ -561,7 +562,7 @@ func (s BackupExportTableProcessor) exportIMS(hour int, columns []string, filePa
err := file.WriterFileCSV(data, filePath)
return tx.RowsAffected, err
return total, err
}
// exportSMSC 导出csv
@@ -572,13 +573,13 @@ func (s BackupExportTableProcessor) exportSMSC(hour int, columns []string, fileP
start := end.Add(-time.Duration(hour) * time.Hour)
// 查询数据
rows := []neDataModel.CDREventSMSC{}
tx := db.DB("").Model(&neDataModel.CDREventSMSC{})
tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli())
if err := tx.Find(&rows).Error; err != nil {
return 0, err
}
if len(rows) <= 0 {
rows, total := s.cdrEventService.FindByPage("SMS", map[string]string{
"sortField": "created_at",
"sortOrder": "asc",
"beginTime": fmt.Sprint(start.UnixMilli()),
"endTime": fmt.Sprint(end.UnixMilli()),
})
if total <= 0 {
return 0, nil
}
@@ -686,7 +687,7 @@ func (s BackupExportTableProcessor) exportSMSC(hour int, columns []string, fileP
err := file.WriterFileCSV(data, filePath)
return tx.RowsAffected, err
return total, err
}
// exportSGWC 导出csv
@@ -697,13 +698,13 @@ func (s BackupExportTableProcessor) exportSGWC(hour int, columns []string, fileP
start := end.Add(-time.Duration(hour) * time.Hour)
// 查询数据
rows := []neDataModel.CDREventSMSC{}
tx := db.DB("").Model(&neDataModel.CDREventSMSC{})
tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli())
if err := tx.Find(&rows).Error; err != nil {
return 0, err
}
if len(rows) <= 0 {
rows, total := s.cdrEventService.FindByPage("SGWC", map[string]string{
"sortField": "created_at",
"sortOrder": "asc",
"beginTime": fmt.Sprint(start.UnixMilli()),
"endTime": fmt.Sprint(end.UnixMilli()),
})
if total <= 0 {
return 0, nil
}
@@ -923,5 +924,5 @@ func (s BackupExportTableProcessor) exportSGWC(hour int, columns []string, fileP
err := file.WriterFileCSV(data, filePath)
return tx.RowsAffected, err
return total, err
}

View File

@@ -2,6 +2,7 @@ package service
import (
"fmt"
"sync"
"testing"
"time"
@@ -14,7 +15,7 @@ import (
func TestInfo(t *testing.T) {
s := MonitorInfo{}
s.load(0.5) // 0.5 半分钟
s.Load(5 * time.Second) // 0.5 半分钟
fmt.Println(s)
select {}
@@ -28,34 +29,22 @@ type MonitorInfo struct {
}
// load 执行资源获取
func (m *MonitorInfo) load(interval float64) {
var itemBase MonitorBase
itemBase.CreateTime = time.Now().UnixMilli()
loadInfo, _ := load.Avg()
itemBase.CPULoad1 = loadInfo.Load1
itemBase.CPULoad5 = loadInfo.Load5
itemBase.CPULoad15 = loadInfo.Load15
totalPercent, _ := cpu.Percent(3*time.Second, false)
if len(totalPercent) > 0 {
itemBase.CPU = totalPercent[0]
}
cpuCount, _ := cpu.Counts(false)
cpuAvg := (float64(cpuCount*2) * 0.75) * 100
itemBase.LoadUsage = 0
if cpuAvg > 0 {
itemBase.LoadUsage = loadInfo.Load1 / cpuAvg
}
memoryInfo, _ := mem.VirtualMemory()
itemBase.Memory = memoryInfo.UsedPercent
m.MonitorBase = itemBase
// 求平均
m.MonitorIO = loadDiskIO(interval)
m.MonitorNetwork = loadNetIO(interval)
func (m *MonitorInfo) Load(duration time.Duration) {
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
m.MonitorBase = loadCPUMem(duration)
}()
go func() {
defer wg.Done()
m.MonitorIO = loadDiskIO(duration)
}()
go func() {
defer wg.Done()
m.MonitorNetwork = loadNetIO(duration)
}()
wg.Wait()
}
// MonitorBase 监控_基本信息 monitor_base
@@ -73,25 +62,49 @@ type MonitorBase struct {
type MonitorIO struct {
CreateTime int64 `json:"createTime"` // 创建时间
Name string `json:"name"` // 磁盘名
Read int64 `json:"read"` // 读取K
Write int64 `json:"write"` // 写入K
Count int64 `json:"count"` // 次数
Time int64 `json:"time"` // 耗时
Read uint64 `json:"read"` // 读取 Bytes
Write uint64 `json:"write"` // 写入 Bytes
Count uint64 `json:"count"` // 次数
Time uint64 `json:"time"` // 耗时
}
// MonitorNetwork 监控_网络IO monitor_network
type MonitorNetwork struct {
CreateTime int64 `json:"createTime"` // 创建时间
Name string `json:"name"` // 网卡名
Up float64 `json:"up"` // 上行
Down float64 `json:"down"` // 下行
CreateTime int64 `json:"createTime"` // 创建时间
Name string `json:"name"` // 网卡名
Up uint64 `json:"up"` // 上行 bytes
Down uint64 `json:"down"` // 下行 bytes
}
// loadCPUMem CPU内存使用率interval表示采集的平均值分钟
func loadCPUMem(duration time.Duration) MonitorBase {
var itemBase MonitorBase
itemBase.CreateTime = time.Now().UnixMilli()
loadInfo, _ := load.Avg()
itemBase.CPULoad1 = loadInfo.Load1
itemBase.CPULoad5 = loadInfo.Load5
itemBase.CPULoad15 = loadInfo.Load15
totalPercent, _ := cpu.Percent(duration, false)
if len(totalPercent) > 0 {
itemBase.CPU = totalPercent[0]
}
if cpuCount, _ := cpu.Counts(false); cpuCount > 0 {
itemBase.LoadUsage = loadInfo.Load1 / float64(cpuCount)
} else {
itemBase.LoadUsage = 0
}
memoryInfo, _ := mem.VirtualMemory()
itemBase.Memory = memoryInfo.UsedPercent
return itemBase
}
// loadDiskIO 磁盘读写interval表示采集的平均值分钟
func loadDiskIO(interval float64) []MonitorIO {
func loadDiskIO(duration time.Duration) []MonitorIO {
ioStat, _ := disk.IOCounters()
time.Sleep(time.Duration(interval) * time.Minute)
time.Sleep(duration)
ioStat2, _ := disk.IOCounters()
var ioList []MonitorIO
@@ -104,32 +117,24 @@ func loadDiskIO(interval float64) []MonitorIO {
itemIO.Name = io1.Name
if io2.ReadBytes != 0 && io1.ReadBytes != 0 && io2.ReadBytes > io1.ReadBytes {
itemIO.Read = int64(float64(io2.ReadBytes-io1.ReadBytes) / interval / 60)
itemIO.Read = io2.ReadBytes - io1.ReadBytes
}
if io2.WriteBytes != 0 && io1.WriteBytes != 0 && io2.WriteBytes > io1.WriteBytes {
itemIO.Write = int64(float64(io2.WriteBytes-io1.WriteBytes) / interval / 60)
itemIO.Write = io2.WriteBytes - io1.WriteBytes
}
if io2.ReadCount != 0 && io1.ReadCount != 0 && io2.ReadCount > io1.ReadCount {
itemIO.Count = int64(float64(io2.ReadCount-io1.ReadCount) / interval / 60)
itemIO.Count = io2.ReadCount - io1.ReadCount
}
writeCount := int64(0)
if io2.WriteCount != 0 && io1.WriteCount != 0 && io2.WriteCount > io1.WriteCount {
writeCount = int64(float64(io2.WriteCount-io1.WriteCount) / interval * 60)
}
if writeCount > itemIO.Count {
itemIO.Count = writeCount
itemIO.Count += io2.WriteCount - io1.WriteCount
}
if io2.ReadTime != 0 && io1.ReadTime != 0 && io2.ReadTime > io1.ReadTime {
itemIO.Time = int64(float64(io2.ReadTime-io1.ReadTime) / interval / 60)
itemIO.Time = io2.ReadTime - io1.ReadTime
}
writeTime := int64(0)
if io2.WriteTime != 0 && io1.WriteTime != 0 && io2.WriteTime > io1.WriteTime {
writeTime = int64(float64(io2.WriteTime-io1.WriteTime) / interval / 60)
}
if writeTime > itemIO.Time {
itemIO.Time = writeTime
itemIO.Time += io2.WriteTime - io1.WriteTime
}
ioList = append(ioList, itemIO)
break
@@ -140,7 +145,7 @@ func loadDiskIO(interval float64) []MonitorIO {
}
// loadNetIO 网络接口包括虚拟接口interval表示采集的平均值分钟
func loadNetIO(interval float64) []MonitorNetwork {
func loadNetIO(duration time.Duration) []MonitorNetwork {
// 获取当前时刻
netStat, _ := net.IOCounters(true)
netStatAll, _ := net.IOCounters(false)
@@ -148,7 +153,7 @@ func loadNetIO(interval float64) []MonitorNetwork {
netStatList = append(netStatList, netStat...)
netStatList = append(netStatList, netStatAll...)
time.Sleep(time.Duration(interval) * time.Minute)
time.Sleep(duration)
// 获取结束时刻
netStat2, _ := net.IOCounters(true)
@@ -168,10 +173,10 @@ func loadNetIO(interval float64) []MonitorNetwork {
// 如果结束时刻发送字节数和当前时刻发送字节数都不为零,并且结束时刻发送字节数大于当前时刻发送字节数
if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent {
itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / interval / 60
itemNet.Up = net2.BytesSent - net1.BytesSent
}
if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv {
itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / interval / 60
itemNet.Down = net2.BytesRecv - net1.BytesRecv
}
netList = append(netList, itemNet)
break

View File

@@ -199,9 +199,10 @@ func (s SysJob) ExportData(rows []model.SysJob, fileName string) (string, error)
}
}
misfirePolicy := "放弃执行"
if row.MisfirePolicy == "1" {
switch row.MisfirePolicy {
case "1":
misfirePolicy = "立即执行"
} else if row.MisfirePolicy == "2" {
case "2":
misfirePolicy = "执行一次"
}
concurrent := "禁止"

View File

@@ -185,9 +185,10 @@ func (r *NeLicense) UploadLicense(neLicense model.NeLicense) error {
// 重启服务
if neLicense.Reload {
cmdStr := fmt.Sprintf("sudo systemctl restart %s", neTypeLower)
if neTypeLower == "ims" {
switch neTypeLower {
case "ims":
cmdStr = "ims-stop || true && ims-start"
} else if neTypeLower == "omc" {
case "omc":
cmdStr = "sudo systemctl restart omc"
}
sshClient.RunCMD(cmdStr)

View File

@@ -245,7 +245,8 @@ func (r NeVersion) operateCommand(action, neType string, neFilePaths []string) (
// 组合命令输入
cmdStrArr := []string{}
if neType == "OMC" {
switch neType {
case "OMC":
omcStrArr := []string{}
if action == "install" {
// 安装软件包
@@ -270,7 +271,7 @@ func (r NeVersion) operateCommand(action, neType string, neFilePaths []string) (
cmdStrArr = append(cmdStrArr, fmt.Sprintf("nohup sh -c \"sleep 2s && %s\" > /tmp/operate_run_%s_omc.out 2>&1 & \n", strings.Join(omcStrArr, " && "), action))
cmdStrArr = append(cmdStrArr, fmt.Sprintf("echo '%s' \n", okFlagStr))
return okFlagStr, cmdStrArr, nil
} else if neType == "IMS" {
case "IMS":
if action == "install" {
para5GData := r.neInfoService.Para5GData
cmdStrArr = append(cmdStrArr, pkgCmdStr+" \n")
@@ -312,7 +313,7 @@ func (r NeVersion) operateCommand(action, neType string, neFilePaths []string) (
cmdStrArr = append(cmdStrArr, pkgCmdStr+" \n")
cmdStrArr = append(cmdStrArr, "ims-start \n")
}
} else {
default:
if action == "install" {
para5GData := r.neInfoService.Para5GData
cmdStrArr = append(cmdStrArr, pkgCmdStr+" \n")

View File

@@ -6,9 +6,10 @@ import (
"net"
"strings"
"github.com/tsmask/go-oam/src/framework/socket"
"be.ems/src/framework/config"
"be.ems/src/framework/logger"
"be.ems/src/framework/socket"
"be.ems/src/framework/utils/date"
"be.ems/src/framework/utils/parse"
neFetchlink "be.ems/src/modules/network_element/fetch_link"
@@ -21,16 +22,14 @@ import (
// 实例化数据层 TraceTask 结构体
var NewTraceTask = &TraceTask{
udpService: socket.SocketUDP{},
tcpService: socket.SocketTCP{},
traceTaskRepository: repository.NewTraceTask,
traceDataRepository: repository.NewTraceData,
}
// TraceTask 跟踪任务 服务层处理
type TraceTask struct {
udpService socket.SocketUDP // UDP服务对象
tcpService socket.SocketTCP // 测试用后续调整TODO
udpService socket.ServerUDP // UDP服务对象
tcpService socket.ServerTCP // 测试用后续调整TODO
traceTaskRepository *repository.TraceTask // 跟踪_任务数据信息
traceDataRepository *repository.TraceData // 跟踪_数据信息
}
@@ -48,8 +47,8 @@ func (r *TraceTask) CreateUDP(reload bool) error {
}
// 初始化UDP服务
r.udpService = socket.SocketUDP{Addr: host, Port: port}
if _, err := r.udpService.New(); err != nil {
r.udpService = socket.ServerUDP{Addr: host, Port: fmt.Sprint(port)}
if err := r.udpService.Listen(); err != nil {
return err
}
@@ -87,21 +86,20 @@ func (r *TraceTask) CreateUDP(reload bool) error {
// ============ 本地测试接收网元UDP发过来的数据 后续调整TODO
if config.Env() == "local" {
// 初始化TCP服务
r.tcpService = socket.SocketTCP{Addr: host, Port: port + 1}
if _, err := r.tcpService.New(); err != nil {
r.tcpService = socket.ServerTCP{Addr: host, Port: fmt.Sprint(port + 1)}
if err := r.tcpService.Listen(); err != nil {
return err
}
// 接收处理TCP数据
go r.tcpService.Resolve(func(conn *net.Conn, err error) {
go r.tcpService.Resolve(func(conn net.Conn, err error) {
if err != nil {
logger.Errorf("TCP Resolve %s", err.Error())
return
}
c := (*conn)
// 读取数据
buf := make([]byte, 2048)
n, err := c.Read(buf)
n, err := conn.Read(buf)
if err != nil {
logger.Errorf("TCP Resolve Read Error: %s", err.Error())
return
@@ -117,7 +115,7 @@ func (r *TraceTask) CreateUDP(reload bool) error {
}
// 发送响应
if _, err = c.Write([]byte("tcp>")); err != nil {
if _, err = conn.Write([]byte("tcp>")); err != nil {
logger.Errorf("TCP Resolve Write Error: %s", err.Error())
}
buf = nil

View File

@@ -63,16 +63,10 @@ func (s *WSReceive) Commont(client *model.WSClient, reqMsg model.WSRequest) {
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
case "net":
resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data)
case "ims_cdr":
resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data)
case "smf_cdr":
resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data)
case "smsc_cdr":
resByte, err = processor.GetCDRConnectBySMSC(reqMsg.RequestID, reqMsg.Data)
case "amf_ue":
resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data)
case "mme_ue":
resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data)
case "ims_cdr", "smf_cdr", "smsc_cdr", "sgwc_cdr":
resByte, err = processor.GetCDRConnect(reqMsg.RequestID, reqMsg.Data)
case "amf_ue", "mme_ue":
resByte, err = processor.GetUEConnect(reqMsg.RequestID, reqMsg.Data)
case "upf_tf":
resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data)
case "ne_state":

View File

@@ -34,6 +34,10 @@ const (
GROUP_AMF_UE = "1010"
// 组号-MME_UE会话事件 1011_neId
GROUP_MME_UE = "1011"
// 组号-AMF_NB状态事件 1014_neId
GROUP_AMF_NB = "1014"
// 组号-MME_NB状态事件 1015_neId
GROUP_MME_NB = "1015"
// 组号-告警 2000_neType_neId
GROUP_ALARM = "2000"
// 组号-告警事件 2002_neType_neId