feat: 添加CDR和UENB记录删除调度任务及相关数据库操作

This commit is contained in:
TsMask
2025-08-19 16:09:33 +08:00
parent c2dbb98b30
commit 0379abdb87
13 changed files with 353 additions and 14 deletions

View File

@@ -0,0 +1,77 @@
package delete_alarm_record
import (
"encoding/json"
"fmt"
"be.ems/src/framework/cron"
"be.ems/src/framework/database/db"
"be.ems/src/framework/logger"
)
var NewProcessor = &DeleteAlarmRecordProcessor{
count: 0,
}
// DeleteAlarmRecordProcessor 删除告警记录
type DeleteAlarmRecordProcessor struct {
count int // 执行次数
}
func (s *DeleteAlarmRecordProcessor) Execute(data any) (any, error) {
s.count++ // 执行次数加一
options := data.(cron.JobData)
sysJob := options.SysJob
logger.Infof("重复:%v 任务ID:%s 执行次数:%d", options.Repeat, sysJob.JobID, s.count)
// 返回结果,用于记录执行结果
result := map[string]any{
"count": s.count,
}
// 读取参数值
var params struct {
StoreDays int `json:"storeDays"` // store days
}
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, fmt.Errorf("json params err: %v", err)
}
if params.StoreDays < 0 {
return nil, fmt.Errorf("params storeDays less than 0 ")
}
// 告警表
alarmTx := db.DB("").Table("alarm").Where("NOW()>ADDDATE(`timestamp`,interval ? day)", params.StoreDays)
if err := alarmTx.Delete(nil).Error; err != nil {
result["alarm"] = err.Error()
} else {
result["alarm"] = alarmTx.RowsAffected
}
// 告警事件表
alarmEventTx := db.DB("").Table("alarm_event").Where("NOW()>ADDDATE(`timestamp`,interval ? day)", params.StoreDays)
if err := alarmEventTx.Delete(nil).Error; err != nil {
result["alarm_event"] = err.Error()
} else {
result["alarm_event"] = alarmEventTx.RowsAffected
}
// 告警日志表
alarmLogTx := db.DB("").Table("alarm_log").Where("NOW()>ADDDATE(`log_time`,interval ? day)", params.StoreDays)
if err := alarmLogTx.Delete(nil).Error; err != nil {
result["alarm_log"] = err.Error()
} else {
result["alarm_log"] = alarmLogTx.RowsAffected
}
// 告警转发日志表
alarmForwardLogTx := db.DB("").Table("alarm_forward_log").Where("NOW()>ADDDATE(`log_time`,interval ? day)", params.StoreDays)
if err := alarmForwardLogTx.Delete(nil).Error; err != nil {
result["alarm_forward_log"] = err.Error()
} else {
result["alarm_forward_log"] = alarmForwardLogTx.RowsAffected
}
// 返回结果,用于记录执行结果
return result, nil
}

View File

@@ -0,0 +1,63 @@
package delete_CDR_record
import (
"encoding/json"
"fmt"
"strings"
"be.ems/src/framework/cron"
"be.ems/src/framework/database/db"
"be.ems/src/framework/logger"
)
var NewProcessor = &DeleteCDRRecordProcessor{
count: 0,
}
// DeleteCDRRecordProcessor 删除CDR记录
type DeleteCDRRecordProcessor struct {
count int // 执行次数
}
func (s *DeleteCDRRecordProcessor) Execute(data any) (any, error) {
s.count++ // 执行次数加一
options := data.(cron.JobData)
sysJob := options.SysJob
logger.Infof("重复:%v 任务ID:%s 执行次数:%d", options.Repeat, sysJob.JobID, s.count)
// 返回结果,用于记录执行结果
result := map[string]any{
"count": s.count,
}
// 读取参数值
var params struct {
StoreDays int `json:"storeDays"` // store days
NeList []string `json:"neList"` // ne list
}
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, fmt.Errorf("json params err: %v", err)
}
if params.StoreDays < 0 {
return nil, fmt.Errorf("params storeDays less than 0 ")
}
if len(params.NeList) <= 0 {
return nil, fmt.Errorf("params neList less than 0 ")
}
for _, neType := range params.NeList {
neTypeLower := strings.ToLower(neType)
// 数据表
cdrTableName := fmt.Sprintf("cdr_event_%s", neTypeLower)
cdrTx := db.DB("").Table(cdrTableName).Where("NOW()>ADDDATE(`created_at`,interval ? day)", params.StoreDays)
if err := cdrTx.Delete(nil).Error; err != nil {
result[neTypeLower] = err.Error()
} else {
result[neTypeLower] = cdrTx.RowsAffected
}
}
// 返回结果,用于记录执行结果
return result, nil
}

View File

