feat: 添加CDR和UENB记录删除调度任务及相关数据库操作

This commit is contained in:
TsMask
2025-08-18 18:58:53 +08:00
parent d0d539e4d0
commit 25b596d2c9
14 changed files with 183 additions and 7 deletions

View File

@@ -0,0 +1,67 @@
package delete_CDR_record
import (
"encoding/json"
"fmt"
"strings"
"time"
"be.ems/src/framework/cron"
"be.ems/src/framework/database/db"
"be.ems/src/framework/logger"
)
var NewProcessor = &DeleteCDRRecordProcessor{
count: 0,
}
// DeleteCDRRecordProcessor 删除CDR记录
type DeleteCDRRecordProcessor struct {
count int // 执行次数
}
func (s *DeleteCDRRecordProcessor) Execute(data any) (any, error) {
s.count++ // 执行次数加一
options := data.(cron.JobData)
sysJob := options.SysJob
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
// 返回结果,用于记录执行结果
result := map[string]any{
"count": s.count,
}
// 读取参数值
var params struct {
StoreDays int `json:"storeDays"` // store days
NeList []string `json:"neList"` // ne list
}
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, fmt.Errorf("json params err: %v", err)
}
if params.StoreDays < 0 {
return nil, fmt.Errorf("params storeDays less than 0 ")
}
if len(params.NeList) <= 0 {
return nil, fmt.Errorf("params neList less than 0 ")
}
// 计算删除时间
ltTime := time.Now().AddDate(0, 0, -params.StoreDays).UnixMilli()
for _, neType := range params.NeList {
neTypeLower := strings.ToLower(neType)
// 数据表
cdrTableName := fmt.Sprintf("cdr_event_%s", neTypeLower)
cdrTx := db.DB("").Table(cdrTableName).Where("created_at < ?", ltTime)
if err := cdrTx.Delete(nil).Error; err != nil {
result[neTypeLower] = err.Error()
} else {
result[neTypeLower] = cdrTx.RowsAffected
}
}
// 返回结果,用于记录执行结果
return result, nil
}

View File

