diff --git a/src/framework/cron/cron.go b/src/framework/cron/cron.go index ce2bc811..e45c01bf 100644 --- a/src/framework/cron/cron.go +++ b/src/framework/cron/cron.go @@ -1,6 +1,7 @@ package cron import ( + "errors" "fmt" "time" @@ -194,6 +195,9 @@ func (qj QueueJob) Run() { // 获取队列处理器接口实现 processor := *job.queueProcessor result, err := processor.Execute(job.Data) + if errors.Is(err, ErrTaskRunning) { + return + } if err != nil { job.Status = Failed cronLog.Error(err, "failed", job) diff --git a/src/framework/cron/cron_log.go b/src/framework/cron/cron_log.go index 6f2efd49..5853e781 100644 --- a/src/framework/cron/cron_log.go +++ b/src/framework/cron/cron_log.go @@ -2,6 +2,7 @@ package cron import ( "encoding/json" + "errors" "time" "be.ems/src/framework/constants" @@ -123,3 +124,6 @@ type JobData struct { // 定时任务调度表记录信息 SysJob monitorModel.SysJob } + +// ErrTaskRunning 任务正在运行错误 +var ErrTaskRunning = errors.New("task is running") diff --git a/src/framework/database/redis/expand.go b/src/framework/database/redis/expand.go index 85a8076a..1ede2195 100644 --- a/src/framework/database/redis/expand.go +++ b/src/framework/database/redis/expand.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "net" + "os" "sync" "time" @@ -173,3 +175,49 @@ func Expire(source, key string, expiration time.Duration) error { } return nil } + +// SetNX 设置分布式锁,不存在时设置 +func SetNX(source, key string, expiration time.Duration) bool { + // 数据源 + rdb := RDB(source) + if rdb == nil { + return false + } + hostname, ipAddr := hostnameAndIP() + value := fmt.Sprintf("%s@%s", hostname, ipAddr) + // 过期时间设置 + ctx := context.Background() + ok, err := rdb.SetNX(ctx, key, value, expiration).Result() + if err != nil { + logger.Errorf("redis SetNX err %v", err) + return false + } + return ok +} + +// hostnameAndIP 获取当前主机的名称和 IP 地址 +func hostnameAndIP() (string, string) { + hostname, err := os.Hostname() + if err != nil { + return "unknown", "unknown" + } + + // 获取本机的第一个非 loopback 的 IP 地址 + addrs, err := net.InterfaceAddrs() + if err != nil { + return hostname, "unknown" + } + var ipAddr string + for _, addr := range addrs { + // 只选择 IPv4 地址且不是回环地址 + ipNet, ok := addr.(*net.IPNet) + if ok && ipNet.IP.To4() != nil && !ipNet.IP.IsLoopback() { + ipAddr = ipNet.IP.String() + break + } + } + if ipAddr == "" { + return hostname, "unknown" + } + return hostname, ipAddr +} diff --git a/src/modules/crontask/processor/backup_export_cdr/backup_export_cdr.go b/src/modules/crontask/processor/backup_export_cdr/backup_export_cdr.go index 6de71d8c..f6ca98d0 100644 --- a/src/modules/crontask/processor/backup_export_cdr/backup_export_cdr.go +++ b/src/modules/crontask/processor/backup_export_cdr/backup_export_cdr.go @@ -10,6 +10,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/i18n" "be.ems/src/framework/logger" "be.ems/src/framework/utils/date" @@ -55,6 +56,13 @@ func (s *BackupExportCDRProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_export_cdr:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + var params struct { DataType []string `json:"dataType"` // 类型支持 ims/smsc/smf/sgwc FileType string `json:"fileType"` // 文件类型 csv/xlsx diff --git a/src/modules/crontask/processor/backup_export_kpi/backup_export_kpi.go b/src/modules/crontask/processor/backup_export_kpi/backup_export_kpi.go index 6a91280c..5c8474b4 100644 --- a/src/modules/crontask/processor/backup_export_kpi/backup_export_kpi.go +++ b/src/modules/crontask/processor/backup_export_kpi/backup_export_kpi.go @@ -11,6 +11,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/date" "be.ems/src/framework/utils/file" @@ -48,6 +49,13 @@ func (s *BackupExportKPIProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_export_kpi:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + var params struct { DataType []string `json:"dataType"` // 类型支持 ims/amf/udm/smf/pcf/upf/mme/smsc FileType string `json:"fileType"` // 文件类型 csv/xlsx diff --git a/src/modules/crontask/processor/backup_export_log/backup_export_log.go b/src/modules/crontask/processor/backup_export_log/backup_export_log.go index deeaef50..4580ac3b 100644 --- a/src/modules/crontask/processor/backup_export_log/backup_export_log.go +++ b/src/modules/crontask/processor/backup_export_log/backup_export_log.go @@ -9,6 +9,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/i18n" "be.ems/src/framework/logger" "be.ems/src/framework/utils/date" @@ -44,6 +45,13 @@ func (s *BackupExportLogProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_export_log:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + var params struct { DataType []string `json:"dataType"` // 类型支持 operate/login FileType string `json:"fileType"` // 文件类型 csv/xlsx diff --git a/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go b/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go index 864a7856..16466353 100644 --- a/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go +++ b/src/modules/crontask/processor/backup_export_udm/backup_export_udm.go @@ -8,6 +8,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/file" neDataModel "be.ems/src/modules/network_data/model" @@ -47,6 +48,13 @@ func (s *BackupExportUDMProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_export_udm:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + var params struct { DataType []string `json:"dataType"` // 类型支持 auth/sub/voip/volte FileType string `json:"fileType"` // 文件类型 csv/txt diff --git a/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go b/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go index d969eaad..93faff94 100644 --- a/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go +++ b/src/modules/crontask/processor/backup_remove_file/backup_remove_file.go @@ -11,6 +11,7 @@ import ( "time" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -38,6 +39,13 @@ func (s *BackupRemoveFileProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:backup_remove_file:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params []struct { BackupPath string `json:"backupPath"` // 备份路径 /usr/local/omc/backup/{backupPath} diff --git a/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go b/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go index 63aa7895..027e670b 100644 --- a/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go +++ b/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go @@ -6,6 +6,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -28,6 +29,13 @@ func (s *DeleteAlarmRecordProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_alarm_record:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // store days diff --git a/src/modules/crontask/processor/delete_cdr_record/delete_cdr_record.go b/src/modules/crontask/processor/delete_cdr_record/delete_cdr_record.go index fdcd55cb..b9bd0b27 100644 --- a/src/modules/crontask/processor/delete_cdr_record/delete_cdr_record.go +++ b/src/modules/crontask/processor/delete_cdr_record/delete_cdr_record.go @@ -7,6 +7,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -29,6 +30,13 @@ func (s *DeleteCDRRecordProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_cdr_record:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // store days diff --git a/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go b/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go index 5b02150c..4cb07fe1 100644 --- a/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go +++ b/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go @@ -7,6 +7,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -29,6 +30,13 @@ func (s *DeleteKPIRecordProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_kpi_record:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // store days diff --git a/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go b/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go index 7eb3414d..a6aeb4fb 100644 --- a/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go +++ b/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go @@ -10,6 +10,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/date" neModel "be.ems/src/modules/network_element/model" @@ -37,6 +38,13 @@ func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_ne_config_backup:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // 保留天数 diff --git a/src/modules/crontask/processor/delete_uenb_record/delete_uenb_record.go b/src/modules/crontask/processor/delete_uenb_record/delete_uenb_record.go index 6a7f69b7..d0ad556c 100644 --- a/src/modules/crontask/processor/delete_uenb_record/delete_uenb_record.go +++ b/src/modules/crontask/processor/delete_uenb_record/delete_uenb_record.go @@ -1,4 +1,4 @@ -package delete_UENB_record +package delete_uenb_record import ( "encoding/json" @@ -7,6 +7,7 @@ import ( "be.ems/src/framework/cron" "be.ems/src/framework/database/db" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" ) @@ -29,6 +30,13 @@ func (s *DeleteUENBRecordProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:delete_uenb_record:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { StoreDays int `json:"storeDays"` // store days diff --git a/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go b/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go index 401d0233..6ad158af 100644 --- a/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go +++ b/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go @@ -5,6 +5,7 @@ import ( "fmt" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" monitorService "be.ems/src/modules/monitor/service" ) @@ -31,6 +32,13 @@ func (s *MonitorSysResourceProcessor) Execute(data any) (any, error) { sysJob := options.SysJob logger.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID) + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:monitor_sys_resource:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params struct { Interval float64 `json:"interval"` diff --git a/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go b/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go index 27c190cb..ff445544 100644 --- a/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go +++ b/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go @@ -7,6 +7,7 @@ import ( "be.ems/src/framework/constants" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/parse" @@ -57,6 +58,13 @@ func (s *NeAlarmStateCheckProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_alarm_state_check:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params alarmParams err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) diff --git a/src/modules/crontask/processor/ne_alarm_state_check_cmd/ne_alarm_state_check_cmd.go b/src/modules/crontask/processor/ne_alarm_state_check_cmd/ne_alarm_state_check_cmd.go index 22bfa2a1..16d72983 100644 --- a/src/modules/crontask/processor/ne_alarm_state_check_cmd/ne_alarm_state_check_cmd.go +++ b/src/modules/crontask/processor/ne_alarm_state_check_cmd/ne_alarm_state_check_cmd.go @@ -12,6 +12,7 @@ import ( "be.ems/src/framework/constants" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/parse" neDataModel "be.ems/src/modules/network_data/model" @@ -72,6 +73,13 @@ func (s *NeAlarmStateCheckCMDProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_alarm_state_check_cmd:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params alarmParams err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) diff --git a/src/modules/crontask/processor/ne_alarm_state_check_license/ne_alarm_state_check_license.go b/src/modules/crontask/processor/ne_alarm_state_check_license/ne_alarm_state_check_license.go index 1a67a266..c417de1c 100644 --- a/src/modules/crontask/processor/ne_alarm_state_check_license/ne_alarm_state_check_license.go +++ b/src/modules/crontask/processor/ne_alarm_state_check_license/ne_alarm_state_check_license.go @@ -8,6 +8,7 @@ import ( "be.ems/src/framework/constants" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" "be.ems/src/framework/utils/parse" @@ -59,6 +60,13 @@ func (s *NeAlarmStateCheckLicenseProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_alarm_state_check_license:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params alarmParams err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) diff --git a/src/modules/crontask/processor/ne_alarm_state_check_udmdb_sync/ne_alarm_state_check_udmdb_sync.go b/src/modules/crontask/processor/ne_alarm_state_check_udmdb_sync/ne_alarm_state_check_udmdb_sync.go index fcda5b52..a2bae84c 100644 --- a/src/modules/crontask/processor/ne_alarm_state_check_udmdb_sync/ne_alarm_state_check_udmdb_sync.go +++ b/src/modules/crontask/processor/ne_alarm_state_check_udmdb_sync/ne_alarm_state_check_udmdb_sync.go @@ -59,6 +59,13 @@ func (s *NeAlarmStateCheckUDMDBProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_alarm_state_check_udmdb_sync:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + // 读取参数值 var params alarmParams err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) diff --git a/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go b/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go index a6f21a58..e47e743b 100644 --- a/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go +++ b/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go @@ -5,6 +5,7 @@ import ( "path/filepath" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" neDataService "be.ems/src/modules/network_data/service" neModel "be.ems/src/modules/network_element/model" @@ -36,6 +37,13 @@ func (s *NeConfigBackupProcessor) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_config_backup:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + neList := s.neInfoService.SelectList(neModel.NeInfo{}, false, false) for _, neInfo := range neList { neTypeAndId := fmt.Sprintf("%s_%s", neInfo.NeType, neInfo.NeId) diff --git a/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go b/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go index ae583ae9..4d7fef75 100644 --- a/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go +++ b/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go @@ -4,6 +4,7 @@ import ( "fmt" "be.ems/src/framework/cron" + "be.ems/src/framework/database/redis" "be.ems/src/framework/logger" neDataService "be.ems/src/modules/network_data/service" neModel "be.ems/src/modules/network_element/model" @@ -35,6 +36,13 @@ func (s *NeDataUDM) Execute(data any) (any, error) { "count": s.count, } + // 分布式锁,防止多个任务同时执行 + lockKey := fmt.Sprintf("processor:ne_data_udm:%s", sysJob.JobID) + if ok := redis.SetNX("", lockKey, 0); !ok { + return nil, cron.ErrTaskRunning + } + defer redis.Del("", lockKey) + neList := s.neInfoService.SelectList(neModel.NeInfo{NeType: "UDM"}, false, false) for _, neInfo := range neList { result[fmt.Sprintf("AuthNumber_%s", neInfo.NeId)] = s.udmAuthService.ResetData(neInfo.NeId)