diff --git a/src/framework/cron/cron.go b/src/framework/cron/cron.go index ce2bc811..e45c01bf 100644 --- a/src/framework/cron/cron.go +++ b/src/framework/cron/cron.go @@ -1,6 +1,7 @@ package cron import ( + "errors" "fmt" "time" @@ -194,6 +195,9 @@ func (qj QueueJob) Run() { // 获取队列处理器接口实现 processor := *job.queueProcessor result, err := processor.Execute(job.Data) + if errors.Is(err, ErrTaskRunning) { + return + } if err != nil { job.Status = Failed cronLog.Error(err, "failed", job) diff --git a/src/framework/cron/cron_log.go b/src/framework/cron/cron_log.go index b1ade2ed..4d20bf84 100644 --- a/src/framework/cron/cron_log.go +++ b/src/framework/cron/cron_log.go @@ -2,6 +2,7 @@ package cron import ( "encoding/json" + "errors" "time" "be.ems/src/framework/constants" @@ -123,3 +124,6 @@ type JobData struct { // 定时任务调度表记录信息 SysJob monitorModel.SysJob } + +// ErrTaskRunning 任务正在运行错误 +var ErrTaskRunning = errors.New("task is running") diff --git a/src/framework/database/redis/expand.go b/src/framework/database/redis/expand.go index 85a8076a..1ede2195 100644 --- a/src/framework/database/redis/expand.go +++ b/src/framework/database/redis/expand.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "net" + "os" "sync" "time" @@ -173,3 +175,49 @@ func Expire(source, key string, expiration time.Duration) error { } return nil } + +// SetNX 设置分布式锁,不存在时设置 +func SetNX(source, key string, expiration time.Duration) bool { + // 数据源 + rdb := RDB(source) + if rdb == nil { + return false + } + hostname, ipAddr := hostnameAndIP() + value := fmt.Sprintf("%s@%s", hostname, ipAddr) + // 过期时间设置 + ctx := context.Background() + ok, err := rdb.SetNX(ctx, key, value, expiration).Result() + if err != nil { + logger.Errorf("redis SetNX err %v", err) + return false + } + return ok +} + +// hostnameAndIP 获取当前主机的名称和 IP 地址 +func hostnameAndIP() (string, string) { + hostname, err := os.Hostname() + if err != nil { + return "unknown", "unknown" + } + + // 获取本机的第一个非 loopback 的 IP 地址 + addrs, err := net.InterfaceAddrs() + if err != nil { + return hostname, "unknown" + } + var ipAddr string + for _, addr := range addrs { + // 只选择 IPv4 地址且不是回环地址 + ipNet, ok := addr.(*net.IPNet) + if ok && ipNet.IP.To4() != nil && !ipNet.IP.IsLoopback() { + ipAddr = ipNet.IP.String() + break + } + } + if ipAddr == "" { + return hostname, "unknown" + } + return hostname, ipAddr +} diff --git a/src/modules/crontask/processor/backup_export_cdr/backup_export_cdr.go b/src/modules/crontask/processor/backup_export_cdr/backup_export_cdr.go index 86152d6e..7e2c0652 100644 --- a/src/modules/crontask/processor/backup_export_cdr/backup_export_cdr.go +++ b/src/modules/crontask/processor/backup_export_cdr/backup_export_cdr.go @@ -10,6 +10,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/i18n" "be.ems/src/framework/logger" "be.ems/src/framework/utils/date" @@ -48,6 +49,13 @@ func (s *BackupExportCDRProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_export_cdr:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + var params struct { DataType []string `json:"dataType"` // 类型支持 ims/smsc/smf/sgwc FileType string `json:"fileType"` // 文件类型 csv/xlsx diff --git a/src/modules/crontask/processor/backup_export_kpi/backup_export_kpi.go b/src/modules/crontask/processor/backup_export_kpi/backup_export_kpi.go index 7249dc3f..15d0f910 100644 --- a/src/modules/crontask/processor/backup_export_kpi/backup_export_kpi.go +++ b/src/modules/crontask/processor/backup_export_kpi/backup_export_kpi.go @@ -11,6 +11,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/date" "be.ems/src/framework/utils/file" @@ -48,6 +49,13 @@ func (s *BackupExportKPIProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_export_kpi:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + var params struct { DataType []string `json:"dataType"` // 类型支持 ims/amf/udm/smf/pcf/upf/mme/smsc FileType string `json:"fileType"` // 文件类型 csv/xlsx diff --git a/src/modules/crontask/processor/backup_export_log/backup_export_log.go b/src/modules/crontask/processor/backup_export_log/backup_export_log.go index 1c311680..ccb6d00b 100644 --- a/src/modules/crontask/processor/backup_export_log/backup_export_log.go +++ b/src/modules/crontask/processor/backup_export_log/backup_export_log.go @@ -9,6 +9,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/i18n" "be.ems/src/framework/logger" "be.ems/src/framework/utils/date" @@ -43,6 +44,13 @@ func (s *BackupExportLogProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_export_log:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + var params struct { DataType []string `json:"dataType"` // 类型支持 operate/login FileType string `json:"fileType"` // 文件类型 csv/xlsx diff --git a/src/modules/crontask/processor/backup_export_table/backup_export_table.go b/src/modules/crontask/processor/backup_export_table/backup_export_table.go index d540038f..2d817186 100644 --- a/src/modules/crontask/processor/backup_export_table/backup_export_table.go +++ b/src/modules/crontask/processor/backup_export_table/backup_export_table.go @@ -10,6 +10,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/i18n" "be.ems/src/framework/logger" "be.ems/src/framework/utils/date" @@ -54,6 +55,13 @@ func (s *BackupExportTableProcessor) Execute(data any) (any, error) { return nil, err } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_export_table:%d:%s", sysJob.JobId, params.TableName) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + var affected int64 var errMsg error fileName := fmt.Sprintf("%s_export_%s.csv", strings.ToLower(params.TableName), time.Now().Format("20060102150405")) diff --git a/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go b/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go index 38cc6df5..a547c255 100644 --- a/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go +++ b/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go @@ -8,6 +8,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/file" neDataModel "be.ems/src/modules/network_data/model" @@ -47,6 +48,13 @@ func (s *BackupExportUDMProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_export_udm:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + var params struct { DataType []string `json:"dataType"` // 类型支持 auth/sub/voip/volte FileType string `json:"fileType"` // 文件类型 csv/txt diff --git a/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go b/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go index 07055073..5e8866e6 100644 --- a/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go +++ b/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go @@ -11,6 +11,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -38,6 +39,13 @@ func (s *BackupRemoveFileProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_remove_file:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params []struct { BackupPath string `json:"backupPath"` // 备份路径 /usr/local/omc/backup/{backupPath} diff --git a/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go b/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go index 806a2f69..9435f6f4 100644 --- a/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go +++ b/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go @@ -7,6 +7,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -29,6 +30,13 @@ func (s *DeleteAlarmRecordProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_alarm_record:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // store days diff --git a/src/modules/crontask/processor/delete_cdr_record/delete_cdr_record.go b/src/modules/crontask/processor/delete_cdr_record/delete_cdr_record.go index 3ff4239c..b5483f73 100644 --- a/src/modules/crontask/processor/delete_cdr_record/delete_cdr_record.go +++ b/src/modules/crontask/processor/delete_cdr_record/delete_cdr_record.go @@ -1,4 +1,4 @@ -package delete_CDR_record +package delete_cdr_record import ( "encoding/json" @@ -8,6 +8,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -30,6 +31,13 @@ func (s *DeleteCDRRecordProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_cdr_record:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // store days diff --git a/src/modules/crontask/processor/delete_data_record/delete_data_record.go b/src/modules/crontask/processor/delete_data_record/delete_data_record.go index 692a1af9..b4a44288 100644 --- a/src/modules/crontask/processor/delete_data_record/delete_data_record.go +++ b/src/modules/crontask/processor/delete_data_record/delete_data_record.go @@ -7,6 +7,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -51,6 +52,14 @@ func (s *DeleteDataRecordProcessor) Execute(data any) (any, error) { if params.StoreDays < 0 { return nil, fmt.Errorf("params storeDays less than 0 ") } + + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_data_record:%d:%s", sysJob.JobId, params.TableName) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 指定表名 tx := db.DB("").Table(params.TableName) diff --git a/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go b/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go index ea87fd9a..68597053 100644 --- a/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go +++ b/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go @@ -8,6 +8,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -30,6 +31,13 @@ func (s *DeleteKPIRecordProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_kpi_record:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // store days diff --git a/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go b/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go index 982b7323..583f3920 100644 --- a/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go +++ b/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go @@ -10,6 +10,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/date" neModel "be.ems/src/modules/network_element/model" @@ -37,6 +38,13 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_ne_config_backup:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // 保留天数 diff --git a/src/modules/crontask/processor/delete_uenb_record/delete_uenb_record.go b/src/modules/crontask/processor/delete_uenb_record/delete_uenb_record.go index d19662e8..289241ec 100644 --- a/src/modules/crontask/processor/delete_uenb_record/delete_uenb_record.go +++ b/src/modules/crontask/processor/delete_uenb_record/delete_uenb_record.go @@ -1,4 +1,4 @@ -package delete_UENB_record +package delete_uenb_record import ( "encoding/json" @@ -8,6 +8,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -30,6 +31,13 @@ func (s *DeleteUENBRecordProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_uenb_record:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // store days diff --git a/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go b/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go index bd242c22..cc037dbf 100644 --- a/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go +++ b/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go @@ -5,6 +5,7 @@ import ( "fmt" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" monitorService "be.ems/src/modules/monitor/service" ) @@ -31,6 +32,13 @@ func (s *MonitorSysResourceProcessor) Execute(data any) (any, error) { sysJob := options.SysJob logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:monitor_sys_resource:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { Interval float64 `json:"interval"` diff --git a/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go b/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go index e79e6f1d..dca6fb1d 100644 --- a/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go +++ b/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go @@ -9,6 +9,7 @@ import ( "be.ems/src/framework/constants" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/parse" neModel "be.ems/src/modules/network_element/model" @@ -50,6 +51,13 @@ func (s *NeAlarmStateCheckProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_alarm_state_check:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params alarmParams err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) diff --git a/src/modules/crontask/processor/ne_alarm_state_check_cmd/ne_alarm_state_check_cmd.go b/src/modules/crontask/processor/ne_alarm_state_check_cmd/ne_alarm_state_check_cmd.go index 52a8b376..53402749 100644 --- a/src/modules/crontask/processor/ne_alarm_state_check_cmd/ne_alarm_state_check_cmd.go +++ b/src/modules/crontask/processor/ne_alarm_state_check_cmd/ne_alarm_state_check_cmd.go @@ -14,6 +14,7 @@ import ( "be.ems/src/framework/constants" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/parse" neDataModel "be.ems/src/modules/network_data/model" @@ -71,6 +72,13 @@ func (s *NeAlarmStateCheckCMDProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_alarm_state_check_cmd:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params alarmParams err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) diff --git a/src/modules/crontask/processor/ne_alarm_state_check_license/ne_alarm_state_check_license.go b/src/modules/crontask/processor/ne_alarm_state_check_license/ne_alarm_state_check_license.go index 8e866a05..007d520b 100644 --- a/src/modules/crontask/processor/ne_alarm_state_check_license/ne_alarm_state_check_license.go +++ b/src/modules/crontask/processor/ne_alarm_state_check_license/ne_alarm_state_check_license.go @@ -8,6 +8,7 @@ import ( "be.ems/src/framework/constants" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/parse" "github.com/tsmask/go-oam" @@ -52,6 +53,13 @@ func (s *NeAlarmStateCheckLicenseProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_alarm_state_check_license:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params alarmParams err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) diff --git a/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go b/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go index 0bd28f0a..aa7b188d 100644 --- a/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go +++ b/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go @@ -5,6 +5,7 @@ import ( "path/filepath" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" neDataService "be.ems/src/modules/network_data/service" neModel "be.ems/src/modules/network_element/model" @@ -36,6 +37,13 @@ func (s *NeConfigBackupProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_config_backup:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + neList := s.neInfoService.Find(neModel.NeInfo{}, false, false) for _, neInfo := range neList { neTypeAndId := fmt.Sprintf("%s_%s", neInfo.NeType, neInfo.NeId) diff --git a/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go b/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go index ad16557a..699a3b19 100644 --- a/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go +++ b/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go @@ -4,6 +4,7 @@ import ( "fmt" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" neDataService "be.ems/src/modules/network_data/service" neModel "be.ems/src/modules/network_element/model" @@ -35,6 +36,13 @@ func (s *NeDataUDM) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_data_udm:%d", sysJob.JobId) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + neList := s.neInfoService.Find(neModel.NeInfo{NeType: "UDM"}, false, false) for _, neInfo := range neList { result[fmt.Sprintf("AuthNumber_%s", neInfo.NeId)] = s.udmAuthService.ResetData(neInfo.NeId)