feat: 重构调度任务删除记录/导出cdr
This commit is contained in:
@@ -1,174 +0,0 @@
|
||||
package backupEtcFromNE
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"be.ems/lib/config"
|
||||
"be.ems/lib/dborm"
|
||||
"be.ems/lib/global"
|
||||
"be.ems/lib/log"
|
||||
"be.ems/src/framework/cron"
|
||||
)
|
||||
|
||||
var NewProcessor = &BarProcessor{
|
||||
progress: 0,
|
||||
count: 0,
|
||||
}
|
||||
|
||||
// bar 队列任务处理
|
||||
type BarProcessor struct {
|
||||
// 任务进度
|
||||
progress int
|
||||
// 执行次数
|
||||
count int
|
||||
}
|
||||
|
||||
type BarParams struct {
|
||||
Duration int `json:"duration"`
|
||||
TableName string `json:"tableName"`
|
||||
ColName string `json:"colName"` // column name of time string
|
||||
Extras string `json:"extras"` // extras condition for where
|
||||
}
|
||||
|
||||
func (s *BarProcessor) Execute(data any) (any, error) {
|
||||
log.Infof("execute %d,last progress: %d ", s.count, s.progress)
|
||||
s.count++
|
||||
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
// var params BarParams
|
||||
|
||||
// err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
log.Infof("Repeat %v Job ID %d", options.Repeat, sysJob.JobId)
|
||||
|
||||
var nes []dborm.NeInfo
|
||||
_, err := dborm.XormGetAllNeInfo(&nes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var successfulNEs, failureNEs []string
|
||||
for _, neInfo := range nes {
|
||||
neTypeUpper := strings.ToUpper(neInfo.NeType)
|
||||
neTypeLower := strings.ToLower(neInfo.NeType)
|
||||
nePath := fmt.Sprintf("%s/etc/%s", config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||
isExist, err := global.PathExists(nePath)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to PathExists:", err)
|
||||
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||
continue
|
||||
}
|
||||
if isExist {
|
||||
err = os.RemoveAll(nePath)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to RemoveAll:", err)
|
||||
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||
continue
|
||||
}
|
||||
}
|
||||
err = os.MkdirAll(nePath, os.ModePerm)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to MkdirAll:", err)
|
||||
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||
continue
|
||||
}
|
||||
|
||||
var scpCmd string
|
||||
ipType := global.ParseIPAddr(neInfo.Ip)
|
||||
omcNetypeLower := strings.ToLower(config.GetYamlConfig().OMC.NeType)
|
||||
etcListIMS := "{*.yaml,mmtel,vars.cfg}"
|
||||
if config.GetYamlConfig().NE.EtcListIMS != "" {
|
||||
etcListIMS = config.GetYamlConfig().NE.EtcListIMS
|
||||
}
|
||||
switch neTypeLower {
|
||||
case omcNetypeLower:
|
||||
if ipType == global.IsIPv4 {
|
||||
scpCmd = fmt.Sprintf("scp -r %s@%s:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||
neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||
} else {
|
||||
scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||
neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||
}
|
||||
|
||||
case "ims":
|
||||
if ipType == global.IsIPv4 {
|
||||
scpCmd = fmt.Sprintf("scp -r %s@%s:%s/%s/%s %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||
neInfo.Ip, config.GetYamlConfig().NE.EtcDir, neTypeLower,
|
||||
etcListIMS, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||
} else {
|
||||
scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/%s/%s %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||
neInfo.Ip, config.GetYamlConfig().NE.EtcDir, neTypeLower,
|
||||
etcListIMS, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||
}
|
||||
|
||||
case "mme":
|
||||
if ipType == global.IsIPv4 {
|
||||
scpCmd = fmt.Sprintf("scp -r %s@%s:%s/%s/*.conf %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||
neInfo.Ip, config.GetYamlConfig().NE.EtcDir,
|
||||
neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||
} else {
|
||||
scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/%s/*.conf %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||
neInfo.Ip, config.GetYamlConfig().NE.EtcDir,
|
||||
neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||
}
|
||||
|
||||
default:
|
||||
if ipType == global.IsIPv4 {
|
||||
scpCmd = fmt.Sprintf("scp -r %s@%s:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||
neInfo.Ip, config.GetYamlConfig().NE.EtcDir,
|
||||
neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||
} else {
|
||||
scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||
neInfo.Ip, config.GetYamlConfig().NE.EtcDir,
|
||||
neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
zipFile := fmt.Sprintf("%s-%s-etc-%s.zip", neTypeLower, strings.ToLower(neInfo.NeId), time.Now().Format(global.DateData))
|
||||
zipFilePath := config.GetYamlConfig().OMC.Backup + "/" + zipFile
|
||||
zipCmd := fmt.Sprintf("cd %s/etc && zip -r %s %s/*", config.GetYamlConfig().OMC.Backup, zipFilePath, neTypeLower)
|
||||
|
||||
command := fmt.Sprintf("%s&&%s", scpCmd, zipCmd)
|
||||
|
||||
log.Trace("command:", command)
|
||||
out, err := global.ExecCmd(command)
|
||||
if err != nil {
|
||||
log.Error("Faile to exec command:", err)
|
||||
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||
continue
|
||||
}
|
||||
log.Trace("command output:", out)
|
||||
|
||||
md5Sum, err := global.GetFileMD5Sum(zipFilePath)
|
||||
if err != nil {
|
||||
log.Error("Faile to md5sum:", err)
|
||||
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||
continue
|
||||
}
|
||||
//log.Debug("md5Str:", md5Sum)
|
||||
path := config.GetYamlConfig().OMC.Backup
|
||||
neBackup := dborm.NeBackup{NeType: neTypeUpper, NeId: neInfo.NeId, FileName: zipFile, Path: path, Md5Sum: md5Sum}
|
||||
_, err = dborm.XormInsertTableOne("ne_backup", neBackup)
|
||||
if err != nil {
|
||||
log.Error("Faile to XormInsertTableOne:", err)
|
||||
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||
continue
|
||||
}
|
||||
successfulNEs = append(successfulNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||
}
|
||||
|
||||
log.Infof("successfulNEs: %s failureNEs: %s", successfulNEs, failureNEs)
|
||||
// result
|
||||
return map[string]any{
|
||||
"successfulNEs": successfulNEs,
|
||||
"failureNEs": failureNEs,
|
||||
}, nil
|
||||
}
|
||||
@@ -1,80 +0,0 @@
|
||||
package delExpiredNeBackup
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"be.ems/lib/dborm"
|
||||
"be.ems/lib/log"
|
||||
"be.ems/src/framework/cron"
|
||||
)
|
||||
|
||||
var NewProcessor = &BarProcessor{
|
||||
progress: 0,
|
||||
count: 0,
|
||||
}
|
||||
|
||||
// bar 队列任务处理
|
||||
type BarProcessor struct {
|
||||
// 任务进度
|
||||
progress int
|
||||
// 执行次数
|
||||
count int
|
||||
}
|
||||
|
||||
type BarParams struct {
|
||||
Duration int `json:"duration"`
|
||||
}
|
||||
|
||||
func (s *BarProcessor) Execute(data any) (any, error) {
|
||||
log.Infof("执行 %d 次,上次进度: %d ", s.count, s.progress)
|
||||
s.count++
|
||||
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
var params BarParams
|
||||
duration := 60
|
||||
|
||||
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||
if err == nil {
|
||||
duration = params.Duration
|
||||
}
|
||||
log.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId)
|
||||
|
||||
// // 实现任务处理逻辑
|
||||
// i := 0
|
||||
// s.progress = i
|
||||
// for i < 5 {
|
||||
// // 获取任务进度
|
||||
// progress := s.progress
|
||||
// log.Infof("jonId: %s => 任务进度:%d", sysJob.JobID, progress)
|
||||
// // 延迟响应
|
||||
// time.Sleep(time.Second * 2)
|
||||
// // 程序中途执行错误
|
||||
// if i == 3 {
|
||||
// // arr := [1]int{1}
|
||||
// // arr[i] = 3
|
||||
// // fmt.Println(arr)
|
||||
// // return "i = 3"
|
||||
// panic("程序中途执行错误")
|
||||
// }
|
||||
// i++
|
||||
// // 改变任务进度
|
||||
// s.progress = i
|
||||
// }
|
||||
where := fmt.Sprintf("NOW()>ADDDATE(`create_time`,interval %d day)", duration)
|
||||
affected, err := dborm.XormDeleteDataByWhere(where, "ne_backup")
|
||||
if err != nil {
|
||||
// panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// delete expired files in backup directory
|
||||
// todo ...
|
||||
|
||||
// 返回结果,用于记录执行结果
|
||||
return map[string]any{
|
||||
"msg": "sucess",
|
||||
"affected": affected,
|
||||
}, nil
|
||||
}
|
||||
@@ -1,98 +0,0 @@
|
||||
package deleteExpiredRecord
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"be.ems/lib/dborm"
|
||||
"be.ems/lib/log"
|
||||
"be.ems/src/framework/cron"
|
||||
)
|
||||
|
||||
var NewProcessor = &BarProcessor{
|
||||
progress: 0,
|
||||
count: 0,
|
||||
}
|
||||
|
||||
// bar 队列任务处理
|
||||
type BarProcessor struct {
|
||||
// 任务进度
|
||||
progress int
|
||||
// 执行次数
|
||||
count int
|
||||
}
|
||||
|
||||
type BarParams struct {
|
||||
Duration int `json:"duration"`
|
||||
TableName string `json:"tableName"`
|
||||
ColName string `json:"colName"` // column name of time string
|
||||
Extras string `json:"extras"` // extras condition for where
|
||||
SessFlag bool `json:"sessFlag"` // session flag, true: session model, false: no session
|
||||
}
|
||||
|
||||
func (s *BarProcessor) Execute(data any) (any, error) {
|
||||
log.Infof("执行 %d 次,上次进度: %d ", s.count, s.progress)
|
||||
s.count++
|
||||
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
var params BarParams
|
||||
|
||||
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//duration = params.Duration
|
||||
log.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId)
|
||||
|
||||
// // 实现任务处理逻辑
|
||||
// i := 0
|
||||
// s.progress = i
|
||||
// for i < 5 {
|
||||
// // 获取任务进度
|
||||
// progress := s.progress
|
||||
// log.Infof("jonId: %s => 任务进度:%d", sysJob.JobID, progress)
|
||||
// // 延迟响应
|
||||
// time.Sleep(time.Second * 2)
|
||||
// // 程序中途执行错误
|
||||
// if i == 3 {
|
||||
// // arr := [1]int{1}
|
||||
// // arr[i] = 3
|
||||
// // fmt.Println(arr)
|
||||
// // return "i = 3"
|
||||
// panic("程序中途执行错误")
|
||||
// }
|
||||
// i++
|
||||
// // 改变任务进度
|
||||
// s.progress = i
|
||||
// }
|
||||
|
||||
var where string
|
||||
if params.Extras == "" {
|
||||
where = fmt.Sprintf("NOW()>ADDDATE(`%s`,interval %d day)", params.ColName, params.Duration)
|
||||
} else {
|
||||
where = fmt.Sprintf("NOW()>ADDDATE(`%s`,interval %d day) and %s", params.ColName, params.Duration, params.Extras)
|
||||
}
|
||||
|
||||
var affected int64 = 0
|
||||
if params.SessFlag {
|
||||
affected, err = dborm.XormDeleteDataByWhere(where, params.TableName)
|
||||
if err != nil {
|
||||
// panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err))
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
affected, err = dborm.XormDeleteDataByWhereNoSession(where, params.TableName)
|
||||
if err != nil {
|
||||
// panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 返回结果,用于记录执行结果
|
||||
return map[string]any{
|
||||
"msg": "sucess",
|
||||
"affected": affected,
|
||||
}, nil
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package delete_alarm_record
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"be.ems/src/framework/cron"
|
||||
"be.ems/src/framework/database/db"
|
||||
"be.ems/src/framework/logger"
|
||||
)
|
||||
|
||||
var NewProcessor = &DeleteAlarmRecordProcessor{
|
||||
count: 0,
|
||||
}
|
||||
|
||||
// DeleteAlarmRecordProcessor 删除告警记录
|
||||
type DeleteAlarmRecordProcessor struct {
|
||||
count int // 执行次数
|
||||
}
|
||||
|
||||
func (s *DeleteAlarmRecordProcessor) Execute(data any) (any, error) {
|
||||
s.count++ // 执行次数加一
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
|
||||
// 返回结果,用于记录执行结果
|
||||
result := map[string]any{
|
||||
"count": s.count,
|
||||
}
|
||||
|
||||
// 读取参数值
|
||||
var params struct {
|
||||
StoreDays int `json:"storeDays"` // store days
|
||||
}
|
||||
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("json params err: %v", err)
|
||||
}
|
||||
if params.StoreDays < 0 {
|
||||
return nil, fmt.Errorf("params storeDays less than 0 ")
|
||||
}
|
||||
|
||||
// 计算删除时间
|
||||
ltTime := time.Now().AddDate(0, 0, -params.StoreDays).UnixMilli()
|
||||
|
||||
// 告警表
|
||||
alarmTx := db.DB("").Table("alarm").Where("timestamp < ?", ltTime)
|
||||
if err := alarmTx.Delete(nil).Error; err != nil {
|
||||
result["alarm"] = err.Error()
|
||||
} else {
|
||||
result["alarm"] = alarmTx.RowsAffected
|
||||
}
|
||||
|
||||
// 告警事件表
|
||||
alarmEventTx := db.DB("").Table("alarm_event").Where("timestamp < ?", ltTime)
|
||||
if err := alarmEventTx.Delete(nil).Error; err != nil {
|
||||
result["alarm_event"] = err.Error()
|
||||
} else {
|
||||
result["alarm_event"] = alarmEventTx.RowsAffected
|
||||
}
|
||||
|
||||
// 告警日志表
|
||||
alarmLogTx := db.DB("").Table("alarm_log").Where("created_at < ?", ltTime)
|
||||
if err := alarmLogTx.Delete(nil).Error; err != nil {
|
||||
result["alarm_log"] = err.Error()
|
||||
} else {
|
||||
result["alarm_log"] = alarmLogTx.RowsAffected
|
||||
}
|
||||
|
||||
// 告警转发日志表
|
||||
alarmForwardLogTx := db.DB("").Table("alarm_forward_log").Where("created_at < ?", ltTime)
|
||||
if err := alarmForwardLogTx.Delete(nil).Error; err != nil {
|
||||
result["alarm_forward_log"] = err.Error()
|
||||
} else {
|
||||
result["alarm_forward_log"] = alarmForwardLogTx.RowsAffected
|
||||
}
|
||||
|
||||
// 返回结果,用于记录执行结果
|
||||
return result, nil
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
package delete_data_record
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"be.ems/src/framework/cron"
|
||||
"be.ems/src/framework/database/db"
|
||||
"be.ems/src/framework/logger"
|
||||
)
|
||||
|
||||
var NewProcessor = &DeleteDataRecordProcessor{
|
||||
count: 0,
|
||||
}
|
||||
|
||||
// bar 队列任务处理
|
||||
type DeleteDataRecordProcessor struct {
|
||||
count int // 执行次数
|
||||
}
|
||||
|
||||
type optionParams struct {
|
||||
TableName string `json:"tableName"` // table name
|
||||
ColName string `json:"colName"` // column name
|
||||
StoreDays int `json:"storeDays"` // store days
|
||||
WhereSQL string `json:"whereSQL"` // extras condition for where
|
||||
}
|
||||
|
||||
func (s *DeleteDataRecordProcessor) Execute(data any) (any, error) {
|
||||
s.count++ // 执行次数加一
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
|
||||
// 返回结果,用于记录执行结果
|
||||
result := map[string]any{
|
||||
"count": s.count,
|
||||
}
|
||||
|
||||
// 读取参数值
|
||||
var params optionParams
|
||||
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("json params err: %v", err)
|
||||
}
|
||||
if params.TableName == "" {
|
||||
return nil, fmt.Errorf("params tableName is empty ")
|
||||
}
|
||||
if params.StoreDays < 0 {
|
||||
return nil, fmt.Errorf("params storeDays less than 0 ")
|
||||
}
|
||||
// 指定表名
|
||||
tx := db.DB("").Table(params.TableName)
|
||||
|
||||
if params.StoreDays >= 0 && params.ColName != "" {
|
||||
// 计算删除时间
|
||||
ltTime := time.Now().AddDate(0, 0, -params.StoreDays).UnixMilli()
|
||||
tx = tx.Where(fmt.Sprintf("%s < ?", params.ColName), ltTime)
|
||||
}
|
||||
if params.WhereSQL != "" {
|
||||
tx = tx.Where(params.WhereSQL)
|
||||
}
|
||||
|
||||
// 执行删除
|
||||
if err := tx.Delete(nil).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result["affected"] = tx.RowsAffected
|
||||
|
||||
// 返回结果,用于记录执行结果
|
||||
return result, nil
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
package delete_kpi_record
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"be.ems/src/framework/cron"
|
||||
"be.ems/src/framework/database/db"
|
||||
"be.ems/src/framework/logger"
|
||||
)
|
||||
|
||||
var NewProcessor = &DeleteKPIRecordProcessor{
|
||||
count: 0,
|
||||
}
|
||||
|
||||
// DeleteKPIRecordProcessor 删除KPI记录
|
||||
type DeleteKPIRecordProcessor struct {
|
||||
count int // 执行次数
|
||||
}
|
||||
|
||||
func (s *DeleteKPIRecordProcessor) Execute(data any) (any, error) {
|
||||
s.count++ // 执行次数加一
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
|
||||
// 返回结果,用于记录执行结果
|
||||
result := map[string]any{
|
||||
"count": s.count,
|
||||
}
|
||||
|
||||
// 读取参数值
|
||||
var params struct {
|
||||
StoreDays int `json:"storeDays"` // store days
|
||||
NeList []string `json:"neList"` // ne list
|
||||
}
|
||||
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("json params err: %v", err)
|
||||
}
|
||||
if params.StoreDays < 0 {
|
||||
return nil, fmt.Errorf("params storeDays less than 0 ")
|
||||
}
|
||||
if len(params.NeList) <= 0 {
|
||||
return nil, fmt.Errorf("params neList less than 0 ")
|
||||
}
|
||||
|
||||
// 计算删除时间
|
||||
ltTime := time.Now().AddDate(0, 0, -params.StoreDays).UnixMilli()
|
||||
|
||||
for _, neType := range params.NeList {
|
||||
neTypeLower := strings.ToLower(neType)
|
||||
|
||||
// KPI数据表
|
||||
kpiTableName := fmt.Sprintf("kpi_report_%s", neTypeLower)
|
||||
kpiTx := db.DB("").Table(kpiTableName).Where("created_at < ?", ltTime)
|
||||
if err := kpiTx.Delete(nil).Error; err != nil {
|
||||
result[kpiTableName] = err.Error()
|
||||
} else {
|
||||
result[kpiTableName] = kpiTx.RowsAffected
|
||||
}
|
||||
|
||||
// KPI自定义数据表
|
||||
kpicTableName := fmt.Sprintf("kpi_c_report_%s", neTypeLower)
|
||||
kpicTx := db.DB("").Table(kpicTableName).Where("created_at < ?", ltTime)
|
||||
if err := kpicTx.Delete(nil).Error; err != nil {
|
||||
result[kpiTableName] = err.Error()
|
||||
} else {
|
||||
result[kpiTableName] = kpicTx.RowsAffected
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// 返回结果,用于记录执行结果
|
||||
return result, nil
|
||||
}
|
||||
@@ -0,0 +1,128 @@
|
||||
package delete_ne_config_backup
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"be.ems/src/framework/cron"
|
||||
"be.ems/src/framework/database/db"
|
||||
"be.ems/src/framework/logger"
|
||||
"be.ems/src/framework/utils/date"
|
||||
neModel "be.ems/src/modules/network_element/model"
|
||||
neService "be.ems/src/modules/network_element/service"
|
||||
)
|
||||
|
||||
var NewProcessor = &DeleteNeConfigBackupProcessor{
|
||||
neInfoService: neService.NewNeInfo,
|
||||
count: 0,
|
||||
}
|
||||
|
||||
// DeleteNeConfigBackupProcessor 网元配置文件定期备份
|
||||
type DeleteNeConfigBackupProcessor struct {
|
||||
neInfoService *neService.NeInfo // 网元信息服务
|
||||
count int // 执行次数
|
||||
}
|
||||
|
||||
func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) {
|
||||
s.count++ // 执行次数加一
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
|
||||
// 返回结果,用于记录执行结果
|
||||
result := map[string]any{
|
||||
"count": s.count,
|
||||
}
|
||||
|
||||
// 读取参数值
|
||||
var params struct {
|
||||
StoreDays int `json:"storeDays"` // store days
|
||||
}
|
||||
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("json params err: %v", err)
|
||||
}
|
||||
if params.StoreDays < 0 {
|
||||
return nil, fmt.Errorf("params storeDays less than 0 ")
|
||||
}
|
||||
|
||||
neList := s.neInfoService.Find(neModel.NeInfo{}, false, false)
|
||||
for _, neInfo := range neList {
|
||||
neTypeAndId := fmt.Sprintf("%s_%s", neInfo.NeType, neInfo.NeId)
|
||||
tx := db.DB("").Model(&neModel.NeConfigBackup{})
|
||||
tx = tx.Where("ne_type = ? and ne_id = ?", neInfo.NeType, neInfo.NeId)
|
||||
|
||||
// 查询最后记录数据
|
||||
var lastCreateTime int64 = 0
|
||||
lastTx := tx.Select("create_time").Order("create_time DESC").Limit(1)
|
||||
if err := lastTx.Find(&lastCreateTime).Error; err != nil {
|
||||
result[neTypeAndId] = err.Error()
|
||||
continue
|
||||
}
|
||||
|
||||
if lastCreateTime <= 1e12 {
|
||||
result[neTypeAndId] = "no data"
|
||||
continue
|
||||
}
|
||||
|
||||
// 计算删除时间
|
||||
lastTime := time.UnixMilli(lastCreateTime)
|
||||
ltTime := lastTime.AddDate(0, 0, -params.StoreDays)
|
||||
|
||||
// 删除小于最后时间的数据
|
||||
delTx := tx.Delete("create_time < ?", ltTime.UnixMilli())
|
||||
if err := delTx.Error; err != nil {
|
||||
result[neTypeAndId] = err.Error()
|
||||
continue
|
||||
}
|
||||
result[neTypeAndId] = tx.RowsAffected
|
||||
|
||||
// 删除本地文件
|
||||
s.deleteFile(neInfo.NeType, neInfo.NeId, ltTime)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// deleteFile 删除本地文件
|
||||
func (s DeleteNeConfigBackupProcessor) deleteFile(neType, neId string, oldFileDate time.Time) {
|
||||
neTypeLower := strings.ToLower(neType)
|
||||
localPath := fmt.Sprintf("/usr/local/etc/omc/ne_config/%s/%s/backup ", neTypeLower, neId)
|
||||
files, err := os.ReadDir(localPath)
|
||||
if err != nil {
|
||||
logger.Errorf("logger Remove ne_config File ReadDir err: %v", err.Error())
|
||||
return
|
||||
}
|
||||
for _, file := range files {
|
||||
// 跳过非指定文件名
|
||||
// zipFileName := fmt.Sprintf("%s-%s-etc-%s.zip", neTypeLower, neInfo.NeId, date.ParseDateToStr(time.Now(), date.YYYYMMDDHHMMSS))
|
||||
fileName := fmt.Sprintf("%s-%s-etc-", neTypeLower, neId)
|
||||
if !strings.HasPrefix(file.Name(), fileName) {
|
||||
continue
|
||||
}
|
||||
idx := strings.LastIndex(file.Name(), "-")
|
||||
if idx == -1 {
|
||||
continue
|
||||
}
|
||||
dateStr := file.Name()[idx+1 : idx+15]
|
||||
|
||||
// 解析日期字符串
|
||||
fileDate, err := time.Parse(date.YYYYMMDDHHMMSS, dateStr)
|
||||
if err != nil {
|
||||
logger.Errorf("logger Remove ne_config name Parse err: %v", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
// 判断文件日期是否在给定日期之前
|
||||
if fileDate.Before(oldFileDate) {
|
||||
err := os.Remove(filepath.Join(localPath, file.Name()))
|
||||
if err != nil {
|
||||
logger.Errorf("logger Remove ne_config file err: %v", err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -29,7 +29,7 @@ func (s *MonitorSysResourceProcessor) Execute(data any) (any, error) {
|
||||
s.count++ // 执行次数加一
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
logger.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId)
|
||||
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
|
||||
|
||||
// 读取参数值
|
||||
var params struct {
|
||||
|
||||
@@ -51,7 +51,7 @@ func (s *NeAlarmStateCheckProcessor) Execute(data any) (any, error) {
|
||||
s.count++ // 执行次数加一
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
logger.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId)
|
||||
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
|
||||
// 返回结果,用于记录执行结果
|
||||
result := map[string]any{
|
||||
"count": s.count,
|
||||
|
||||
@@ -27,7 +27,7 @@ func (s *NeConfigBackupProcessor) Execute(data any) (any, error) {
|
||||
s.count++ // 执行次数加一
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
logger.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId)
|
||||
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
|
||||
// 返回结果,用于记录执行结果
|
||||
result := map[string]any{
|
||||
"count": s.count,
|
||||
|
||||
@@ -29,7 +29,7 @@ func (s *NeDataUDM) Execute(data any) (any, error) {
|
||||
s.count++ // 执行次数加一
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
logger.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId)
|
||||
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
|
||||
// 返回结果,用于记录执行结果
|
||||
result := map[string]any{
|
||||
"count": s.count,
|
||||
|
||||
@@ -2,11 +2,11 @@ package processor
|
||||
|
||||
import (
|
||||
"be.ems/src/framework/cron"
|
||||
"be.ems/src/modules/crontask/processor/backupEtcFromNE"
|
||||
"be.ems/src/modules/crontask/processor/delExpiredNeBackup"
|
||||
"be.ems/src/modules/crontask/processor/deleteExpiredRecord"
|
||||
processorDeleteAlarmRecord "be.ems/src/modules/crontask/processor/delete_alarm_record"
|
||||
processorDeleteDataRecord "be.ems/src/modules/crontask/processor/delete_data_record"
|
||||
processorDeleteKPIRecord "be.ems/src/modules/crontask/processor/delete_kpi_record"
|
||||
processorDeleteNeConfigBackup "be.ems/src/modules/crontask/processor/delete_ne_config_backup"
|
||||
"be.ems/src/modules/crontask/processor/exportTable"
|
||||
"be.ems/src/modules/crontask/processor/getStateFromNE"
|
||||
processorMonitorSysResource "be.ems/src/modules/crontask/processor/monitor_sys_resource"
|
||||
processorNeAlarmStateCheck "be.ems/src/modules/crontask/processor/ne_alarm_state_check"
|
||||
processorNeConfigBackup "be.ems/src/modules/crontask/processor/ne_config_backup"
|
||||
@@ -24,11 +24,16 @@ func InitCronQueue() {
|
||||
cron.CreateQueue("ne_data_udm", processorNeDataUDM.NewProcessor)
|
||||
// 网元告警-状态检查
|
||||
cron.CreateQueue("ne_alarm_state_check", processorNeAlarmStateCheck.NewProcessor)
|
||||
// delete expired NE backup file
|
||||
cron.CreateQueue("delExpiredNeBackup", delExpiredNeBackup.NewProcessor)
|
||||
cron.CreateQueue("deleteExpiredRecord", deleteExpiredRecord.NewProcessor)
|
||||
cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor)
|
||||
cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor)
|
||||
|
||||
// 删除-表内数据记录
|
||||
cron.CreateQueue("delete_data_record", processorDeleteDataRecord.NewProcessor)
|
||||
// 删除-告警数据记录
|
||||
cron.CreateQueue("delete_alarm_record", processorDeleteAlarmRecord.NewProcessor)
|
||||
// 删除-KPI数据记录
|
||||
cron.CreateQueue("delete_kpi_record", processorDeleteKPIRecord.NewProcessor)
|
||||
// 删除-网元配置文件定期备份
|
||||
cron.CreateQueue("delete_ne_config_backup", processorDeleteNeConfigBackup.NewProcessor)
|
||||
|
||||
cron.CreateQueue("exportTable", exportTable.NewProcessor)
|
||||
cron.CreateQueue("removeFile", removeFile.NewProcessor)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user