ref: 添加定时导出UDM数据备份任务,优化CDR导出删除任务

This commit is contained in:
TsMask
2025-04-25 15:57:37 +08:00
parent 357289c491
commit 151271fdd6
9 changed files with 454 additions and 459 deletions

View File

@@ -1,9 +1,10 @@
package exportTable
package backup_export_table
import (
"encoding/json"
"fmt"
"path/filepath"
"runtime"
"strings"
"time"
@@ -11,25 +12,27 @@ import (
"be.ems/src/framework/database/db"
"be.ems/src/framework/i18n"
"be.ems/src/framework/logger"
"be.ems/src/framework/ssh"
"be.ems/src/framework/utils/date"
"be.ems/src/framework/utils/file"
"be.ems/src/framework/utils/parse"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
systemModel "be.ems/src/modules/system/model"
systemService "be.ems/src/modules/system/service"
)
var NewProcessor = &BarProcessor{
count: 0,
var NewProcessor = &BackupExportTableProcessor{
backupService: neDataService.NewBackup,
count: 0,
}
// bar 队列任务处理
type BarProcessor struct {
count int // 执行次数
// BackupExportTable 备份导出数据表
type BackupExportTableProcessor struct {
backupService *neDataService.Backup // 备份相关服务
count int // 执行次数
}
func (s *BarProcessor) Execute(data any) (any, error) {
func (s *BackupExportTableProcessor) Execute(data any) (any, error) {
s.count++ // 执行次数加一
options := data.(cron.JobData)
sysJob := options.SysJob
@@ -40,10 +43,10 @@ func (s *BarProcessor) Execute(data any) (any, error) {
}
var params struct {
Hour int `json:"hour"` // hour
TableName string `json:"tableName"`
Columns []string `json:"columns"`
FilePath string `json:"filePath"` // file path
Hour int `json:"hour"` // 数据时间从任务执行时间前的小时数
TableName string `json:"tableName"` // 数据表名
Columns []string `json:"columns"` // 支持字段
BackupPath string `json:"backupPath"` // 备份输出路径 /usr/local/omc/backup/{backupPath}
}
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
@@ -52,7 +55,11 @@ func (s *BarProcessor) Execute(data any) (any, error) {
var affected int64
var errMsg error
filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, strings.ToLower(params.TableName), time.Now().Format("20060102150405"))
fileName := fmt.Sprintf("%s_export_%s.csv", strings.ToLower(params.TableName), time.Now().Format("20060102150405"))
filePath := filepath.Join("/usr/local/omc/backup", params.BackupPath, fileName)
if runtime.GOOS == "windows" {
filePath = fmt.Sprintf("C:%s", filePath)
}
switch params.TableName {
case "sys_log_operate":
affected, errMsg = s.exportSysLogOperate(params.Hour, params.Columns, filePath)
@@ -69,18 +76,20 @@ func (s *BarProcessor) Execute(data any) (any, error) {
return nil, errMsg
}
// put ftp
// 上传到FTP服务器
if affected > 0 {
result["affected"] = affected
s.putFTP(filePath)
if err := s.backupService.FTPPushFile(filePath, params.BackupPath); err != nil {
result["ftpErr"] = err.Error()
}
}
result["affected"] = affected
// 返回结果,用于记录执行结果
return result, nil
}
// exportSysLogOperate 导出csv
func (s BarProcessor) exportSysLogOperate(hour int, columns []string, filePath string) (int64, error) {
func (s BackupExportTableProcessor) exportSysLogOperate(hour int, columns []string, filePath string) (int64, error) {
// 前 hour 小时
now := time.Now()
end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
@@ -181,7 +190,7 @@ func (s BarProcessor) exportSysLogOperate(hour int, columns []string, filePath s
}
// exportSMF 导出csv
func (s BarProcessor) exportSMF(hour int, columns []string, filePath string) (int64, error) {
func (s BackupExportTableProcessor) exportSMF(hour int, columns []string, filePath string) (int64, error) {
// 前 hour 小时
now := time.Now()
end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
@@ -412,7 +421,7 @@ func (s BarProcessor) exportSMF(hour int, columns []string, filePath string) (in
}
// exportIMS 导出csv
func (s BarProcessor) exportIMS(hour int, columns []string, filePath string) (int64, error) {
func (s BackupExportTableProcessor) exportIMS(hour int, columns []string, filePath string) (int64, error) {
// 前 hour 小时
now := time.Now()
end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
@@ -552,7 +561,7 @@ func (s BarProcessor) exportIMS(hour int, columns []string, filePath string) (in
}
// exportSMSC 导出csv
func (s BarProcessor) exportSMSC(hour int, columns []string, filePath string) (int64, error) {
func (s BackupExportTableProcessor) exportSMSC(hour int, columns []string, filePath string) (int64, error) {
// 前 hour 小时
now := time.Now()
end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
@@ -677,7 +686,7 @@ func (s BarProcessor) exportSMSC(hour int, columns []string, filePath string) (i
}
// exportSGWC 导出csv
func (s BarProcessor) exportSGWC(hour int, columns []string, filePath string) (int64, error) {
func (s BackupExportTableProcessor) exportSGWC(hour int, columns []string, filePath string) (int64, error) {
// 前 hour 小时
now := time.Now()
end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
@@ -912,54 +921,3 @@ func (s BarProcessor) exportSGWC(hour int, columns []string, filePath string) (i
return tx.RowsAffected, err
}
// putFTP 提交到服务器ssh
func (s BarProcessor) putFTP(localFilePath string) {
// 获取配置
var cfgData struct {
Password string `json:"password" `
Username string `json:"username" binding:"required"`
ToIp string `json:"toIp" binding:"required"`
ToPort int64 `json:"toPort" binding:"required"`
Enable bool `json:"enable"`
Dir string `json:"dir" binding:"required"`
}
cfg := systemService.NewSysConfig.FindByKeyDecryptValue("neData.exportTableFTP")
if cfg.ConfigId > 0 {
if err := json.Unmarshal([]byte(cfg.ConfigValue), &cfgData); err != nil {
logger.Errorf("putFTP unmarshal error: %v", err)
return
}
}
if !cfgData.Enable {
return
}
connSSH := ssh.ConnSSH{
User: cfgData.Username,
Password: cfgData.Password,
Addr: cfgData.ToIp,
Port: cfgData.ToPort,
AuthMode: "0",
}
sshClient, err := connSSH.NewClient()
if err != nil {
logger.Errorf("putFTP ssh error: %v", err)
return
}
defer sshClient.Close()
// 网元主机的SSH客户端进行文件传输
sftpClient, err := sshClient.NewClientSFTP()
if err != nil {
logger.Errorf("putFTP sftp error: %v", err)
return
}
defer sftpClient.Close()
// 远程文件
remotePath := filepath.Join(cfgData.Dir, "/backup", filepath.Base(localFilePath))
// 复制到远程
if err = sftpClient.CopyFileLocalToRemote(localFilePath, remotePath); err != nil {
logger.Errorf("putFTP uploading error: %v", err)
return
}
}

View File

@@ -0,0 +1,273 @@
package backup_export_udm
import (
"encoding/json"
"fmt"
"path/filepath"
"runtime"
"time"
"be.ems/src/framework/cron"
"be.ems/src/framework/logger"
"be.ems/src/framework/utils/file"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
neModel "be.ems/src/modules/network_element/model"
neService "be.ems/src/modules/network_element/service"
)
var NewProcessor = &BackupExportUDMProcessor{
count: 0,
neInfoService: neService.NewNeInfo,
backupService: neDataService.NewBackup,
udmAuthService: neDataService.NewUDMAuthUser,
udmSubService: neDataService.NewUDMSubUser,
udmVOIPService: neDataService.NewUDMVOIPUser,
udmVolteIMSService: neDataService.NewUDMVolteIMSUser,
}
// BackupExportUDM 队列任务处理
type BackupExportUDMProcessor struct {
count int // 执行次数
neInfoService *neService.NeInfo // 网元信息服务
backupService *neDataService.Backup // 备份相关服务
udmAuthService *neDataService.UDMAuthUser // UDM鉴权信息服务
udmSubService *neDataService.UDMSubUser // UDM签约信息服务
udmVOIPService *neDataService.UDMVOIPUser // UDMVOIP信息服务
udmVolteIMSService *neDataService.UDMVolteIMSUser // UDMVolteIMS信息服务
}
func (s *BackupExportUDMProcessor) Execute(data any) (any, error) {
s.count++ // 执行次数加一
options := data.(cron.JobData)
sysJob := options.SysJob
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
// 返回结果,用于记录执行结果
result := map[string]any{
"count": s.count,
}
var params struct {
DataType []string `json:"dataType"` // 类型支持 auth/sub/voip/volte
FileType string `json:"fileType"` // 文件类型 csv/txt
}
if err := json.Unmarshal([]byte(sysJob.TargetParams), &params); err != nil {
return nil, err
}
if !(params.FileType == "csv" || params.FileType == "txt") {
return nil, fmt.Errorf("file type error, only support csv,txt")
}
neList := s.neInfoService.Find(neModel.NeInfo{NeType: "UDM"}, false, false)
for _, neInfo := range neList {
for _, v := range params.DataType {
switch v {
case "auth":
result[fmt.Sprintf("%s-%s", neInfo.NeId, v)] = s.exportAuth(neInfo.NeId, params.FileType)
case "sub":
result[fmt.Sprintf("%s-%s", neInfo.NeId, v)] = s.exportSub(neInfo.NeId, params.FileType)
case "voip":
result[fmt.Sprintf("%s-%s", neInfo.NeId, v)] = s.exportVOIP(neInfo.NeId, params.FileType)
case "volte":
result[fmt.Sprintf("%s-%s", neInfo.NeId, v)] = s.exportVolte(neInfo.NeId, params.FileType)
}
}
}
// 返回结果,用于记录执行结果
return result, nil
}
// exportAuth 导出鉴权用户数据
func (s BackupExportUDMProcessor) exportAuth(neId, fileType string) string {
rows := s.udmAuthService.Find(neDataModel.UDMAuthUser{NeId: neId})
if len(rows) <= 0 {
return "no data"
}
// 文件名
fileName := fmt.Sprintf("auth_%s_export_%s.%s", neId, time.Now().Format("20060102150405"), fileType)
filePath := filepath.Join("/usr/local/omc/backup/udm_data/auth", fileName)
if runtime.GOOS == "windows" {
filePath = fmt.Sprintf("C:%s", filePath)
}
if fileType == "csv" {
// 转换数据
data := [][]string{}
data = append(data, []string{"imsi", "ki", "algo", "amf", "opc"})
for _, v := range rows {
opc := v.Opc
if opc == "-" {
opc = ""
}
data = append(data, []string{v.IMSI, v.Ki, v.AlgoIndex, v.Amf, opc})
}
// 输出到文件
if err := file.WriterFileCSV(data, filePath); err != nil {
return err.Error()
}
}
if fileType == "txt" {
// 转换数据
data := [][]string{}
for _, v := range rows {
opc := v.Opc
if opc == "-" {
opc = ""
}
data = append(data, []string{v.IMSI, v.Ki, v.AlgoIndex, v.Amf, opc})
}
// 输出到文件
if err := file.WriterFileTXT(data, ",", filePath); err != nil {
return err.Error()
}
}
// 上传到FTP服务器
if err := s.backupService.FTPPushFile(filePath, "udm_data"); err != nil {
return "ok, ftp err:" + err.Error()
}
return "ok"
}
// exportSub 导出签约用户数据
func (s BackupExportUDMProcessor) exportSub(neId, fileType string) string {
rows := s.udmSubService.Find(neDataModel.UDMSubUser{NeId: neId})
if len(rows) <= 0 {
return "no data"
}
// 文件名
fileName := fmt.Sprintf("sub_%s_export_%s.%s", neId, time.Now().Format("20060102150405"), fileType)
filePath := filepath.Join("/usr/local/omc/backup/udm_data/sub", fileName)
if runtime.GOOS == "windows" {
filePath = fmt.Sprintf("C:%s", filePath)
}
if fileType == "csv" {
// 转换数据
data := [][]string{}
data = append(data, []string{"IMSI", "MSISDN", "UeAmbrTpl", "NssaiTpl", "AreaForbiddenTpl", "ServiceAreaRestrictionTpl", "RatRestrictions", "CnTypeRestrictions", "SmfSel", "SmData", "EPSDat"})
for _, v := range rows {
epsDat := fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s", v.EpsFlag, v.EpsOdb, v.HplmnOdb, v.Ard, v.Epstpl, v.ContextId, v.ApnContext, v.StaticIp)
data = append(data, []string{v.IMSI, v.MSISDN, v.UeAmbrTpl, v.NssaiTpl, v.AreaForbiddenTpl, v.ServiceAreaRestrictionTpl, v.RatRestrictions, v.CnTypeRestrictions, v.SmfSel, v.SmData, epsDat})
}
// 输出到文件
if err := file.WriterFileCSV(data, filePath); err != nil {
return err.Error()
}
}
if fileType == "txt" {
// 转换数据
data := [][]string{}
for _, v := range rows {
epsDat := fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s", v.EpsFlag, v.EpsOdb, v.HplmnOdb, v.Ard, v.Epstpl, v.ContextId, v.ApnContext, v.StaticIp)
data = append(data, []string{v.IMSI, v.MSISDN, v.UeAmbrTpl, v.NssaiTpl, v.AreaForbiddenTpl, v.ServiceAreaRestrictionTpl, v.RatRestrictions, v.CnTypeRestrictions, v.SmfSel, v.SmData, epsDat})
}
// 输出到文件
if err := file.WriterFileTXT(data, ",", filePath); err != nil {
return err.Error()
}
}
// 上传到FTP服务器
if err := s.backupService.FTPPushFile(filePath, "udm_data"); err != nil {
return "ok, ftp err:" + err.Error()
}
return "ok"
}
// exportVOIP 导出VOIP用户数据
func (s BackupExportUDMProcessor) exportVOIP(neId, fileType string) string {
rows := s.udmVOIPService.Find(neDataModel.UDMVOIPUser{NeId: neId})
if len(rows) <= 0 {
return "no data"
}
// 文件名
fileName := fmt.Sprintf("voip_%s_export_%s.%s", neId, time.Now().Format("20060102150405"), fileType)
filePath := filepath.Join("/usr/local/omc/backup/udm_data/voip", fileName)
if runtime.GOOS == "windows" {
filePath = fmt.Sprintf("C:%s", filePath)
}
if fileType == "csv" {
// 转换数据
data := [][]string{}
data = append(data, []string{"username", "password"})
for _, v := range rows {
data = append(data, []string{v.UserName, v.Password})
}
// 输出到文件
if err := file.WriterFileCSV(data, filePath); err != nil {
return err.Error()
}
}
if fileType == "txt" {
// 转换数据
data := [][]string{}
for _, v := range rows {
data = append(data, []string{v.UserName, v.Password})
}
// 输出到文件
if err := file.WriterFileTXT(data, ",", filePath); err != nil {
return err.Error()
}
}
// 上传到FTP服务器
if err := s.backupService.FTPPushFile(filePath, "udm_data"); err != nil {
return "ok, ftp err:" + err.Error()
}
return "ok"
}
// exportVolte 导出Volte用户数据
func (s BackupExportUDMProcessor) exportVolte(neId, fileType string) string {
rows := s.udmVolteIMSService.Find(neDataModel.UDMVolteIMSUser{NeId: neId})
if len(rows) <= 0 {
return "no data"
}
// 文件名
fileName := fmt.Sprintf("volte_%s_export_%s.%s", neId, time.Now().Format("20060102150405"), fileType)
filePath := filepath.Join("/usr/local/omc/backup/udm_data/volte", fileName)
if runtime.GOOS == "windows" {
filePath = fmt.Sprintf("C:%s", filePath)
}
if fileType == "csv" {
// 转换数据
data := [][]string{}
data = append(data, []string{"IMSI", "MSISDN", "TAG", "VNI"})
for _, v := range rows {
data = append(data, []string{v.IMSI, v.MSISDN, v.Tag, v.VNI})
}
// 输出到文件
if err := file.WriterFileCSV(data, filePath); err != nil {
return err.Error()
}
}
if fileType == "txt" {
// 转换数据
data := [][]string{}
for _, v := range rows {
data = append(data, []string{v.IMSI, v.MSISDN, v.Tag, v.VNI})
}
// 输出到文件
if err := file.WriterFileTXT(data, ",", filePath); err != nil {
return err.Error()
}
}
// 上传到FTP服务器
if err := s.backupService.FTPPushFile(filePath, "udm_data"); err != nil {
return "ok, ftp err:" + err.Error()
}
return "ok"
}

View File

@@ -0,0 +1,113 @@
package backup_remove_file
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"time"
"be.ems/src/framework/cron"
"be.ems/src/framework/logger"
)
type FileInfo struct {
Path string
Info os.FileInfo
}
var NewProcessor = &BackupRemoveFileProcessor{
count: 0,
}
// BackupRemoveFileProcessor 删除备份文件
type BackupRemoveFileProcessor struct {
count int // 执行次数
}
func (s *BackupRemoveFileProcessor) Execute(data any) (any, error) {
s.count++ // 执行次数加一
options := data.(cron.JobData)
sysJob := options.SysJob
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
// 返回结果,用于记录执行结果
result := map[string]any{
"count": s.count,
}
// 读取参数值
var params []struct {
BackupPath string `json:"backupPath"` // 备份路径 /usr/local/omc/backup/{backupPath}
StoreDays int `json:"storeDays"` // 保留天数
StoreNum int `json:"storeNum"` // 保留数量默认保留7
}
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, fmt.Errorf("json params err: %v", err)
}
for _, item := range params {
result[item.BackupPath] = ""
if item.StoreDays < 0 {
result[item.BackupPath] = "params storeDays less than 0"
continue
}
if item.StoreNum <= 0 {
item.StoreNum = 7
}
// 构建完整备份路径
filePath := filepath.Join("/usr/local/omc/backup", item.BackupPath)
if runtime.GOOS == "windows" {
filePath = fmt.Sprintf("C:%s", filePath)
}
// 获取目录下所有备份文件
files, err := s.files(filePath)
if err != nil {
result[item.BackupPath] = "read files err"
continue
}
// 按修改时间排序(从旧到新)
sort.Slice(files, func(i, j int) bool {
return files[i].Info.ModTime().Before(files[j].Info.ModTime())
})
// 如果文件数量少于保留数量,则不删除
if len(files) <= item.StoreNum {
result[item.BackupPath] = fmt.Sprintf("less StoreNum: %d, file number %d", item.StoreNum, len(files))
continue
}
// 计算截止日期
cutoff := time.Now().AddDate(0, 0, -item.StoreDays)
// 删除超过保留天数的文件
deletedErr := []string{}
for _, file := range files {
if file.Info.ModTime().Before(cutoff) {
if err := os.Remove(file.Path); err != nil {
deletedErr = append(deletedErr, file.Info.Name()) // 记录删除失败的文件名称
}
}
}
result[item.BackupPath] = strings.Join(deletedErr, ", ")
}
// 返回结果,用于记录执行结果
return result, nil
}
func (s *BackupRemoveFileProcessor) files(dir string) ([]FileInfo, error) {
var files []FileInfo
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
files = append(files, FileInfo{Path: path, Info: info})
}
return nil
})
return files, err
}

View File

@@ -39,7 +39,8 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) {
// 读取参数值
var params struct {
StoreDays int `json:"storeDays"` // store days
StoreDays int `json:"storeDays"` // 保留天数
StoreNum int `json:"storeNum"` // 保留数量默认保留7
}
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
@@ -48,6 +49,9 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) {
if params.StoreDays < 0 {
return nil, fmt.Errorf("params storeDays less than 0 ")
}
if params.StoreNum <= 0 {
params.StoreNum = 7
}
neList := s.neInfoService.Find(neModel.NeInfo{}, false, false)
for _, neInfo := range neList {
@@ -55,6 +59,17 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) {
tx := db.DB("").Model(&neModel.NeConfigBackup{})
tx = tx.Where("ne_type = ? and ne_id = ?", neInfo.NeType, neInfo.NeId)
// 查询数量为0直接返回
var total int64 = 0
if err := tx.Count(&total).Error; err != nil {
result[neTypeAndId] = err.Error()
continue
}
if total <= int64(params.StoreNum) {
result[neTypeAndId] = "less than storeNum"
continue
}
// 查询最后记录数据
var lastCreateTime int64 = 0
lastTx := tx.Select("create_time").Order("create_time DESC").Limit(1)
@@ -62,7 +77,6 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) {
result[neTypeAndId] = err.Error()
continue
}
if lastCreateTime <= 1e12 {
result[neTypeAndId] = "no data"
continue
@@ -90,7 +104,7 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) {
// deleteFile 删除本地文件
func (s DeleteNeConfigBackupProcessor) deleteFile(neType, neId string, oldFileDate time.Time) {
neTypeLower := strings.ToLower(neType)
localPath := fmt.Sprintf("/usr/local/omc/backup/ne_config/%s/%s/backup ", neTypeLower, neId)
localPath := fmt.Sprintf("/usr/local/omc/backup/ne_config/%s/%s ", neTypeLower, neId)
files, err := os.ReadDir(localPath)
if err != nil {
logger.Errorf("logger Remove ne_config File ReadDir err: %v", err.Error())

View File

@@ -1,160 +0,0 @@
package getStateFromNE
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"be.ems/lib/config"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/src/framework/cron"
"github.com/go-resty/resty/v2"
)
var NewProcessor = &BarProcessor{
progress: 0,
count: 0,
}
// bar 队列任务处理
type BarProcessor struct {
// 任务进度
progress int
// 执行次数
count int
}
type BarParams struct {
Duration int `json:"duration"`
}
type CpuUsage struct {
NfCpuUsage uint16 `json:"nfCpuUsage"`
SysCpuUsage uint16 `json:"sysCpuUsage"`
}
type MemUsage struct {
TotalMem uint32 `json:"totalMem"`
NfUsedMem uint32 `json:"nfUsedMem"`
SysMemUsage uint16 `json:"sysMemUsage"`
}
type PartitionInfo struct {
Total uint32 `json:"total"` // MB
Used uint32 `json:"used"` // MB
}
type DiskSpace struct {
PartitionNum uint8 `json:"partitionNum"`
PartitionInfo []PartitionInfo `json:"partitionInfo"`
}
type SystemState struct {
Version string `json:"version"`
Capability uint32 `json:"capability"`
SerialNum string `json:"serialNum"`
ExpiryDate string `json:"expiryDate"`
//Timestamp string `json:"timestamp"`
CpuUsage CpuUsage `json:"cpuUsage"`
MemUsage MemUsage `json:"memUsage"`
DiskSpace DiskSpace `json:"diskSpace"`
}
var client = resty.New()
func init() {
client.
SetTimeout(time.Duration(400 * time.Millisecond))
}
func (s *BarProcessor) Execute(data any) (any, error) {
var err error
s.count++
options := data.(cron.JobData)
sysJob := options.SysJob
var params BarParams
_ = json.Unmarshal([]byte(sysJob.TargetParams), &params)
// if err == nil {
// duration = params.Duration
// }
var nes []dborm.NeInfo
_, err = dborm.XormGetAllNeInfo(&nes)
if err != nil {
log.Error("Failed to get all ne info:", err)
return nil, err
}
failNum := 0
succNum := 0
for _, ne := range nes {
requestURI := fmt.Sprintf("/api/rest/systemManagement/v1/elementType/%s/objectType/systemState", strings.ToLower(ne.NeType))
requestURL := fmt.Sprintf("http://%s:%s%s", ne.Ip, ne.Port, requestURI)
log.Debug("requestURL: Get", requestURL)
response, err := client.R().
EnableTrace().
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
Get(requestURL)
if err != nil {
log.Error("Failed to Get:", err)
failNum++
continue
}
log.Debug("StatusCode: ", response.StatusCode())
switch response.StatusCode() {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
log.Trace("response body:", string(response.Body()))
state := new(SystemState)
_ = json.Unmarshal(response.Body(), &state)
// var dateStr *string = nil
// if state.ExpiryDate != "" && state.ExpiryDate != "-" {
// dateStr = &state.ExpiryDate
// }
neState := new(dborm.NeState)
neState.NeType = ne.NeType
neState.NeId = ne.NeId
neState.Version = state.Version
neState.Capability = state.Capability
neState.SerialNum = state.SerialNum
// if dateStr != nil {
// neState.ExpiryDate = *dateStr
// }
neState.ExpiryDate = state.ExpiryDate
cu, _ := json.Marshal(state.CpuUsage)
neState.CpuUsage = string(cu)
mu, _ := json.Marshal(state.MemUsage)
neState.MemUsage = string(mu)
ds, _ := json.Marshal(state.DiskSpace)
neState.DiskSpace = string(ds)
log.Trace("neState:", neState)
_, err := dborm.XormInsertNeState(neState)
if err != nil {
log.Error("Failed to insert ne_state:", err)
failNum++
continue
}
succNum++
default:
log.Trace("response body:", string(response.Body()))
body := new(map[string]interface{})
_ = json.Unmarshal(response.Body(), &body)
failNum++
}
}
// 返回结果,用于记录执行结果
return map[string]any{
"succNum": succNum,
"failNum": failNum,
}, nil
}

View File

@@ -1,22 +1,20 @@
package ne_config_backup
import (
"encoding/json"
"fmt"
"path/filepath"
"be.ems/src/framework/cron"
"be.ems/src/framework/logger"
"be.ems/src/framework/ssh"
neDataService "be.ems/src/modules/network_data/service"
neModel "be.ems/src/modules/network_element/model"
neService "be.ems/src/modules/network_element/service"
systemService "be.ems/src/modules/system/service"
)
var NewProcessor = &NeConfigBackupProcessor{
neConfigBackupService: neService.NewNeConfigBackup,
neInfoService: neService.NewNeInfo,
sysConfigService: systemService.NewSysConfig,
backupService: neDataService.NewBackup,
count: 0,
}
@@ -24,7 +22,7 @@ var NewProcessor = &NeConfigBackupProcessor{
type NeConfigBackupProcessor struct {
neConfigBackupService *neService.NeConfigBackup // 网元配置文件备份记录服务
neInfoService *neService.NeInfo // 网元信息服务
sysConfigService *systemService.SysConfig // 参数配置服务
backupService *neDataService.Backup // 备份相关服务
count int // 执行次数
}
@@ -47,6 +45,7 @@ func (s *NeConfigBackupProcessor) Execute(data any) (any, error) {
result[neTypeAndId] = err.Error()
continue
}
// 新增备份记录
item := neModel.NeConfigBackup{
NeType: neInfo.NeType,
@@ -60,60 +59,14 @@ func (s *NeConfigBackupProcessor) Execute(data any) (any, error) {
result[neTypeAndId] = "failed"
continue
}
result[neTypeAndId] = "success"
s.putFTP(zipFilePath) // 上传到FTP服务器
msg := "ok"
// 上传到FTP服务器
if err := s.backupService.FTPPushFile(zipFilePath, "ne_config"); err != nil {
result[neTypeAndId] = msg + ", ftp err:" + err.Error()
}
result[neTypeAndId] = msg
}
return result, nil
}
// putFTP 提交到服务器ssh
func (s NeConfigBackupProcessor) putFTP(localFilePath string) {
// 获取配置
var cfgData struct {
Password string `json:"password" `
Username string `json:"username" binding:"required"`
ToIp string `json:"toIp" binding:"required"`
ToPort int64 `json:"toPort" binding:"required"`
Enable bool `json:"enable"`
Dir string `json:"dir" binding:"required"`
}
cfg := systemService.NewSysConfig.FindByKeyDecryptValue("neData.exportTableFTP")
if cfg.ConfigId > 0 {
if err := json.Unmarshal([]byte(cfg.ConfigValue), &cfgData); err != nil {
logger.Errorf("putFTP unmarshal error: %v", err)
return
}
}
if !cfgData.Enable {
return
}
connSSH := ssh.ConnSSH{
User: cfgData.Username,
Password: cfgData.Password,
Addr: cfgData.ToIp,
Port: cfgData.ToPort,
AuthMode: "0",
}
sshClient, err := connSSH.NewClient()
if err != nil {
logger.Errorf("putFTP ssh error: %v", err)
return
}
defer sshClient.Close()
// 网元主机的SSH客户端进行文件传输
sftpClient, err := sshClient.NewClientSFTP()
if err != nil {
logger.Errorf("putFTP sftp error: %v", err)
return
}
defer sftpClient.Close()
// 远程文件
remotePath := filepath.Join(cfgData.Dir, "/ne_config", filepath.Base(localFilePath))
// 复制到远程
if err = sftpClient.CopyFileLocalToRemote(localFilePath, remotePath); err != nil {
logger.Errorf("putFTP uploading error: %v", err)
return
}
}

View File

@@ -2,16 +2,17 @@ package processor
import (
"be.ems/src/framework/cron"
processorBackupExportTable "be.ems/src/modules/crontask/processor/backup_export_table"
processorBackupExportUDM "be.ems/src/modules/crontask/processor/backup_export_udm"
processorBackupRemoveFile "be.ems/src/modules/crontask/processor/backup_remove_file"
processorDeleteAlarmRecord "be.ems/src/modules/crontask/processor/delete_alarm_record"
processorDeleteDataRecord "be.ems/src/modules/crontask/processor/delete_data_record"
processorDeleteKPIRecord "be.ems/src/modules/crontask/processor/delete_kpi_record"
processorDeleteNeConfigBackup "be.ems/src/modules/crontask/processor/delete_ne_config_backup"
"be.ems/src/modules/crontask/processor/exportTable"
processorMonitorSysResource "be.ems/src/modules/crontask/processor/monitor_sys_resource"
processorNeAlarmStateCheck "be.ems/src/modules/crontask/processor/ne_alarm_state_check"
processorNeConfigBackup "be.ems/src/modules/crontask/processor/ne_config_backup"
processorNeDataUDM "be.ems/src/modules/crontask/processor/ne_data_udm"
"be.ems/src/modules/crontask/processor/removeFile"
)
// InitCronQueue 初始定时任务队列
@@ -20,7 +21,7 @@ func InitCronQueue() {
cron.CreateQueue("monitor_sys_resource", processorMonitorSysResource.NewProcessor)
// 网元-网元配置文件定期备份
cron.CreateQueue("ne_config_backup", processorNeConfigBackup.NewProcessor)
// 网元数据-UDM数据刷新同步
// 网元数据-UDM用户数据同步
cron.CreateQueue("ne_data_udm", processorNeDataUDM.NewProcessor)
// 网元告警-状态检查
cron.CreateQueue("ne_alarm_state_check", processorNeAlarmStateCheck.NewProcessor)
@@ -34,6 +35,10 @@ func InitCronQueue() {
// 删除-网元配置文件定期备份
cron.CreateQueue("delete_ne_config_backup", processorDeleteNeConfigBackup.NewProcessor)
cron.CreateQueue("exportTable", exportTable.NewProcessor)
cron.CreateQueue("removeFile", removeFile.NewProcessor)
// 备份-导出数据表
cron.CreateQueue("backup_export_table", processorBackupExportTable.NewProcessor)
// 备份-删除备份目录下文件
cron.CreateQueue("backup_remove_file", processorBackupRemoveFile.NewProcessor)
// 备份-导出UDM用户数据
cron.CreateQueue("backup_export_udm", processorBackupExportUDM.NewProcessor)
}

View File

@@ -1,159 +0,0 @@
package removeFile
import (
"encoding/json"
"os"
"path/filepath"
"sort"
"time"
"be.ems/lib/log"
"be.ems/src/framework/cron"
)
var NewProcessor = &BarProcessor{
progress: 0,
count: 0,
}
// bar 队列任务处理
type BarProcessor struct {
// 任务进度
progress int
// 执行次数
count int
}
type BarParams struct {
FilePath string `json:"filePath"` // file path
MaxDays int `json:"maxDays"`
MaxFiles *int `json:"maxFiles"` // keep max files
MaxSize *int64 `json:"maxSize"`
Extras string `json:"extras"` // extras condition for where
}
type FileInfo struct {
Path string
Info os.FileInfo
}
func (s *BarProcessor) Execute(data any) (any, error) {
s.count++
options := data.(cron.JobData)
sysJob := options.SysJob
var params []BarParams
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, err
}
result := []map[string]any{}
for _, param := range params {
res, _ := s.ExecuteOne(param)
result = append(result, res)
}
// 返回结果,用于记录执行结果
return map[string]any{
"result": result,
}, nil
}
func (s *BarProcessor) ExecuteOne(params BarParams) (map[string]any, error) {
var maxFiles int = 0
var maxSize int64 = 0
if params.MaxFiles != nil {
maxFiles = *params.MaxFiles
}
if params.MaxSize != nil {
maxSize = int64(*params.MaxSize * 1024 * 1024)
}
files, err := getFiles(params.FilePath)
if err != nil {
return map[string]any{
"msg": "failed",
"err": err.Error(),
}, err
}
// 获取本地时区
loc, err := time.LoadLocation("Local")
if err != nil {
return map[string]any{
"msg": "failed",
"err": err.Error(),
}, err
}
cutoff := time.Now().In(loc).AddDate(0, 0, -params.MaxDays)
var oldFiles []FileInfo
for _, file := range files {
if file.Info.ModTime().Before(cutoff) {
oldFiles = append(oldFiles, file)
}
}
// 按修改时间排序文件(最旧的在前)
sort.Slice(oldFiles, func(i, j int) bool {
return oldFiles[i].Info.ModTime().Before(oldFiles[j].Info.ModTime())
})
deleted, errorDel := 0, 0
// 删除文件直到满足文件总数不超过maxFiles个且总大小不超过maxSize的条件
var totalSize int64
for i, file := range oldFiles {
if (maxFiles > 0 && i >= maxFiles) || (maxSize > 0 && totalSize+file.Info.Size() > maxSize) {
break
}
err := os.Remove(file.Path)
if err != nil {
log.Error("Error deleting file:", file.Path, err)
errorDel++
continue
}
totalSize += file.Info.Size()
deleted++
}
// 如果仍然有超过maxFiles个文件或总大小超过maxSize继续删除最旧的文件
remainingFiles := files
sort.Slice(remainingFiles, func(i, j int) bool {
return remainingFiles[i].Info.ModTime().Before(remainingFiles[j].Info.ModTime())
})
for (maxFiles > 0 && len(remainingFiles) > maxFiles) || (maxSize > 0 && totalSize > maxSize) {
file := remainingFiles[0]
err := os.Remove(file.Path)
if err != nil {
log.Error("Error deleting file:", file.Path, err)
remainingFiles = remainingFiles[1:]
continue
}
totalSize -= file.Info.Size()
remainingFiles = remainingFiles[1:]
}
// 返回结果,用于记录执行结果
return map[string]any{
"msg": "successed",
"filePath": params.FilePath,
"deleted": deleted,
"errorDel": errorDel,
}, nil
}
func getFiles(dir string) ([]FileInfo, error) {
var files []FileInfo
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
files = append(files, FileInfo{Path: path, Info: info})
}
return nil
})
return files, err
}

View File

@@ -113,8 +113,6 @@ func (s NeConfigBackup) FileLocalToNe(neInfo model.NeInfo, localFile string) err
sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/etc/rtproxy && sudo cp -rf %s/rtproxy/* /usr/local/etc/rtproxy && sudo chmod 755 /usr/local/etc/rtproxy/rtproxy.conf", neDirTemp))
// iwf目录
sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/etc/iwf && sudo cp -rf %s/iwf/* /usr/local/etc/iwf && sudo chmod 755 /usr/local/etc/iwf/*.yaml", neDirTemp))
} else if neTypeLower == "omc" {
sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/omc/etc && sudo cp -rf %s/* /usr/local/omc/etc && sudo chmod 755 /usr/local/omc/etc/*.{yaml,conf}", neDirTemp))
} else if neTypeLower == "smsc" {
chmodFile := "sudo chmod 755 /usr/local/etc/smsc/{*sys.conf,*conf.txt,conf/is41_operation.conf}"
sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/etc/smsc/conf && sudo cp -rf %s/* /usr/local/etc/smsc && %s", neDirTemp, chmodFile))