diff --git a/src/modules/crontask/processor/exportTable/exportTable.go b/src/modules/crontask/processor/backup_export_table/backup_export_table.go similarity index 91% rename from src/modules/crontask/processor/exportTable/exportTable.go rename to src/modules/crontask/processor/backup_export_table/backup_export_table.go index e445bfd6..f9e37c78 100644 --- a/src/modules/crontask/processor/exportTable/exportTable.go +++ b/src/modules/crontask/processor/backup_export_table/backup_export_table.go @@ -1,9 +1,10 @@ -package exportTable +package backup_export_table import ( "encoding/json" "fmt" "path/filepath" + "runtime" "strings" "time" @@ -11,25 +12,27 @@ import ( "be.ems/src/framework/database/db" "be.ems/src/framework/i18n" "be.ems/src/framework/logger" - "be.ems/src/framework/ssh" "be.ems/src/framework/utils/date" "be.ems/src/framework/utils/file" "be.ems/src/framework/utils/parse" neDataModel "be.ems/src/modules/network_data/model" + neDataService "be.ems/src/modules/network_data/service" systemModel "be.ems/src/modules/system/model" systemService "be.ems/src/modules/system/service" ) -var NewProcessor = &BarProcessor{ - count: 0, +var NewProcessor = &BackupExportTableProcessor{ + backupService: neDataService.NewBackup, + count: 0, } -// bar 队列任务处理 -type BarProcessor struct { - count int // 执行次数 +// BackupExportTable 备份导出数据表 +type BackupExportTableProcessor struct { + backupService *neDataService.Backup // 备份相关服务 + count int // 执行次数 } -func (s *BarProcessor) Execute(data any) (any, error) { +func (s *BackupExportTableProcessor) Execute(data any) (any, error) { s.count++ // 执行次数加一 options := data.(cron.JobData) sysJob := options.SysJob @@ -40,10 +43,10 @@ func (s *BarProcessor) Execute(data any) (any, error) { } var params struct { - Hour int `json:"hour"` // hour - TableName string `json:"tableName"` - Columns []string `json:"columns"` - FilePath string `json:"filePath"` // file path + Hour int `json:"hour"` // 数据时间从任务执行时间前的小时数 + TableName string `json:"tableName"` // 数据表名 + Columns []string `json:"columns"` // 支持字段 + BackupPath string `json:"backupPath"` // 备份输出路径 /usr/local/omc/backup/{backupPath} } err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) if err != nil { @@ -52,7 +55,11 @@ func (s *BarProcessor) Execute(data any) (any, error) { var affected int64 var errMsg error - filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, strings.ToLower(params.TableName), time.Now().Format("20060102150405")) + fileName := fmt.Sprintf("%s_export_%s.csv", strings.ToLower(params.TableName), time.Now().Format("20060102150405")) + filePath := filepath.Join("/usr/local/omc/backup", params.BackupPath, fileName) + if runtime.GOOS == "windows" { + filePath = fmt.Sprintf("C:%s", filePath) + } switch params.TableName { case "sys_log_operate": affected, errMsg = s.exportSysLogOperate(params.Hour, params.Columns, filePath) @@ -69,18 +76,20 @@ func (s *BarProcessor) Execute(data any) (any, error) { return nil, errMsg } - // put ftp + // 上传到FTP服务器 if affected > 0 { - result["affected"] = affected - s.putFTP(filePath) + if err := s.backupService.FTPPushFile(filePath, params.BackupPath); err != nil { + result["ftpErr"] = err.Error() + } } + result["affected"] = affected // 返回结果,用于记录执行结果 return result, nil } // exportSysLogOperate 导出csv -func (s BarProcessor) exportSysLogOperate(hour int, columns []string, filePath string) (int64, error) { +func (s BackupExportTableProcessor) exportSysLogOperate(hour int, columns []string, filePath string) (int64, error) { // 前 hour 小时 now := time.Now() end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) @@ -181,7 +190,7 @@ func (s BarProcessor) exportSysLogOperate(hour int, columns []string, filePath s } // exportSMF 导出csv -func (s BarProcessor) exportSMF(hour int, columns []string, filePath string) (int64, error) { +func (s BackupExportTableProcessor) exportSMF(hour int, columns []string, filePath string) (int64, error) { // 前 hour 小时 now := time.Now() end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) @@ -412,7 +421,7 @@ func (s BarProcessor) exportSMF(hour int, columns []string, filePath string) (in } // exportIMS 导出csv -func (s BarProcessor) exportIMS(hour int, columns []string, filePath string) (int64, error) { +func (s BackupExportTableProcessor) exportIMS(hour int, columns []string, filePath string) (int64, error) { // 前 hour 小时 now := time.Now() end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) @@ -552,7 +561,7 @@ func (s BarProcessor) exportIMS(hour int, columns []string, filePath string) (in } // exportSMSC 导出csv -func (s BarProcessor) exportSMSC(hour int, columns []string, filePath string) (int64, error) { +func (s BackupExportTableProcessor) exportSMSC(hour int, columns []string, filePath string) (int64, error) { // 前 hour 小时 now := time.Now() end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) @@ -677,7 +686,7 @@ func (s BarProcessor) exportSMSC(hour int, columns []string, filePath string) (i } // exportSGWC 导出csv -func (s BarProcessor) exportSGWC(hour int, columns []string, filePath string) (int64, error) { +func (s BackupExportTableProcessor) exportSGWC(hour int, columns []string, filePath string) (int64, error) { // 前 hour 小时 now := time.Now() end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) @@ -912,54 +921,3 @@ func (s BarProcessor) exportSGWC(hour int, columns []string, filePath string) (i return tx.RowsAffected, err } - -// putFTP 提交到服务器ssh -func (s BarProcessor) putFTP(localFilePath string) { - // 获取配置 - var cfgData struct { - Password string `json:"password" ` - Username string `json:"username" binding:"required"` - ToIp string `json:"toIp" binding:"required"` - ToPort int64 `json:"toPort" binding:"required"` - Enable bool `json:"enable"` - Dir string `json:"dir" binding:"required"` - } - cfg := systemService.NewSysConfig.FindByKeyDecryptValue("neData.exportTableFTP") - if cfg.ConfigId > 0 { - if err := json.Unmarshal([]byte(cfg.ConfigValue), &cfgData); err != nil { - logger.Errorf("putFTP unmarshal error: %v", err) - return - } - } - if !cfgData.Enable { - return - } - - connSSH := ssh.ConnSSH{ - User: cfgData.Username, - Password: cfgData.Password, - Addr: cfgData.ToIp, - Port: cfgData.ToPort, - AuthMode: "0", - } - sshClient, err := connSSH.NewClient() - if err != nil { - logger.Errorf("putFTP ssh error: %v", err) - return - } - defer sshClient.Close() - // 网元主机的SSH客户端进行文件传输 - sftpClient, err := sshClient.NewClientSFTP() - if err != nil { - logger.Errorf("putFTP sftp error: %v", err) - return - } - defer sftpClient.Close() - // 远程文件 - remotePath := filepath.Join(cfgData.Dir, "/backup", filepath.Base(localFilePath)) - // 复制到远程 - if err = sftpClient.CopyFileLocalToRemote(localFilePath, remotePath); err != nil { - logger.Errorf("putFTP uploading error: %v", err) - return - } -} diff --git a/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go b/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go new file mode 100644 index 00000000..7b3f2ea0 --- /dev/null +++ b/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go @@ -0,0 +1,273 @@ +package backup_export_udm + +import ( + "encoding/json" + "fmt" + "path/filepath" + "runtime" + "time" + + "be.ems/src/framework/cron" + "be.ems/src/framework/logger" + "be.ems/src/framework/utils/file" + neDataModel "be.ems/src/modules/network_data/model" + neDataService "be.ems/src/modules/network_data/service" + neModel "be.ems/src/modules/network_element/model" + neService "be.ems/src/modules/network_element/service" +) + +var NewProcessor = &BackupExportUDMProcessor{ + count: 0, + neInfoService: neService.NewNeInfo, + backupService: neDataService.NewBackup, + udmAuthService: neDataService.NewUDMAuthUser, + udmSubService: neDataService.NewUDMSubUser, + udmVOIPService: neDataService.NewUDMVOIPUser, + udmVolteIMSService: neDataService.NewUDMVolteIMSUser, +} + +// BackupExportUDM 队列任务处理 +type BackupExportUDMProcessor struct { + count int // 执行次数 + neInfoService *neService.NeInfo // 网元信息服务 + backupService *neDataService.Backup // 备份相关服务 + udmAuthService *neDataService.UDMAuthUser // UDM鉴权信息服务 + udmSubService *neDataService.UDMSubUser // UDM签约信息服务 + udmVOIPService *neDataService.UDMVOIPUser // UDMVOIP信息服务 + udmVolteIMSService *neDataService.UDMVolteIMSUser // UDMVolteIMS信息服务 +} + +func (s *BackupExportUDMProcessor) Execute(data any) (any, error) { + s.count++ // 执行次数加一 + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) + // 返回结果,用于记录执行结果 + result := map[string]any{ + "count": s.count, + } + + var params struct { + DataType []string `json:"dataType"` // 类型支持 auth/sub/voip/volte + FileType string `json:"fileType"` // 文件类型 csv/txt + } + if err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms); err != nil { + return nil, err + } + if !(params.FileType == "csv" || params.FileType == "txt") { + return nil, fmt.Errorf("file type error, only support csv,txt") + } + + neList := s.neInfoService.Find(neModel.NeInfo{NeType: "UDM"}, false, false) + for _, neInfo := range neList { + for _, v := range params.DataType { + switch v { + case "auth": + result[fmt.Sprintf("%s-%s", neInfo.NeId, v)] = s.exportAuth(neInfo.NeId, params.FileType) + case "sub": + result[fmt.Sprintf("%s-%s", neInfo.NeId, v)] = s.exportSub(neInfo.NeId, params.FileType) + case "voip": + result[fmt.Sprintf("%s-%s", neInfo.NeId, v)] = s.exportVOIP(neInfo.NeId, params.FileType) + case "volte": + result[fmt.Sprintf("%s-%s", neInfo.NeId, v)] = s.exportVolte(neInfo.NeId, params.FileType) + } + } + } + + // 返回结果,用于记录执行结果 + return result, nil +} + +// exportAuth 导出鉴权用户数据 +func (s BackupExportUDMProcessor) exportAuth(neId, fileType string) string { + rows := s.udmAuthService.Find(neDataModel.UDMAuthUser{NeId: neId}) + if len(rows) <= 0 { + return "no data" + } + + // 文件名 + fileName := fmt.Sprintf("auth_%s_export_%s.%s", neId, time.Now().Format("20060102150405"), fileType) + filePath := filepath.Join("/usr/local/omc/backup/udm_data/auth", fileName) + if runtime.GOOS == "windows" { + filePath = fmt.Sprintf("C:%s", filePath) + } + + if fileType == "csv" { + // 转换数据 + data := [][]string{} + data = append(data, []string{"imsi", "ki", "algo", "amf", "opc"}) + for _, v := range rows { + opc := v.Opc + if opc == "-" { + opc = "" + } + data = append(data, []string{v.IMSI, v.Ki, v.AlgoIndex, v.Amf, opc}) + } + // 输出到文件 + if err := file.WriterFileCSV(data, filePath); err != nil { + return err.Error() + } + } + + if fileType == "txt" { + // 转换数据 + data := [][]string{} + for _, v := range rows { + opc := v.Opc + if opc == "-" { + opc = "" + } + data = append(data, []string{v.IMSI, v.Ki, v.AlgoIndex, v.Amf, opc}) + } + // 输出到文件 + if err := file.WriterFileTXT(data, ",", filePath); err != nil { + return err.Error() + } + } + + // 上传到FTP服务器 + if err := s.backupService.FTPPushFile(filePath, "udm_data"); err != nil { + return "ok, ftp err:" + err.Error() + } + return "ok" +} + +// exportSub 导出签约用户数据 +func (s BackupExportUDMProcessor) exportSub(neId, fileType string) string { + rows := s.udmSubService.Find(neDataModel.UDMSubUser{NeId: neId}) + if len(rows) <= 0 { + return "no data" + } + + // 文件名 + fileName := fmt.Sprintf("sub_%s_export_%s.%s", neId, time.Now().Format("20060102150405"), fileType) + filePath := filepath.Join("/usr/local/omc/backup/udm_data/sub", fileName) + if runtime.GOOS == "windows" { + filePath = fmt.Sprintf("C:%s", filePath) + } + + if fileType == "csv" { + // 转换数据 + data := [][]string{} + data = append(data, []string{"IMSI", "MSISDN", "UeAmbrTpl", "NssaiTpl", "AreaForbiddenTpl", "ServiceAreaRestrictionTpl", "RatRestrictions", "CnTypeRestrictions", "SmfSel", "SmData", "EPSDat"}) + for _, v := range rows { + epsDat := fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s", v.EpsFlag, v.EpsOdb, v.HplmnOdb, v.Ard, v.Epstpl, v.ContextId, v.ApnContext, v.StaticIp) + data = append(data, []string{v.IMSI, v.MSISDN, v.UeAmbrTpl, v.NssaiTpl, v.AreaForbiddenTpl, v.ServiceAreaRestrictionTpl, v.RatRestrictions, v.CnTypeRestrictions, v.SmfSel, v.SmData, epsDat}) + } + // 输出到文件 + if err := file.WriterFileCSV(data, filePath); err != nil { + return err.Error() + } + } + + if fileType == "txt" { + // 转换数据 + data := [][]string{} + for _, v := range rows { + epsDat := fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s", v.EpsFlag, v.EpsOdb, v.HplmnOdb, v.Ard, v.Epstpl, v.ContextId, v.ApnContext, v.StaticIp) + data = append(data, []string{v.IMSI, v.MSISDN, v.UeAmbrTpl, v.NssaiTpl, v.AreaForbiddenTpl, v.ServiceAreaRestrictionTpl, v.RatRestrictions, v.CnTypeRestrictions, v.SmfSel, v.SmData, epsDat}) + } + // 输出到文件 + if err := file.WriterFileTXT(data, ",", filePath); err != nil { + return err.Error() + } + } + + // 上传到FTP服务器 + if err := s.backupService.FTPPushFile(filePath, "udm_data"); err != nil { + return "ok, ftp err:" + err.Error() + } + return "ok" +} + +// exportVOIP 导出VOIP用户数据 +func (s BackupExportUDMProcessor) exportVOIP(neId, fileType string) string { + rows := s.udmVOIPService.Find(neDataModel.UDMVOIPUser{NeId: neId}) + if len(rows) <= 0 { + return "no data" + } + + // 文件名 + fileName := fmt.Sprintf("voip_%s_export_%s.%s", neId, time.Now().Format("20060102150405"), fileType) + filePath := filepath.Join("/usr/local/omc/backup/udm_data/voip", fileName) + if runtime.GOOS == "windows" { + filePath = fmt.Sprintf("C:%s", filePath) + } + + if fileType == "csv" { + // 转换数据 + data := [][]string{} + data = append(data, []string{"username", "password"}) + for _, v := range rows { + data = append(data, []string{v.UserName, v.Password}) + } + // 输出到文件 + if err := file.WriterFileCSV(data, filePath); err != nil { + return err.Error() + } + } + + if fileType == "txt" { + // 转换数据 + data := [][]string{} + for _, v := range rows { + data = append(data, []string{v.UserName, v.Password}) + } + // 输出到文件 + if err := file.WriterFileTXT(data, ",", filePath); err != nil { + return err.Error() + } + } + + // 上传到FTP服务器 + if err := s.backupService.FTPPushFile(filePath, "udm_data"); err != nil { + return "ok, ftp err:" + err.Error() + } + return "ok" +} + +// exportVolte 导出Volte用户数据 +func (s BackupExportUDMProcessor) exportVolte(neId, fileType string) string { + rows := s.udmVolteIMSService.Find(neDataModel.UDMVolteIMSUser{NeId: neId}) + if len(rows) <= 0 { + return "no data" + } + + // 文件名 + fileName := fmt.Sprintf("volte_%s_export_%s.%s", neId, time.Now().Format("20060102150405"), fileType) + filePath := filepath.Join("/usr/local/omc/backup/udm_data/volte", fileName) + if runtime.GOOS == "windows" { + filePath = fmt.Sprintf("C:%s", filePath) + } + + if fileType == "csv" { + // 转换数据 + data := [][]string{} + data = append(data, []string{"IMSI", "MSISDN", "TAG", "VNI"}) + for _, v := range rows { + data = append(data, []string{v.IMSI, v.MSISDN, v.Tag, v.VNI}) + } + // 输出到文件 + if err := file.WriterFileCSV(data, filePath); err != nil { + return err.Error() + } + } + + if fileType == "txt" { + // 转换数据 + data := [][]string{} + for _, v := range rows { + data = append(data, []string{v.IMSI, v.MSISDN, v.Tag, v.VNI}) + } + // 输出到文件 + if err := file.WriterFileTXT(data, ",", filePath); err != nil { + return err.Error() + } + } + + // 上传到FTP服务器 + if err := s.backupService.FTPPushFile(filePath, "udm_data"); err != nil { + return "ok, ftp err:" + err.Error() + } + return "ok" +} diff --git a/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go b/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go new file mode 100644 index 00000000..07055073 --- /dev/null +++ b/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go @@ -0,0 +1,113 @@ +package backup_remove_file + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "runtime" + "sort" + "strings" + "time" + + "be.ems/src/framework/cron" + "be.ems/src/framework/logger" +) + +type FileInfo struct { + Path string + Info os.FileInfo +} + +var NewProcessor = &BackupRemoveFileProcessor{ + count: 0, +} + +// BackupRemoveFileProcessor 删除备份文件 +type BackupRemoveFileProcessor struct { + count int // 执行次数 +} + +func (s *BackupRemoveFileProcessor) Execute(data any) (any, error) { + s.count++ // 执行次数加一 + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) + // 返回结果,用于记录执行结果 + result := map[string]any{ + "count": s.count, + } + + // 读取参数值 + var params []struct { + BackupPath string `json:"backupPath"` // 备份路径 /usr/local/omc/backup/{backupPath} + StoreDays int `json:"storeDays"` // 保留天数 + StoreNum int `json:"storeNum"` // 保留数量,默认保留7 + } + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, fmt.Errorf("json params err: %v", err) + } + + for _, item := range params { + result[item.BackupPath] = "" + if item.StoreDays < 0 { + result[item.BackupPath] = "params storeDays less than 0" + continue + } + if item.StoreNum <= 0 { + item.StoreNum = 7 + } + + // 构建完整备份路径 + filePath := filepath.Join("/usr/local/omc/backup", item.BackupPath) + if runtime.GOOS == "windows" { + filePath = fmt.Sprintf("C:%s", filePath) + } + // 获取目录下所有备份文件 + files, err := s.files(filePath) + if err != nil { + result[item.BackupPath] = "read files err" + continue + } + // 按修改时间排序(从旧到新) + sort.Slice(files, func(i, j int) bool { + return files[i].Info.ModTime().Before(files[j].Info.ModTime()) + }) + + // 如果文件数量少于保留数量,则不删除 + if len(files) <= item.StoreNum { + result[item.BackupPath] = fmt.Sprintf("less StoreNum: %d, file number %d", item.StoreNum, len(files)) + continue + } + // 计算截止日期 + cutoff := time.Now().AddDate(0, 0, -item.StoreDays) + // 删除超过保留天数的文件 + deletedErr := []string{} + for _, file := range files { + if file.Info.ModTime().Before(cutoff) { + if err := os.Remove(file.Path); err != nil { + deletedErr = append(deletedErr, file.Info.Name()) // 记录删除失败的文件名称 + } + } + } + result[item.BackupPath] = strings.Join(deletedErr, ", ") + } + + // 返回结果,用于记录执行结果 + return result, nil +} + +func (s *BackupRemoveFileProcessor) files(dir string) ([]FileInfo, error) { + var files []FileInfo + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + files = append(files, FileInfo{Path: path, Info: info}) + } + return nil + }) + return files, err +} diff --git a/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go b/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go index 0de94e17..982b7323 100644 --- a/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go +++ b/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go @@ -39,7 +39,8 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) { // 读取参数值 var params struct { - StoreDays int `json:"storeDays"` // store days + StoreDays int `json:"storeDays"` // 保留天数 + StoreNum int `json:"storeNum"` // 保留数量,默认保留7 } err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) if err != nil { @@ -48,6 +49,9 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) { if params.StoreDays < 0 { return nil, fmt.Errorf("params storeDays less than 0 ") } + if params.StoreNum <= 0 { + params.StoreNum = 7 + } neList := s.neInfoService.Find(neModel.NeInfo{}, false, false) for _, neInfo := range neList { @@ -55,6 +59,17 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) { tx := db.DB("").Model(&neModel.NeConfigBackup{}) tx = tx.Where("ne_type = ? and ne_id = ?", neInfo.NeType, neInfo.NeId) + // 查询数量为0直接返回 + var total int64 = 0 + if err := tx.Count(&total).Error; err != nil { + result[neTypeAndId] = err.Error() + continue + } + if total <= int64(params.StoreNum) { + result[neTypeAndId] = "less than storeNum" + continue + } + // 查询最后记录数据 var lastCreateTime int64 = 0 lastTx := tx.Select("create_time").Order("create_time DESC").Limit(1) @@ -62,7 +77,6 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) { result[neTypeAndId] = err.Error() continue } - if lastCreateTime <= 1e12 { result[neTypeAndId] = "no data" continue @@ -90,7 +104,7 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) { // deleteFile 删除本地文件 func (s DeleteNeConfigBackupProcessor) deleteFile(neType, neId string, oldFileDate time.Time) { neTypeLower := strings.ToLower(neType) - localPath := fmt.Sprintf("/usr/local/omc/backup/ne_config/%s/%s/backup ", neTypeLower, neId) + localPath := fmt.Sprintf("/usr/local/omc/backup/ne_config/%s/%s ", neTypeLower, neId) files, err := os.ReadDir(localPath) if err != nil { logger.Errorf("logger Remove ne_config File ReadDir err: %v", err.Error()) diff --git a/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go b/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go deleted file mode 100644 index 50ac3e2f..00000000 --- a/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go +++ /dev/null @@ -1,160 +0,0 @@ -package getStateFromNE - -import ( - "encoding/json" - "fmt" - "net/http" - "strings" - "time" - - "be.ems/lib/config" - "be.ems/lib/dborm" - "be.ems/lib/log" - "be.ems/src/framework/cron" - "github.com/go-resty/resty/v2" -) - -var NewProcessor = &BarProcessor{ - progress: 0, - count: 0, -} - -// bar 队列任务处理 -type BarProcessor struct { - // 任务进度 - progress int - // 执行次数 - count int -} - -type BarParams struct { - Duration int `json:"duration"` -} - -type CpuUsage struct { - NfCpuUsage uint16 `json:"nfCpuUsage"` - SysCpuUsage uint16 `json:"sysCpuUsage"` -} - -type MemUsage struct { - TotalMem uint32 `json:"totalMem"` - NfUsedMem uint32 `json:"nfUsedMem"` - SysMemUsage uint16 `json:"sysMemUsage"` -} - -type PartitionInfo struct { - Total uint32 `json:"total"` // MB - Used uint32 `json:"used"` // MB -} - -type DiskSpace struct { - PartitionNum uint8 `json:"partitionNum"` - - PartitionInfo []PartitionInfo `json:"partitionInfo"` -} - -type SystemState struct { - Version string `json:"version"` - Capability uint32 `json:"capability"` - SerialNum string `json:"serialNum"` - ExpiryDate string `json:"expiryDate"` - //Timestamp string `json:"timestamp"` - - CpuUsage CpuUsage `json:"cpuUsage"` - MemUsage MemUsage `json:"memUsage"` - - DiskSpace DiskSpace `json:"diskSpace"` -} - -var client = resty.New() - -func init() { - client. - SetTimeout(time.Duration(400 * time.Millisecond)) -} - -func (s *BarProcessor) Execute(data any) (any, error) { - var err error - - s.count++ - options := data.(cron.JobData) - sysJob := options.SysJob - var params BarParams - - _ = json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) - // if err == nil { - // duration = params.Duration - // } - - var nes []dborm.NeInfo - _, err = dborm.XormGetAllNeInfo(&nes) - if err != nil { - log.Error("Failed to get all ne info:", err) - return nil, err - } - - failNum := 0 - succNum := 0 - for _, ne := range nes { - requestURI := fmt.Sprintf("/api/rest/systemManagement/v1/elementType/%s/objectType/systemState", strings.ToLower(ne.NeType)) - requestURL := fmt.Sprintf("http://%s:%s%s", ne.Ip, ne.Port, requestURI) - log.Debug("requestURL: Get", requestURL) - response, err := client.R(). - EnableTrace(). - SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). - SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). - Get(requestURL) - if err != nil { - log.Error("Failed to Get:", err) - failNum++ - continue - } - - log.Debug("StatusCode: ", response.StatusCode()) - switch response.StatusCode() { - case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: - log.Trace("response body:", string(response.Body())) - state := new(SystemState) - _ = json.Unmarshal(response.Body(), &state) - // var dateStr *string = nil - // if state.ExpiryDate != "" && state.ExpiryDate != "-" { - // dateStr = &state.ExpiryDate - // } - neState := new(dborm.NeState) - neState.NeType = ne.NeType - neState.NeId = ne.NeId - neState.Version = state.Version - neState.Capability = state.Capability - neState.SerialNum = state.SerialNum - // if dateStr != nil { - // neState.ExpiryDate = *dateStr - // } - neState.ExpiryDate = state.ExpiryDate - cu, _ := json.Marshal(state.CpuUsage) - neState.CpuUsage = string(cu) - mu, _ := json.Marshal(state.MemUsage) - neState.MemUsage = string(mu) - ds, _ := json.Marshal(state.DiskSpace) - neState.DiskSpace = string(ds) - log.Trace("neState:", neState) - _, err := dborm.XormInsertNeState(neState) - if err != nil { - log.Error("Failed to insert ne_state:", err) - failNum++ - continue - } - succNum++ - default: - log.Trace("response body:", string(response.Body())) - body := new(map[string]interface{}) - _ = json.Unmarshal(response.Body(), &body) - failNum++ - } - } - - // 返回结果,用于记录执行结果 - return map[string]any{ - "succNum": succNum, - "failNum": failNum, - }, nil -} diff --git a/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go b/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go index 84cb9b1c..215c5aac 100644 --- a/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go +++ b/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go @@ -1,22 +1,20 @@ package ne_config_backup import ( - "encoding/json" "fmt" "path/filepath" "be.ems/src/framework/cron" "be.ems/src/framework/logger" - "be.ems/src/framework/ssh" + neDataService "be.ems/src/modules/network_data/service" neModel "be.ems/src/modules/network_element/model" neService "be.ems/src/modules/network_element/service" - systemService "be.ems/src/modules/system/service" ) var NewProcessor = &NeConfigBackupProcessor{ neConfigBackupService: neService.NewNeConfigBackup, neInfoService: neService.NewNeInfo, - sysConfigService: systemService.NewSysConfig, + backupService: neDataService.NewBackup, count: 0, } @@ -24,7 +22,7 @@ var NewProcessor = &NeConfigBackupProcessor{ type NeConfigBackupProcessor struct { neConfigBackupService *neService.NeConfigBackup // 网元配置文件备份记录服务 neInfoService *neService.NeInfo // 网元信息服务 - sysConfigService *systemService.SysConfig // 参数配置服务 + backupService *neDataService.Backup // 备份相关服务 count int // 执行次数 } @@ -47,6 +45,7 @@ func (s *NeConfigBackupProcessor) Execute(data any) (any, error) { result[neTypeAndId] = err.Error() continue } + // 新增备份记录 item := neModel.NeConfigBackup{ NeType: neInfo.NeType, @@ -60,60 +59,14 @@ func (s *NeConfigBackupProcessor) Execute(data any) (any, error) { result[neTypeAndId] = "failed" continue } - result[neTypeAndId] = "success" - s.putFTP(zipFilePath) // 上传到FTP服务器 + + msg := "ok" + // 上传到FTP服务器 + if err := s.backupService.FTPPushFile(zipFilePath, "ne_config"); err != nil { + result[neTypeAndId] = msg + ", ftp err:" + err.Error() + } + result[neTypeAndId] = msg } return result, nil } - -// putFTP 提交到服务器ssh -func (s NeConfigBackupProcessor) putFTP(localFilePath string) { - // 获取配置 - var cfgData struct { - Password string `json:"password" ` - Username string `json:"username" binding:"required"` - ToIp string `json:"toIp" binding:"required"` - ToPort int64 `json:"toPort" binding:"required"` - Enable bool `json:"enable"` - Dir string `json:"dir" binding:"required"` - } - cfg := systemService.NewSysConfig.FindByKeyDecryptValue("neData.exportTableFTP") - if cfg.ConfigId > 0 { - if err := json.Unmarshal([]byte(cfg.ConfigValue), &cfgData); err != nil { - logger.Errorf("putFTP unmarshal error: %v", err) - return - } - } - if !cfgData.Enable { - return - } - - connSSH := ssh.ConnSSH{ - User: cfgData.Username, - Password: cfgData.Password, - Addr: cfgData.ToIp, - Port: cfgData.ToPort, - AuthMode: "0", - } - sshClient, err := connSSH.NewClient() - if err != nil { - logger.Errorf("putFTP ssh error: %v", err) - return - } - defer sshClient.Close() - // 网元主机的SSH客户端进行文件传输 - sftpClient, err := sshClient.NewClientSFTP() - if err != nil { - logger.Errorf("putFTP sftp error: %v", err) - return - } - defer sftpClient.Close() - // 远程文件 - remotePath := filepath.Join(cfgData.Dir, "/ne_config", filepath.Base(localFilePath)) - // 复制到远程 - if err = sftpClient.CopyFileLocalToRemote(localFilePath, remotePath); err != nil { - logger.Errorf("putFTP uploading error: %v", err) - return - } -} diff --git a/src/modules/crontask/processor/processor.go b/src/modules/crontask/processor/processor.go index 29aa5a30..bc2e4f78 100644 --- a/src/modules/crontask/processor/processor.go +++ b/src/modules/crontask/processor/processor.go @@ -2,16 +2,17 @@ package processor import ( "be.ems/src/framework/cron" + processorBackupExportTable "be.ems/src/modules/crontask/processor/backup_export_table" + processorBackupExportUDM "be.ems/src/modules/crontask/processor/backup_export_udm" + processorBackupRemoveFile "be.ems/src/modules/crontask/processor/backup_remove_file" processorDeleteAlarmRecord "be.ems/src/modules/crontask/processor/delete_alarm_record" processorDeleteDataRecord "be.ems/src/modules/crontask/processor/delete_data_record" processorDeleteKPIRecord "be.ems/src/modules/crontask/processor/delete_kpi_record" processorDeleteNeConfigBackup "be.ems/src/modules/crontask/processor/delete_ne_config_backup" - "be.ems/src/modules/crontask/processor/exportTable" processorMonitorSysResource "be.ems/src/modules/crontask/processor/monitor_sys_resource" processorNeAlarmStateCheck "be.ems/src/modules/crontask/processor/ne_alarm_state_check" processorNeConfigBackup "be.ems/src/modules/crontask/processor/ne_config_backup" processorNeDataUDM "be.ems/src/modules/crontask/processor/ne_data_udm" - "be.ems/src/modules/crontask/processor/removeFile" ) // InitCronQueue 初始定时任务队列 @@ -20,7 +21,7 @@ func InitCronQueue() { cron.CreateQueue("monitor_sys_resource", processorMonitorSysResource.NewProcessor) // 网元-网元配置文件定期备份 cron.CreateQueue("ne_config_backup", processorNeConfigBackup.NewProcessor) - // 网元数据-UDM数据刷新同步 + // 网元数据-UDM用户数据同步 cron.CreateQueue("ne_data_udm", processorNeDataUDM.NewProcessor) // 网元告警-状态检查 cron.CreateQueue("ne_alarm_state_check", processorNeAlarmStateCheck.NewProcessor) @@ -34,6 +35,10 @@ func InitCronQueue() { // 删除-网元配置文件定期备份 cron.CreateQueue("delete_ne_config_backup", processorDeleteNeConfigBackup.NewProcessor) - cron.CreateQueue("exportTable", exportTable.NewProcessor) - cron.CreateQueue("removeFile", removeFile.NewProcessor) + // 备份-导出数据表 + cron.CreateQueue("backup_export_table", processorBackupExportTable.NewProcessor) + // 备份-删除备份目录下文件 + cron.CreateQueue("backup_remove_file", processorBackupRemoveFile.NewProcessor) + // 备份-导出UDM用户数据 + cron.CreateQueue("backup_export_udm", processorBackupExportUDM.NewProcessor) } diff --git a/src/modules/crontask/processor/removeFile/removeFile.go b/src/modules/crontask/processor/removeFile/removeFile.go deleted file mode 100644 index 5420b40b..00000000 --- a/src/modules/crontask/processor/removeFile/removeFile.go +++ /dev/null @@ -1,159 +0,0 @@ -package removeFile - -import ( - "encoding/json" - "os" - "path/filepath" - "sort" - "time" - - "be.ems/lib/log" - "be.ems/src/framework/cron" -) - -var NewProcessor = &BarProcessor{ - progress: 0, - count: 0, -} - -// bar 队列任务处理 -type BarProcessor struct { - // 任务进度 - progress int - // 执行次数 - count int -} - -type BarParams struct { - FilePath string `json:"filePath"` // file path - MaxDays int `json:"maxDays"` - MaxFiles *int `json:"maxFiles"` // keep max files - MaxSize *int64 `json:"maxSize"` - Extras string `json:"extras"` // extras condition for where -} - -type FileInfo struct { - Path string - Info os.FileInfo -} - -func (s *BarProcessor) Execute(data any) (any, error) { - s.count++ - - options := data.(cron.JobData) - sysJob := options.SysJob - var params []BarParams - - err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) - if err != nil { - return nil, err - } - result := []map[string]any{} - for _, param := range params { - res, _ := s.ExecuteOne(param) - result = append(result, res) - } - - // 返回结果,用于记录执行结果 - return map[string]any{ - "result": result, - }, nil -} - -func (s *BarProcessor) ExecuteOne(params BarParams) (map[string]any, error) { - var maxFiles int = 0 - var maxSize int64 = 0 - if params.MaxFiles != nil { - maxFiles = *params.MaxFiles - } - if params.MaxSize != nil { - maxSize = int64(*params.MaxSize * 1024 * 1024) - } - files, err := getFiles(params.FilePath) - if err != nil { - return map[string]any{ - "msg": "failed", - "err": err.Error(), - }, err - } - - // 获取本地时区 - loc, err := time.LoadLocation("Local") - if err != nil { - return map[string]any{ - "msg": "failed", - "err": err.Error(), - }, err - } - cutoff := time.Now().In(loc).AddDate(0, 0, -params.MaxDays) - - var oldFiles []FileInfo - for _, file := range files { - if file.Info.ModTime().Before(cutoff) { - oldFiles = append(oldFiles, file) - } - } - - // 按修改时间排序文件(最旧的在前) - sort.Slice(oldFiles, func(i, j int) bool { - return oldFiles[i].Info.ModTime().Before(oldFiles[j].Info.ModTime()) - }) - - deleted, errorDel := 0, 0 - - // 删除文件,直到满足文件总数不超过maxFiles个且总大小不超过maxSize的条件 - var totalSize int64 - for i, file := range oldFiles { - if (maxFiles > 0 && i >= maxFiles) || (maxSize > 0 && totalSize+file.Info.Size() > maxSize) { - break - } - err := os.Remove(file.Path) - if err != nil { - log.Error("Error deleting file:", file.Path, err) - errorDel++ - continue - } - totalSize += file.Info.Size() - deleted++ - } - - // 如果仍然有超过maxFiles个文件或总大小超过maxSize,继续删除最旧的文件 - remainingFiles := files - sort.Slice(remainingFiles, func(i, j int) bool { - return remainingFiles[i].Info.ModTime().Before(remainingFiles[j].Info.ModTime()) - }) - - for (maxFiles > 0 && len(remainingFiles) > maxFiles) || (maxSize > 0 && totalSize > maxSize) { - file := remainingFiles[0] - err := os.Remove(file.Path) - if err != nil { - log.Error("Error deleting file:", file.Path, err) - remainingFiles = remainingFiles[1:] - continue - } - totalSize -= file.Info.Size() - remainingFiles = remainingFiles[1:] - } - - // 返回结果,用于记录执行结果 - return map[string]any{ - "msg": "successed", - "filePath": params.FilePath, - "deleted": deleted, - "errorDel": errorDel, - }, nil -} - -func getFiles(dir string) ([]FileInfo, error) { - var files []FileInfo - err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.IsDir() { - files = append(files, FileInfo{Path: path, Info: info}) - } - return nil - }) - return files, err -} diff --git a/src/modules/network_element/service/ne_config_backup.go b/src/modules/network_element/service/ne_config_backup.go index 4937862d..784fac87 100644 --- a/src/modules/network_element/service/ne_config_backup.go +++ b/src/modules/network_element/service/ne_config_backup.go @@ -113,8 +113,6 @@ func (s NeConfigBackup) FileLocalToNe(neInfo model.NeInfo, localFile string) err sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/etc/rtproxy && sudo cp -rf %s/rtproxy/* /usr/local/etc/rtproxy && sudo chmod 755 /usr/local/etc/rtproxy/rtproxy.conf", neDirTemp)) // iwf目录 sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/etc/iwf && sudo cp -rf %s/iwf/* /usr/local/etc/iwf && sudo chmod 755 /usr/local/etc/iwf/*.yaml", neDirTemp)) - } else if neTypeLower == "omc" { - sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/omc/etc && sudo cp -rf %s/* /usr/local/omc/etc && sudo chmod 755 /usr/local/omc/etc/*.{yaml,conf}", neDirTemp)) } else if neTypeLower == "smsc" { chmodFile := "sudo chmod 755 /usr/local/etc/smsc/{*sys.conf,*conf.txt,conf/is41_operation.conf}" sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/etc/smsc/conf && sudo cp -rf %s/* /usr/local/etc/smsc && %s", neDirTemp, chmodFile))