From d228ce01f26b8a7651beae07453ab233f1364d05 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Sat, 28 Oct 2023 17:03:56 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=B0=83=E5=BA=A6=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E5=B1=82=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/modules/crontask/crontask.go | 20 +-- .../backupEtcFromNE/backupEtcFromNE.go | 145 ++++++++++++++++++ .../delExpiredNeBackup/delExpiredNeBackup.go | 80 ++++++++++ .../deleteExpiredRecord.go | 0 .../monitor_sys_resource.go | 56 +++++++ src/modules/crontask/processor/processor.go | 19 +++ 6 files changed, 308 insertions(+), 12 deletions(-) create mode 100644 src/modules/crontask/processor/backupEtcFromNE/backupEtcFromNE.go create mode 100644 src/modules/crontask/processor/delExpiredNeBackup/delExpiredNeBackup.go rename src/modules/crontask/{ => processor}/deleteExpiredRecord/deleteExpiredRecord.go (100%) create mode 100644 src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go create mode 100644 src/modules/crontask/processor/processor.go diff --git a/src/modules/crontask/crontask.go b/src/modules/crontask/crontask.go index 0eea775e..53fd16e1 100644 --- a/src/modules/crontask/crontask.go +++ b/src/modules/crontask/crontask.go @@ -1,11 +1,8 @@ package crontask import ( - "ems.agt/src/framework/cron" "ems.agt/src/framework/logger" - "ems.agt/src/modules/crontask/backupEtcFromNE" - "ems.agt/src/modules/crontask/delExpiredNeBackup" - "ems.agt/src/modules/crontask/deleteExpiredRecord" + "ems.agt/src/modules/crontask/processor" "github.com/gin-gonic/gin" ) @@ -14,14 +11,13 @@ import ( func Setup(router *gin.Engine) { logger.Infof("开始加载 ====> crontask 模块路由") - // 初始定时任务队列 - InitCronQueue() + // 启动时需要的初始参数 + InitLoad() + } -// InitCronQueue 初始定时任务队列 -func InitCronQueue() { - // delete expired NE backup file - cron.CreateQueue("delExpiredNeBackup", delExpiredNeBackup.NewProcessor) - cron.CreateQueue("deleteExpiredRecord", deleteExpiredRecord.NewProcessor) - cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor) +// InitLoad 初始参数 +func InitLoad() { + // 初始化定时任务处理 + processor.InitCronQueue() } diff --git a/src/modules/crontask/processor/backupEtcFromNE/backupEtcFromNE.go b/src/modules/crontask/processor/backupEtcFromNE/backupEtcFromNE.go new file mode 100644 index 00000000..fe70d4a4 --- /dev/null +++ b/src/modules/crontask/processor/backupEtcFromNE/backupEtcFromNE.go @@ -0,0 +1,145 @@ +package backupEtcFromNE + +import ( + "encoding/json" + "fmt" + "os" + "strings" + "time" + + "ems.agt/lib/dborm" + "ems.agt/lib/global" + "ems.agt/lib/log" + "ems.agt/restagent/config" + "ems.agt/src/framework/cron" +) + +var NewProcessor = &BarProcessor{ + progress: 0, + count: 0, +} + +// bar 队列任务处理 +type BarProcessor struct { + // 任务进度 + progress int + // 执行次数 + count int +} + +type BarParams struct { + Duration int `json:"duration"` + TableName string `json:"tableName"` + ColName string `json:"colName"` // column name of time string + Extras string `json:"extras"` // extras condition for where +} + +func (s *BarProcessor) Execute(data any) (any, error) { + log.Infof("execute %d,last progress: %d ", s.count, s.progress) + s.count++ + + options := data.(cron.JobData) + sysJob := options.SysJob + var params BarParams + + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, err + } + + log.Infof("Repeat %v Job ID %s", options.Repeat, sysJob.JobID) + + var nes []dborm.NeInfo + _, err = dborm.XormGetAllNeInfo(&nes) + if err != nil { + return nil, err + } + + var successfulNEs, failureNEs []string + for _, neInfo := range nes { + neTypeUpper := strings.ToUpper(neInfo.NeType) + neTypeLower := strings.ToLower(neInfo.NeType) + nePath := fmt.Sprintf("%s/etc/%s", config.GetYamlConfig().OMC.Backup, neTypeLower) + isExist, err := global.PathExists(nePath) + if err != nil { + log.Errorf("Failed to PathExists:", err) + failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) + continue + } + if isExist { + err = os.RemoveAll(nePath) + if err != nil { + log.Errorf("Failed to RemoveAll:", err) + failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) + continue + } + } + err = os.MkdirAll(nePath, os.ModePerm) + if err != nil { + log.Errorf("Failed to MkdirAll:", err) + failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) + continue + } + + var scpCmd string + ipType := global.ParseIPAddr(neInfo.Ip) + if neTypeLower != "omc" { + if ipType == global.IsIPv4 { + scpCmd = fmt.Sprintf("scp -r %s@%s:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User, + neInfo.Ip, config.GetYamlConfig().NE.EtcDir, + neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower) + } else { + scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User, + neInfo.Ip, config.GetYamlConfig().NE.EtcDir, + neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower) + } + } else { + if ipType == global.IsIPv4 { + scpCmd = fmt.Sprintf("scp -r %s@%s:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User, + neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower) + } else { + scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User, + neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower) + } + } + + zipFile := fmt.Sprintf("%s-%s-etc-%s.zip", neTypeLower, strings.ToLower(neInfo.NeId), time.Now().Format(global.DateData)) + zipFilePath := config.GetYamlConfig().OMC.Backup + "/" + zipFile + zipCmd := fmt.Sprintf("cd %s/etc && zip -r %s %s/*", config.GetYamlConfig().OMC.Backup, zipFilePath, neTypeLower) + + command := fmt.Sprintf("%s&&%s", scpCmd, zipCmd) + + log.Trace("command:", command) + out, err := global.ExecCmd(command) + if err != nil { + log.Error("Faile to exec command:", err) + failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) + continue + } + log.Trace("command output:", out) + + md5Sum, err := global.GetFileMD5Sum(zipFilePath) + if err != nil { + log.Error("Faile to md5sum:", err) + failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) + continue + } + //log.Debug("md5Str:", md5Sum) + path := config.GetYamlConfig().OMC.Backup + neBackup := dborm.NeBackup{NeType: neTypeUpper, NeId: neInfo.NeId, FileName: zipFile, Path: path, Md5Sum: md5Sum} + _, err = dborm.XormInsertTableOne("ne_backup", neBackup) + if err != nil { + log.Error("Faile to XormInsertTableOne:", err) + failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) + continue + } + successfulNEs = append(successfulNEs, neInfo.NeType+"/"+neInfo.NeId) + } + + log.Infof("successfulNEs: %s failureNEs: %s", successfulNEs, failureNEs) + // result + return map[string]any{ + "successfulNEs": successfulNEs, + "failureNEs": failureNEs, + }, nil +} diff --git a/src/modules/crontask/processor/delExpiredNeBackup/delExpiredNeBackup.go b/src/modules/crontask/processor/delExpiredNeBackup/delExpiredNeBackup.go new file mode 100644 index 00000000..641641b9 --- /dev/null +++ b/src/modules/crontask/processor/delExpiredNeBackup/delExpiredNeBackup.go @@ -0,0 +1,80 @@ +package delExpiredNeBackup + +import ( + "encoding/json" + "fmt" + + "ems.agt/lib/dborm" + "ems.agt/lib/log" + "ems.agt/src/framework/cron" +) + +var NewProcessor = &BarProcessor{ + progress: 0, + count: 0, +} + +// bar 队列任务处理 +type BarProcessor struct { + // 任务进度 + progress int + // 执行次数 + count int +} + +type BarParams struct { + Duration int `json:"duration"` +} + +func (s *BarProcessor) Execute(data any) (any, error) { + log.Infof("执行 %d 次,上次进度: %d ", s.count, s.progress) + s.count++ + + options := data.(cron.JobData) + sysJob := options.SysJob + var params BarParams + duration := 60 + + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err == nil { + duration = params.Duration + } + log.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID) + + // // 实现任务处理逻辑 + // i := 0 + // s.progress = i + // for i < 5 { + // // 获取任务进度 + // progress := s.progress + // log.Infof("jonId: %s => 任务进度:%d", sysJob.JobID, progress) + // // 延迟响应 + // time.Sleep(time.Second * 2) + // // 程序中途执行错误 + // if i == 3 { + // // arr := [1]int{1} + // // arr[i] = 3 + // // fmt.Println(arr) + // // return "i = 3" + // panic("程序中途执行错误") + // } + // i++ + // // 改变任务进度 + // s.progress = i + // } + where := fmt.Sprintf("NOW()>ADDDATE(`create_time`,interval %d day)", duration) + affected, err := dborm.XormDeleteDataByWhere(where, "ne_backup") + if err != nil { + // panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err)) + return nil, err + } + + // delete expired files in backup directory + // todo ... + + // 返回结果,用于记录执行结果 + return map[string]any{ + "msg": "sucess", + "affected": affected, + }, nil +} diff --git a/src/modules/crontask/deleteExpiredRecord/deleteExpiredRecord.go b/src/modules/crontask/processor/deleteExpiredRecord/deleteExpiredRecord.go similarity index 100% rename from src/modules/crontask/deleteExpiredRecord/deleteExpiredRecord.go rename to src/modules/crontask/processor/deleteExpiredRecord/deleteExpiredRecord.go diff --git a/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go b/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go new file mode 100644 index 00000000..58ecfd0d --- /dev/null +++ b/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go @@ -0,0 +1,56 @@ +package monitorsysresource + +import ( + "encoding/json" + "fmt" + + "ems.agt/src/framework/cron" + "ems.agt/src/framework/logger" + monitorService "ems.agt/src/modules/monitor/service" +) + +var NewProcessor = &MonitorSysResourceProcessor{ + monitorService: monitorService.NewMonitorImpl, + count: 0, + openDataCancel: false, +} + +// MonitorSysResourceProcessor 系统资源CPU/IO/Netword收集 +type MonitorSysResourceProcessor struct { + // 服务器系统相关信息服务 + monitorService monitorService.IMonitor + // 执行次数 + count int + // 是否已经开启数据通道 + openDataCancel bool +} + +func (s *MonitorSysResourceProcessor) Execute(data any) (any, error) { + s.count++ // 执行次数加一 + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID) + + // 读取参数值 + var params struct { + Interval float64 `json:"interval"` + } + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, fmt.Errorf("json params 'interval' err: %v", err) + } + + // 避免重复开启chan通道 + if !s.openDataCancel { + s.monitorService.RunMonitorDataCancel(false, params.Interval) + s.openDataCancel = true + } + + s.monitorService.RunMonitor() + + // 返回结果,用于记录执行结果 + result := map[string]any{ + "count": s.count, + } + return result, nil +} diff --git a/src/modules/crontask/processor/processor.go b/src/modules/crontask/processor/processor.go new file mode 100644 index 00000000..4a0d8ae1 --- /dev/null +++ b/src/modules/crontask/processor/processor.go @@ -0,0 +1,19 @@ +package processor + +import ( + "ems.agt/src/framework/cron" + "ems.agt/src/modules/crontask/processor/backupEtcFromNE" + "ems.agt/src/modules/crontask/processor/delExpiredNeBackup" + "ems.agt/src/modules/crontask/processor/deleteExpiredRecord" + monitorsysresource "ems.agt/src/modules/crontask/processor/monitor_sys_resource" +) + +// InitCronQueue 初始定时任务队列 +func InitCronQueue() { + // 监控-系统资源 + cron.CreateQueue("monitor_sys_resource", monitorsysresource.NewProcessor) + // delete expired NE backup file + cron.CreateQueue("delExpiredNeBackup", delExpiredNeBackup.NewProcessor) + cron.CreateQueue("deleteExpiredRecord", deleteExpiredRecord.NewProcessor) + cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor) +}