fix: 调度任务模块层级
This commit is contained in:
@@ -1,11 +1,8 @@
|
|||||||
package crontask
|
package crontask
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"ems.agt/src/framework/cron"
|
|
||||||
"ems.agt/src/framework/logger"
|
"ems.agt/src/framework/logger"
|
||||||
"ems.agt/src/modules/crontask/backupEtcFromNE"
|
"ems.agt/src/modules/crontask/processor"
|
||||||
"ems.agt/src/modules/crontask/delExpiredNeBackup"
|
|
||||||
"ems.agt/src/modules/crontask/deleteExpiredRecord"
|
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
@@ -14,14 +11,13 @@ import (
|
|||||||
func Setup(router *gin.Engine) {
|
func Setup(router *gin.Engine) {
|
||||||
logger.Infof("开始加载 ====> crontask 模块路由")
|
logger.Infof("开始加载 ====> crontask 模块路由")
|
||||||
|
|
||||||
// 初始定时任务队列
|
// 启动时需要的初始参数
|
||||||
InitCronQueue()
|
InitLoad()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitCronQueue 初始定时任务队列
|
// InitLoad 初始参数
|
||||||
func InitCronQueue() {
|
func InitLoad() {
|
||||||
// delete expired NE backup file
|
// 初始化定时任务处理
|
||||||
cron.CreateQueue("delExpiredNeBackup", delExpiredNeBackup.NewProcessor)
|
processor.InitCronQueue()
|
||||||
cron.CreateQueue("deleteExpiredRecord", deleteExpiredRecord.NewProcessor)
|
|
||||||
cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,145 @@
|
|||||||
|
package backupEtcFromNE
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"ems.agt/lib/dborm"
|
||||||
|
"ems.agt/lib/global"
|
||||||
|
"ems.agt/lib/log"
|
||||||
|
"ems.agt/restagent/config"
|
||||||
|
"ems.agt/src/framework/cron"
|
||||||
|
)
|
||||||
|
|
||||||
|
var NewProcessor = &BarProcessor{
|
||||||
|
progress: 0,
|
||||||
|
count: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
// bar 队列任务处理
|
||||||
|
type BarProcessor struct {
|
||||||
|
// 任务进度
|
||||||
|
progress int
|
||||||
|
// 执行次数
|
||||||
|
count int
|
||||||
|
}
|
||||||
|
|
||||||
|
type BarParams struct {
|
||||||
|
Duration int `json:"duration"`
|
||||||
|
TableName string `json:"tableName"`
|
||||||
|
ColName string `json:"colName"` // column name of time string
|
||||||
|
Extras string `json:"extras"` // extras condition for where
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BarProcessor) Execute(data any) (any, error) {
|
||||||
|
log.Infof("execute %d,last progress: %d ", s.count, s.progress)
|
||||||
|
s.count++
|
||||||
|
|
||||||
|
options := data.(cron.JobData)
|
||||||
|
sysJob := options.SysJob
|
||||||
|
var params BarParams
|
||||||
|
|
||||||
|
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Repeat %v Job ID %s", options.Repeat, sysJob.JobID)
|
||||||
|
|
||||||
|
var nes []dborm.NeInfo
|
||||||
|
_, err = dborm.XormGetAllNeInfo(&nes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var successfulNEs, failureNEs []string
|
||||||
|
for _, neInfo := range nes {
|
||||||
|
neTypeUpper := strings.ToUpper(neInfo.NeType)
|
||||||
|
neTypeLower := strings.ToLower(neInfo.NeType)
|
||||||
|
nePath := fmt.Sprintf("%s/etc/%s", config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||||
|
isExist, err := global.PathExists(nePath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to PathExists:", err)
|
||||||
|
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if isExist {
|
||||||
|
err = os.RemoveAll(nePath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to RemoveAll:", err)
|
||||||
|
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = os.MkdirAll(nePath, os.ModePerm)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to MkdirAll:", err)
|
||||||
|
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var scpCmd string
|
||||||
|
ipType := global.ParseIPAddr(neInfo.Ip)
|
||||||
|
if neTypeLower != "omc" {
|
||||||
|
if ipType == global.IsIPv4 {
|
||||||
|
scpCmd = fmt.Sprintf("scp -r %s@%s:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||||
|
neInfo.Ip, config.GetYamlConfig().NE.EtcDir,
|
||||||
|
neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||||
|
} else {
|
||||||
|
scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||||
|
neInfo.Ip, config.GetYamlConfig().NE.EtcDir,
|
||||||
|
neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if ipType == global.IsIPv4 {
|
||||||
|
scpCmd = fmt.Sprintf("scp -r %s@%s:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||||
|
neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||||
|
} else {
|
||||||
|
scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
|
||||||
|
neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
zipFile := fmt.Sprintf("%s-%s-etc-%s.zip", neTypeLower, strings.ToLower(neInfo.NeId), time.Now().Format(global.DateData))
|
||||||
|
zipFilePath := config.GetYamlConfig().OMC.Backup + "/" + zipFile
|
||||||
|
zipCmd := fmt.Sprintf("cd %s/etc && zip -r %s %s/*", config.GetYamlConfig().OMC.Backup, zipFilePath, neTypeLower)
|
||||||
|
|
||||||
|
command := fmt.Sprintf("%s&&%s", scpCmd, zipCmd)
|
||||||
|
|
||||||
|
log.Trace("command:", command)
|
||||||
|
out, err := global.ExecCmd(command)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Faile to exec command:", err)
|
||||||
|
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Trace("command output:", out)
|
||||||
|
|
||||||
|
md5Sum, err := global.GetFileMD5Sum(zipFilePath)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Faile to md5sum:", err)
|
||||||
|
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
//log.Debug("md5Str:", md5Sum)
|
||||||
|
path := config.GetYamlConfig().OMC.Backup
|
||||||
|
neBackup := dborm.NeBackup{NeType: neTypeUpper, NeId: neInfo.NeId, FileName: zipFile, Path: path, Md5Sum: md5Sum}
|
||||||
|
_, err = dborm.XormInsertTableOne("ne_backup", neBackup)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Faile to XormInsertTableOne:", err)
|
||||||
|
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
successfulNEs = append(successfulNEs, neInfo.NeType+"/"+neInfo.NeId)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("successfulNEs: %s failureNEs: %s", successfulNEs, failureNEs)
|
||||||
|
// result
|
||||||
|
return map[string]any{
|
||||||
|
"successfulNEs": successfulNEs,
|
||||||
|
"failureNEs": failureNEs,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,80 @@
|
|||||||
|
package delExpiredNeBackup
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"ems.agt/lib/dborm"
|
||||||
|
"ems.agt/lib/log"
|
||||||
|
"ems.agt/src/framework/cron"
|
||||||
|
)
|
||||||
|
|
||||||
|
var NewProcessor = &BarProcessor{
|
||||||
|
progress: 0,
|
||||||
|
count: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
// bar 队列任务处理
|
||||||
|
type BarProcessor struct {
|
||||||
|
// 任务进度
|
||||||
|
progress int
|
||||||
|
// 执行次数
|
||||||
|
count int
|
||||||
|
}
|
||||||
|
|
||||||
|
type BarParams struct {
|
||||||
|
Duration int `json:"duration"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BarProcessor) Execute(data any) (any, error) {
|
||||||
|
log.Infof("执行 %d 次,上次进度: %d ", s.count, s.progress)
|
||||||
|
s.count++
|
||||||
|
|
||||||
|
options := data.(cron.JobData)
|
||||||
|
sysJob := options.SysJob
|
||||||
|
var params BarParams
|
||||||
|
duration := 60
|
||||||
|
|
||||||
|
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||||
|
if err == nil {
|
||||||
|
duration = params.Duration
|
||||||
|
}
|
||||||
|
log.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID)
|
||||||
|
|
||||||
|
// // 实现任务处理逻辑
|
||||||
|
// i := 0
|
||||||
|
// s.progress = i
|
||||||
|
// for i < 5 {
|
||||||
|
// // 获取任务进度
|
||||||
|
// progress := s.progress
|
||||||
|
// log.Infof("jonId: %s => 任务进度:%d", sysJob.JobID, progress)
|
||||||
|
// // 延迟响应
|
||||||
|
// time.Sleep(time.Second * 2)
|
||||||
|
// // 程序中途执行错误
|
||||||
|
// if i == 3 {
|
||||||
|
// // arr := [1]int{1}
|
||||||
|
// // arr[i] = 3
|
||||||
|
// // fmt.Println(arr)
|
||||||
|
// // return "i = 3"
|
||||||
|
// panic("程序中途执行错误")
|
||||||
|
// }
|
||||||
|
// i++
|
||||||
|
// // 改变任务进度
|
||||||
|
// s.progress = i
|
||||||
|
// }
|
||||||
|
where := fmt.Sprintf("NOW()>ADDDATE(`create_time`,interval %d day)", duration)
|
||||||
|
affected, err := dborm.XormDeleteDataByWhere(where, "ne_backup")
|
||||||
|
if err != nil {
|
||||||
|
// panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete expired files in backup directory
|
||||||
|
// todo ...
|
||||||
|
|
||||||
|
// 返回结果,用于记录执行结果
|
||||||
|
return map[string]any{
|
||||||
|
"msg": "sucess",
|
||||||
|
"affected": affected,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package monitorsysresource
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"ems.agt/src/framework/cron"
|
||||||
|
"ems.agt/src/framework/logger"
|
||||||
|
monitorService "ems.agt/src/modules/monitor/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
var NewProcessor = &MonitorSysResourceProcessor{
|
||||||
|
monitorService: monitorService.NewMonitorImpl,
|
||||||
|
count: 0,
|
||||||
|
openDataCancel: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
// MonitorSysResourceProcessor 系统资源CPU/IO/Netword收集
|
||||||
|
type MonitorSysResourceProcessor struct {
|
||||||
|
// 服务器系统相关信息服务
|
||||||
|
monitorService monitorService.IMonitor
|
||||||
|
// 执行次数
|
||||||
|
count int
|
||||||
|
// 是否已经开启数据通道
|
||||||
|
openDataCancel bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MonitorSysResourceProcessor) Execute(data any) (any, error) {
|
||||||
|
s.count++ // 执行次数加一
|
||||||
|
options := data.(cron.JobData)
|
||||||
|
sysJob := options.SysJob
|
||||||
|
logger.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID)
|
||||||
|
|
||||||
|
// 读取参数值
|
||||||
|
var params struct {
|
||||||
|
Interval float64 `json:"interval"`
|
||||||
|
}
|
||||||
|
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("json params 'interval' err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 避免重复开启chan通道
|
||||||
|
if !s.openDataCancel {
|
||||||
|
s.monitorService.RunMonitorDataCancel(false, params.Interval)
|
||||||
|
s.openDataCancel = true
|
||||||
|
}
|
||||||
|
|
||||||
|
s.monitorService.RunMonitor()
|
||||||
|
|
||||||
|
// 返回结果,用于记录执行结果
|
||||||
|
result := map[string]any{
|
||||||
|
"count": s.count,
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
19
src/modules/crontask/processor/processor.go
Normal file
19
src/modules/crontask/processor/processor.go
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package processor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"ems.agt/src/framework/cron"
|
||||||
|
"ems.agt/src/modules/crontask/processor/backupEtcFromNE"
|
||||||
|
"ems.agt/src/modules/crontask/processor/delExpiredNeBackup"
|
||||||
|
"ems.agt/src/modules/crontask/processor/deleteExpiredRecord"
|
||||||
|
monitorsysresource "ems.agt/src/modules/crontask/processor/monitor_sys_resource"
|
||||||
|
)
|
||||||
|
|
||||||
|
// InitCronQueue 初始定时任务队列
|
||||||
|
func InitCronQueue() {
|
||||||
|
// 监控-系统资源
|
||||||
|
cron.CreateQueue("monitor_sys_resource", monitorsysresource.NewProcessor)
|
||||||
|
// delete expired NE backup file
|
||||||
|
cron.CreateQueue("delExpiredNeBackup", delExpiredNeBackup.NewProcessor)
|
||||||
|
cron.CreateQueue("deleteExpiredRecord", deleteExpiredRecord.NewProcessor)
|
||||||
|
cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user