@@ -21,7 +21,7 @@ type DeleteDataRecordProcessor struct {
type optionParams struct {
TableName string `json:"tableName"` // table name
ColName string `json:"colName"` // column name
ColName string `json:"colName"` // column name is timestamp milliseconds
StoreDays int `json:"storeDays"` // store days
WhereSQL string `json:"whereSQL"` // extras condition for where
}
@@ -45,13 +45,16 @@ func (s *DeleteDataRecordProcessor) Execute(data any) (any, error) {
if params.TableName == "" {
return nil, fmt.Errorf("params tableName is empty ")
}
if params.ColName == "" {
return nil, fmt.Errorf("params colName is empty ")
}
if params.StoreDays < 0 {
return nil, fmt.Errorf("params storeDays less than 0 ")
}
// 指定表名
tx := db.DB("").Table(params.TableName)
if params.StoreDays >= 0 && params.ColName != "" {
if params.StoreDays >= 0 {
// 计算删除时间
ltTime := time.Now().AddDate(0, 0, -params.StoreDays).UnixMilli()
tx = tx.Where(fmt.Sprintf("%s < ?", params.ColName), ltTime)

View File

@@ -56,18 +56,18 @@ func (s *DeleteKPIRecordProcessor) Execute(data any) (any, error) {
kpiTableName := fmt.Sprintf("kpi_report_%s", neTypeLower)
kpiTx := db.DB("").Table(kpiTableName).Where("created_at < ?", ltTime)
if err := kpiTx.Delete(nil).Error; err != nil {
result[kpiTableName] = err.Error()
result[neTypeLower] = err.Error()
} else {
result[kpiTableName] = kpiTx.RowsAffected
result[neTypeLower] = kpiTx.RowsAffected
}
// KPI自定义数据表
kpicTableName := fmt.Sprintf("kpi_c_report_%s", neTypeLower)
kpicTx := db.DB("").Table(kpicTableName).Where("created_at < ?", ltTime)
if err := kpicTx.Delete(nil).Error; err != nil {
result[kpiTableName] = err.Error()
result["c_"+neTypeLower] = err.Error()
} else {
result[kpiTableName] = kpicTx.RowsAffected
result["c_"+neTypeLower] = kpicTx.RowsAffected
}
}

View File

@@ -0,0 +1,67 @@
package delete_UENB_record
import (
"encoding/json"
"fmt"
"strings"
"time"
"be.ems/src/framework/cron"
"be.ems/src/framework/database/db"
"be.ems/src/framework/logger"
)
var NewProcessor = &DeleteUENBRecordProcessor{
count: 0,
}
// DeleteUENBRecordProcessor 删除UENB记录
type DeleteUENBRecordProcessor struct {
count int // 执行次数
}
func (s *DeleteUENBRecordProcessor) Execute(data any) (any, error) {
s.count++ // 执行次数加一
options := data.(cron.JobData)
sysJob := options.SysJob
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
// 返回结果,用于记录执行结果
result := map[string]any{
"count": s.count,
}
// 读取参数值
var params struct {
StoreDays int `json:"storeDays"` // store days
NeList []string `json:"neList"` // ne list
}
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, fmt.Errorf("json params err: %v", err)
}
if params.StoreDays < 0 {
return nil, fmt.Errorf("params storeDays less than 0 ")
}
if len(params.NeList) <= 0 {
return nil, fmt.Errorf("params neList less than 0 ")
}
// 计算删除时间
ltTime := time.Now().AddDate(0, 0, -params.StoreDays).UnixMilli()
for _, neType := range params.NeList {
neTypeLower := strings.ToLower(neType)
// 数据表
uenbTableName := fmt.Sprintf("ue_event_%s", neTypeLower)
uenbTx := db.DB("").Table(uenbTableName).Where("created_at < ?", ltTime)
if err := uenbTx.Delete(nil).Error; err != nil {
result[neTypeLower] = err.Error()
} else {
result[neTypeLower] = uenbTx.RowsAffected
}
}
// 返回结果,用于记录执行结果
return result, nil
}

View File

@@ -237,7 +237,8 @@ func (s NeAlarmStateCheckCMDProcessor) serverState(state map[string]any) (float6
}
s.neStateService.Insert(neState)
// 删除网元状态记录7天前
s.neStateService.DeleteByTime(time.Now().UnixMilli() - 7*24*60*60*1000)
ltTime := time.Now().AddDate(0, 0, -7).UnixMilli()
s.neStateService.DeleteByTime(ltTime)
// 推送ws消息
groupID := fmt.Sprintf("%s_%s_%s", wsService.GROUP_NE_STATE, neState.NeType, neState.NeId)
s.wsSendService.ByGroupID(groupID, neState)

View File

@@ -8,9 +8,11 @@ import (
processorBackupExportUDM "be.ems/src/modules/crontask/processor/backup_export_udm"
processorBackupRemoveFile "be.ems/src/modules/crontask/processor/backup_remove_file"
processorDeleteAlarmRecord "be.ems/src/modules/crontask/processor/delete_alarm_record"
processorDeleteCDRRecord "be.ems/src/modules/crontask/processor/delete_cdr_record"
processorDeleteDataRecord "be.ems/src/modules/crontask/processor/delete_data_record"
processorDeleteKPIRecord "be.ems/src/modules/crontask/processor/delete_kpi_record"
processorDeleteNeConfigBackup "be.ems/src/modules/crontask/processor/delete_ne_config_backup"
processorDeleteUENBRecord "be.ems/src/modules/crontask/processor/delete_uenb_record"
processorMonitorSysResource "be.ems/src/modules/crontask/processor/monitor_sys_resource"
processorNeAlarmStateCheck "be.ems/src/modules/crontask/processor/ne_alarm_state_check"
processorNeAlarmStateCheckCMD "be.ems/src/modules/crontask/processor/ne_alarm_state_check_cmd"
@@ -41,6 +43,10 @@ func InitCronQueue() {
cron.CreateQueue("delete_alarm_record", processorDeleteAlarmRecord.NewProcessor)
// 删除-KPI数据记录
cron.CreateQueue("delete_kpi_record", processorDeleteKPIRecord.NewProcessor)
// 删除-KPI数据记录
cron.CreateQueue("delete_cdr_record", processorDeleteCDRRecord.NewProcessor)
// 删除-UENB数据记录
cron.CreateQueue("delete_uenb_record", processorDeleteUENBRecord.NewProcessor)
// 删除-网元配置文件定期备份
cron.CreateQueue("delete_ne_config_backup", processorDeleteNeConfigBackup.NewProcessor)