@@ -0,0 +1,73 @@
package delete_kpi_record
import (
"encoding/json"
"fmt"
"strings"
"be.ems/src/framework/cron"
"be.ems/src/framework/database/db"
"be.ems/src/framework/logger"
)
var NewProcessor = &DeleteKPIRecordProcessor{
count: 0,
}
// DeleteKPIRecordProcessor 删除KPI记录
type DeleteKPIRecordProcessor struct {
count int // 执行次数
}
func (s *DeleteKPIRecordProcessor) Execute(data any) (any, error) {
s.count++ // 执行次数加一
options := data.(cron.JobData)
sysJob := options.SysJob
logger.Infof("重复:%v 任务ID:%s 执行次数:%d", options.Repeat, sysJob.JobID, s.count)
// 返回结果,用于记录执行结果
result := map[string]any{
"count": s.count,
}
// 读取参数值
var params struct {
StoreDays int `json:"storeDays"` // store days
NeList []string `json:"neList"` // ne list
}
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, fmt.Errorf("json params err: %v", err)
}
if params.StoreDays < 0 {
return nil, fmt.Errorf("params storeDays less than 0 ")
}
if len(params.NeList) <= 0 {
return nil, fmt.Errorf("params neList less than 0 ")
}
for _, neType := range params.NeList {
neTypeLower := strings.ToLower(neType)
// KPI数据表
kpiTableName := fmt.Sprintf("kpi_report_%s", neTypeLower)
kpiTx := db.DB("").Table(kpiTableName).Where("NOW()>ADDDATE(`created_at`,interval ? day)", params.StoreDays)
if err := kpiTx.Delete(nil).Error; err != nil {
result[neTypeLower] = err.Error()
} else {
result[neTypeLower] = kpiTx.RowsAffected
}
// KPI自定义数据表
kpicTableName := fmt.Sprintf("kpi_c_report_%s", neTypeLower)
kpicTx := db.DB("").Table(kpicTableName).Where("NOW()>ADDDATE(`created_at`,interval ? day)", params.StoreDays)
if err := kpicTx.Delete(nil).Error; err != nil {
result["c_"+neTypeLower] = err.Error()
} else {
result["c_"+neTypeLower] = kpicTx.RowsAffected
}
}
// 返回结果,用于记录执行结果
return result, nil
}

View File

@@ -0,0 +1,63 @@
package delete_UENB_record
import (
"encoding/json"
"fmt"
"strings"
"be.ems/src/framework/cron"
"be.ems/src/framework/database/db"
"be.ems/src/framework/logger"
)
var NewProcessor = &DeleteUENBRecordProcessor{
count: 0,
}
// DeleteUENBRecordProcessor 删除UENB记录
type DeleteUENBRecordProcessor struct {
count int // 执行次数
}
func (s *DeleteUENBRecordProcessor) Execute(data any) (any, error) {
s.count++ // 执行次数加一
options := data.(cron.JobData)
sysJob := options.SysJob
logger.Infof("重复:%v 任务ID:%s 执行次数:%d", options.Repeat, sysJob.JobID, s.count)
// 返回结果,用于记录执行结果
result := map[string]any{
"count": s.count,
}
// 读取参数值
var params struct {
StoreDays int `json:"storeDays"` // store days
NeList []string `json:"neList"` // ne list
}
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, fmt.Errorf("json params err: %v", err)
}
if params.StoreDays < 0 {
return nil, fmt.Errorf("params storeDays less than 0 ")
}
if len(params.NeList) <= 0 {
return nil, fmt.Errorf("params neList less than 0 ")
}
for _, neType := range params.NeList {
neTypeLower := strings.ToLower(neType)
// 数据表
uenbTableName := fmt.Sprintf("ue_event_%s", neTypeLower)
uenbTx := db.DB("").Table(uenbTableName).Where("NOW()>ADDDATE(`created_at`,interval ? day)", params.StoreDays)
if err := uenbTx.Delete(nil).Error; err != nil {
result[neTypeLower] = err.Error()
} else {
result[neTypeLower] = uenbTx.RowsAffected
}
}
// 返回结果,用于记录执行结果
return result, nil
}

View File

@@ -9,7 +9,11 @@ import (
processorBackupRemoveFile "be.ems/src/modules/crontask/processor/backup_remove_file"
"be.ems/src/modules/crontask/processor/delExpiredNeBackup"
"be.ems/src/modules/crontask/processor/deleteExpiredRecord"
processorDeleteAlarmRecord "be.ems/src/modules/crontask/processor/delete_alarm_record"
processorDeleteCDRRecord "be.ems/src/modules/crontask/processor/delete_cdr_record"
processorDeleteKPIRecord "be.ems/src/modules/crontask/processor/delete_kpi_record"
processorDeleteNeConfigBackup "be.ems/src/modules/crontask/processor/delete_ne_config_backup"
processorDeleteUENBRecord "be.ems/src/modules/crontask/processor/delete_uenb_record"
"be.ems/src/modules/crontask/processor/exportTable"
"be.ems/src/modules/crontask/processor/exportUEData"
"be.ems/src/modules/crontask/processor/genNeStateAlarm"
@@ -33,6 +37,14 @@ func InitCronQueue() {
cron.CreateQueue("ne_data_udm", processorNeDataUDM.NewProcessor)
// 删除-网元配置文件定期备份
cron.CreateQueue("delete_ne_config_backup", processorDeleteNeConfigBackup.NewProcessor)
// 删除-告警数据记录
cron.CreateQueue("delete_alarm_record", processorDeleteAlarmRecord.NewProcessor)
// 删除-KPI数据记录
cron.CreateQueue("delete_kpi_record", processorDeleteKPIRecord.NewProcessor)
// 删除-KPI数据记录
cron.CreateQueue("delete_cdr_record", processorDeleteCDRRecord.NewProcessor)
// 删除-UENB数据记录
cron.CreateQueue("delete_uenb_record", processorDeleteUENBRecord.NewProcessor)
// delete expired NE backup file
cron.CreateQueue("delExpiredNeBackup", delExpiredNeBackup.NewProcessor)