fix: add cron task

This commit is contained in:
2023-10-25 17:00:54 +08:00
parent 2ac4bea8ba
commit 66794f91e2
5 changed files with 241 additions and 4 deletions

47
lib/global/exec_linux.go Normal file
View File

@@ -0,0 +1,47 @@
//go:build linux
// +build linux
package global
import (
"bytes"
"os/exec"
)
func ExecCmd(command string) ([]byte, error) {
cmd := exec.Command("/bin/bash", "-c", command)
out, err := cmd.CombinedOutput()
if err != nil {
return nil, err
}
return out, nil
}
func ExecShell(command string) error {
in := bytes.NewBuffer(nil)
cmd := exec.Command("sh")
cmd.Stdin = in
in.WriteString(command)
in.WriteString("exit\n")
if err := cmd.Start(); err != nil {
return err
}
return nil
}
func ExecOsCmd(command, os string) ([]byte, error) {
var cmd *exec.Cmd
switch os {
case "Linux":
cmd = exec.Command(command)
case "Windows":
cmd = exec.Command("cmd", "/C", command)
}
out, err := cmd.CombinedOutput()
if err != nil {
return nil, err
}
return out, nil
}

View File

@@ -0,0 +1,34 @@
//go:build windows
// +build windows
package global
import (
"os/exec"
)
func ExecCmd(command string) ([]byte, error) {
cmd := exec.Command("cmd", "/C", command)
out, err := cmd.CombinedOutput()
if err != nil {
return nil, err
}
return out, nil
}
func ExecOsCmd(command, os string) ([]byte, error) {
var cmd *exec.Cmd
switch os {
case "Linux":
cmd = exec.Command(command)
case "Windows":
cmd = exec.Command("cmd", "/C", command)
}
out, err := cmd.CombinedOutput()
if err != nil {
return nil, err
}
return out, nil
}

View File

@@ -0,0 +1,145 @@
package backupEtcFromNE
import (
"encoding/json"
"fmt"
"os"
"strings"
"time"
"ems.agt/lib/dborm"
"ems.agt/lib/global"
"ems.agt/lib/log"
"ems.agt/restagent/config"
"ems.agt/src/framework/cron"
)
var NewProcessor = &BarProcessor{
progress: 0,
count: 0,
}
// bar 队列任务处理
type BarProcessor struct {
// 任务进度
progress int
// 执行次数
count int
}
type BarParams struct {
Duration int `json:"duration"`
TableName string `json:"tableName"`
ColName string `json:"colName"` // column name of time string
Extras string `json:"extras"` // extras condition for where
}
func (s *BarProcessor) Execute(data any) (any, error) {
log.Infof("execute %dlast progress %d ", s.count, s.progress)
s.count++
options := data.(cron.JobData)
sysJob := options.SysJob
var params BarParams
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, err
}
log.Infof("Repeat %v Job ID %s", options.Repeat, sysJob.JobID)
var nes []dborm.NeInfo
_, err = dborm.XormGetAllNeInfo(&nes)
if err != nil {
return nil, err
}
var successfulNEs, failureNEs []string
for _, neInfo := range nes {
neTypeUpper := strings.ToUpper(neInfo.NeType)
neTypeLower := strings.ToLower(neInfo.NeType)
nePath := fmt.Sprintf("%s/etc/%s", config.GetYamlConfig().OMC.Backup, neTypeLower)
isExist, err := global.PathExists(nePath)
if err != nil {
log.Errorf("Failed to PathExists:", err)
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
continue
}
if isExist {
err = os.RemoveAll(nePath)
if err != nil {
log.Errorf("Failed to RemoveAll:", err)
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
continue
}
}
err = os.MkdirAll(nePath, os.ModePerm)
if err != nil {
log.Errorf("Failed to MkdirAll:", err)
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
continue
}
var scpCmd string
ipType := global.ParseIPAddr(neInfo.Ip)
if neTypeLower != "omc" {
if ipType == global.IsIPv4 {
scpCmd = fmt.Sprintf("scp -r %s@%s:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
neInfo.Ip, config.GetYamlConfig().NE.EtcDir,
neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower)
} else {
scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
neInfo.Ip, config.GetYamlConfig().NE.EtcDir,
neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower)
}
} else {
if ipType == global.IsIPv4 {
scpCmd = fmt.Sprintf("scp -r %s@%s:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower)
} else {
scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User,
neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower)
}
}
zipFile := fmt.Sprintf("%s-%s-etc-%s.zip", neTypeLower, strings.ToLower(neInfo.NeId), time.Now().Format(global.DateData))
zipFilePath := config.GetYamlConfig().OMC.Backup + "/" + zipFile
zipCmd := fmt.Sprintf("cd %s/etc && zip -r %s %s/*", config.GetYamlConfig().OMC.Backup, zipFilePath, neTypeLower)
command := fmt.Sprintf("%s&&%s", scpCmd, zipCmd)
log.Trace("command:", command)
out, err := global.ExecCmd(command)
if err != nil {
log.Error("Faile to exec command:", err)
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
continue
}
log.Trace("command output:", out)
md5Sum, err := global.GetFileMD5Sum(zipFilePath)
if err != nil {
log.Error("Faile to md5sum:", err)
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
continue
}
//log.Debug("md5Str:", md5Sum)
path := config.GetYamlConfig().OMC.Backup
neBackup := dborm.NeBackup{NeType: neTypeUpper, NeId: neInfo.NeId, FileName: zipFile, Path: path, Md5Sum: md5Sum}
_, err = dborm.XormInsertTableOne("ne_backup", neBackup)
if err != nil {
log.Error("Faile to XormInsertTableOne:", err)
failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId)
continue
}
successfulNEs = append(successfulNEs, neInfo.NeType+"/"+neInfo.NeId)
}
log.Infof("successfulNEs: %s failureNEs: %s", successfulNEs, failureNEs)
// result
return map[string]any{
"successfulNEs": successfulNEs,
"failureNEs": failureNEs,
}, nil
}

View File

@@ -3,7 +3,9 @@ package crontask
import (
"ems.agt/src/framework/cron"
"ems.agt/src/framework/logger"
"ems.agt/src/modules/crontask/backupEtcFromNE"
"ems.agt/src/modules/crontask/delExpiredNeBackup"
"ems.agt/src/modules/crontask/deleteExpiredRecord"
"github.com/gin-gonic/gin"
)
@@ -21,4 +23,6 @@ func Setup(router *gin.Engine) {
func InitCronQueue() {
// delete expired NE backup file
cron.CreateQueue("delExpiredNeBackup", delExpiredNeBackup.NewProcessor)
cron.CreateQueue("deleteExpiredRecord", deleteExpiredRecord.NewProcessor)
cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor)
}

View File

@@ -1,4 +1,4 @@
package delExpiredNeBackup
package deleteExpiredRecord
import (
"encoding/json"
@@ -24,8 +24,9 @@ type BarProcessor struct {
type BarParams struct {
Duration int `json:"duration"`
ColName string `json:"colName"`
TableName string `json:"tableName"`
ColName string `json:"colName"` // column name of time string
Extras string `json:"extras"` // extras condition for where
}
func (s *BarProcessor) Execute(data any) (any, error) {
@@ -35,7 +36,6 @@ func (s *BarProcessor) Execute(data any) (any, error) {
options := data.(cron.JobData)
sysJob := options.SysJob
var params BarParams
duration := 60
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
@@ -66,7 +66,14 @@ func (s *BarProcessor) Execute(data any) (any, error) {
// // 改变任务进度
// s.progress = i
// }
where := fmt.Sprintf("NOW()>ADDDATE(`%s`,interval %d day)", params.ColName, params.Duration)
var where string
if params.Extras == "" {
where = fmt.Sprintf("NOW()>ADDDATE(`%s`,interval %d day)", params.ColName, params.Duration)
} else {
where = fmt.Sprintf("NOW()>ADDDATE(`%s`,interval %d day) and %s", params.ColName, params.Duration, params.Extras)
}
affected, err := dborm.XormDeleteDataByWhere(where, params.TableName)
if err != nil {
// panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err))