fix: 修复定时任务关闭不生效问题

This commit is contained in:
TsMask
2025-02-26 17:53:56 +08:00
parent f583f0bffd
commit f337dfa683
4 changed files with 83 additions and 89 deletions

View File

@@ -11,11 +11,11 @@ import (
var c *cron.Cron var c *cron.Cron
// 任务队列 // 任务队列
var queueMap map[string]Queue var queueMap map[string]*Queue
// StartCron 启动调度任务实例 // StartCron 启动调度任务实例
func StartCron() { func StartCron() {
queueMap = make(map[string]Queue) queueMap = make(map[string]*Queue)
c = cron.New(cron.WithSeconds()) c = cron.New(cron.WithSeconds())
c.Start() c.Start()
} }
@@ -26,22 +26,22 @@ func StopCron() {
} }
// CreateQueue 创建队列注册处理器 // CreateQueue 创建队列注册处理器
func CreateQueue(name string, processor QueueProcessor) Queue { func CreateQueue(name string, processor QueueProcessor) *Queue {
queue := Queue{ queue := Queue{
Name: name, Name: name,
Processor: processor, Processor: &processor,
Job: &[]*QueueJob{}, Job: []QueueJob{},
} }
queueMap[name] = queue queueMap[name] = &queue
return queue return &queue
} }
// GetQueue 通过名称获取队列实例 // GetQueue 通过名称获取队列实例
func GetQueue(name string) Queue { func GetQueue(name string) *Queue {
if v, ok := queueMap[name]; ok { if v, ok := queueMap[name]; ok {
return v return v
} }
return Queue{} return nil
} }
// QueueNames 获取注册的队列名称 // QueueNames 获取注册的队列名称
@@ -56,8 +56,8 @@ func QueueNames() []string {
// Queue 任务队列 // Queue 任务队列
type Queue struct { type Queue struct {
Name string // 队列名 Name string // 队列名
Processor QueueProcessor Processor *QueueProcessor
Job *[]*QueueJob Job []QueueJob
} }
// QueueProcessor 队列处理函数接口 // QueueProcessor 队列处理函数接口
@@ -70,12 +70,12 @@ type QueueProcessor interface {
// RunJob 运行任务data是传入的数据 // RunJob 运行任务data是传入的数据
func (q *Queue) RunJob(data any, opts JobOptions) int { func (q *Queue) RunJob(data any, opts JobOptions) int {
job := &QueueJob{ job := QueueJob{
Status: Waiting, Status: Waiting,
Data: data, Data: data,
Opts: opts, Opts: opts,
queueName: q.Name, queueName: q.Name,
queueProcessor: &q.Processor, queueProcessor: q.Processor,
} }
// 非重复任务立即执行 // 非重复任务立即执行
@@ -86,11 +86,9 @@ func (q *Queue) RunJob(data any, opts JobOptions) int {
return Active return Active
} }
// 从切片 jobs 中删除指定索引位置的元素 // 从切片 jobs 中删除指定索引位置的元素
for i, v := range *q.Job { for i, v := range q.Job {
if v.cid == 0 { if v.cid == 0 {
jobs := *q.Job q.Job = append(q.Job[:i], q.Job[i+1:]...)
jobs = append(jobs[:i], jobs[i+1:]...)
*q.Job = jobs
break break
} }
} }
@@ -101,27 +99,25 @@ func (q *Queue) RunJob(data any, opts JobOptions) int {
// 添加新任务 // 添加新任务
cid, err := c.AddJob(opts.Cron, job) cid, err := c.AddJob(opts.Cron, job)
if err != nil { if err != nil {
newLog.Error(err, "err") cronLog.Error(err, "err")
job.Status = Failed job.Status = Failed
} }
job.cid = cid job.cid = cid
} }
*q.Job = append(*q.Job, job) q.Job = append(q.Job, job)
newLog.Info("RunJob", job.cid, opts.JobId, job.Status) cronLog.Info("RunJob", job.cid, opts.JobId, job.Status)
return job.Status return job.Status
} }
// RemoveJob 移除任务 // RemoveJob 移除任务
func (q *Queue) RemoveJob(jobId string) bool { func (q *Queue) RemoveJob(jobId int64) bool {
for i, v := range *q.Job { for i, v := range q.Job {
if jobId == v.Opts.JobId { if jobId == v.Opts.JobId {
newLog.Info("RemoveJob", v.cid, jobId, v.Status) cronLog.Info("RemoveJob", v.cid, jobId, v.Status)
c.Remove(v.cid) c.Remove(v.cid)
// 从切片 jobs 中删除指定索引位置的元素 // 从切片 jobs 中删除指定索引位置的元素
jobs := *q.Job q.Job = append(q.Job[:i], q.Job[i+1:]...)
jobs = append(jobs[:i], jobs[i+1:]...)
*q.Job = jobs
return true return true
} }
} }
@@ -138,7 +134,7 @@ const (
// JobOptions 任务参数信息 // JobOptions 任务参数信息
type JobOptions struct { type JobOptions struct {
JobId string // 执行任务编号 JobId int64 // 执行任务编号
Cron string // 重复任务cron表达式 Cron string // 重复任务cron表达式
} }
@@ -150,29 +146,28 @@ type QueueJob struct {
Opts JobOptions Opts JobOptions
cid cron.EntryID // 执行ID cid cron.EntryID // 执行ID
queueName string // 队列名
queueName string //队列 queueProcessor *QueueProcessor // 队列处理器接口实现
queueProcessor *QueueProcessor
} }
// GetJob 获取当前执行任务 // GetJob 获取当前执行任务
func (job *QueueJob) GetJob(repeat bool) *QueueJob { func (qj QueueJob) GetJob(repeat bool) QueueJob {
q := GetQueue(job.queueName) q := GetQueue(qj.queueName)
for _, v := range *q.Job { for _, v := range q.Job {
if repeat && v.Opts.JobId == job.Opts.JobId { if repeat && v.Opts.JobId == qj.Opts.JobId {
return v return v
} }
if !repeat && v.cid == 0 { if !repeat && v.cid == 0 {
return v return v
} }
} }
return job return qj
} }
// Run 实现的接口函数 // Run 实现的接口函数
func (s QueueJob) Run() { func (qj QueueJob) Run() {
// 检查当前任务 // 检查当前任务
job := s.GetJob(s.cid != 0) job := qj.GetJob(qj.cid != 0)
// Active 状态不执行 // Active 状态不执行
if job.Status == Active { if job.Status == Active {
@@ -187,23 +182,23 @@ func (s QueueJob) Run() {
err = fmt.Errorf("%v", r) err = fmt.Errorf("%v", r)
} }
job.Status = Failed job.Status = Failed
newLog.Error(err, "failed", job) cronLog.Error(err, "failed", job)
} }
}() }()
// 开始执行 // 开始执行
job.Status = Active job.Status = Active
job.Timestamp = time.Now().UnixMilli() job.Timestamp = time.Now().UnixMilli()
newLog.Info("run", job.cid, job.Opts.JobId) cronLog.Info("run", job.cid, job.Opts.JobId)
// 获取队列处理器接口实现 // 获取队列处理器接口实现
processor := *job.queueProcessor processor := *job.queueProcessor
result, err := processor.Execute(job.Data) result, err := processor.Execute(job.Data)
if err != nil { if err != nil {
job.Status = Failed job.Status = Failed
newLog.Error(err, "failed", job) cronLog.Error(err, "failed", job)
} else { } else {
job.Status = Completed job.Status = Completed
newLog.Completed(result, "completed", job) cronLog.Completed(result, "completed", job)
} }
} }

View File

@@ -5,31 +5,30 @@ import (
"time" "time"
"be.ems/src/framework/constants" "be.ems/src/framework/constants"
"be.ems/src/modules/monitor/model" monitorModel "be.ems/src/modules/monitor/model"
"be.ems/src/modules/monitor/repository" monitorRepository "be.ems/src/modules/monitor/repository"
) )
// 实例任务执行日志收集 // cronLog 实例任务执行日志收集
var newLog = cronlog{} var cronLog = clog{}
// cronlog 任务执行日志收集 // clog 任务执行日志收集
type cronlog struct{} type clog struct{}
// Info 任务普通信息收集 // Info 任务普通信息收集
func (s cronlog) Info(msg string, keysAndValues ...any) { func (clog) Info(msg string, keysAndValues ...any) {
// logger.Infof("Info msg: %v ====> kv: %v", msg, keysAndValues) //log.Printf("Info msg: %v ====> kv: %v", msg, keysAndValues)
} }
// Error 任务异常错误收集 // Error 任务异常错误收集
func (s cronlog) Error(err error, msg string, keysAndValues ...any) { func (clog) Error(err error, msg string, keysAndValues ...any) {
// logger.Errorf("Error: %v -> msg: %v ====> kv: %v", err, msg, keysAndValues) //log.Printf("Error: %v -> msg: %v ====> kv: %v", err, msg, keysAndValues)
// logger.Errorf("k0: %v", keysAndValues[0].(*QueueJob)) //log.Printf("k0: %v", keysAndValues[0].(QueueJob))
// 指定的错误收集 // 指定的错误收集
if msg == "failed" { if msg == "failed" {
// 任务对象 // 任务对象
job := keysAndValues[0].(*QueueJob) job := keysAndValues[0].(QueueJob)
// 读取任务信息进行保存日志 // 读取任务信息进行保存日志
if data, ok := job.Data.(JobData); ok { if data, ok := job.Data.(JobData); ok {
@@ -45,14 +44,14 @@ func (s cronlog) Error(err error, msg string, keysAndValues ...any) {
} }
// Completed 任务完成return的结果收集 // Completed 任务完成return的结果收集
func (s cronlog) Completed(result any, msg string, keysAndValues ...any) { func (clog) Completed(result any, msg string, keysAndValues ...any) {
// logger.Infof("Completed: %v -> msg: %v ====> kv: %v", result, msg, keysAndValues) //log.Printf("Completed: %v -> msg: %v ====> kv: %v", result, msg, keysAndValues)
// logger.Infof("k0: %v", keysAndValues[0].(*QueueJob)) //log.Printf("k0: %v", keysAndValues[0].(QueueJob))
// 指定的完成收集 // 指定的完成收集
if msg == "completed" { if msg == "completed" {
// 任务对象 // 任务对象
job := keysAndValues[0].(*QueueJob) job := keysAndValues[0].(QueueJob)
// 读取任务信息进行保存日志 // 读取任务信息进行保存日志
if data, ok := job.Data.(JobData); ok { if data, ok := job.Data.(JobData); ok {
@@ -75,7 +74,7 @@ type jobLogData struct {
} }
// SaveLog 日志记录保存 // SaveLog 日志记录保存
func (jl *jobLogData) SaveLog(status string) { func (jl *jobLogData) SaveLog(statusFlag string) {
// 读取任务信息 // 读取任务信息
sysJob := jl.Data.SysJob sysJob := jl.Data.SysJob
@@ -85,15 +84,15 @@ func (jl *jobLogData) SaveLog(status string) {
} }
// 结果信息key的Name // 结果信息key的Name
resultNmae := "failed" resultName := "failed"
if status == constants.STATUS_YES { if statusFlag == constants.STATUS_YES {
resultNmae = "completed" resultName = "completed"
} }
// 结果信息序列化字符串 // 结果信息序列化字符串
jsonByte, _ := json.Marshal(map[string]any{ jsonByte, _ := json.Marshal(map[string]any{
"name": resultNmae, "cron": jl.Data.Repeat,
"crom": jl.Data.Repeat, "name": resultName,
"message": jl.Result, "message": jl.Result,
}) })
jobMsg := string(jsonByte) jobMsg := string(jsonByte)
@@ -103,17 +102,18 @@ func (jl *jobLogData) SaveLog(status string) {
// 创建日志对象 // 创建日志对象
duration := time.Since(time.UnixMilli(jl.Timestamp)) duration := time.Since(time.UnixMilli(jl.Timestamp))
sysJobLog := model.SysJobLog{ sysJobLog := monitorModel.SysJobLog{
JobName: sysJob.JobName, JobName: sysJob.JobName,
JobGroup: sysJob.JobGroup, JobGroup: sysJob.JobGroup,
InvokeTarget: sysJob.InvokeTarget, InvokeTarget: sysJob.InvokeTarget,
TargetParams: sysJob.TargetParams, TargetParams: sysJob.TargetParams,
StatusFlag: status, StatusFlag: statusFlag,
JobMsg: jobMsg, JobMsg: jobMsg,
CostTime: duration.Milliseconds(), CostTime: duration.Milliseconds(),
CreateTime: time.Now().UnixMilli(),
} }
// 插入数据 // 插入数据
repository.NewSysJobLog.Insert(sysJobLog) monitorRepository.NewSysJobLog.Insert(sysJobLog)
} }
// JobData 调度任务日志收集结构体,执行任务时传入的接收参数 // JobData 调度任务日志收集结构体,执行任务时传入的接收参数
@@ -121,5 +121,5 @@ type JobData struct {
// 触发执行cron重复多次 // 触发执行cron重复多次
Repeat bool Repeat bool
// 定时任务调度表记录信息 // 定时任务调度表记录信息
SysJob model.SysJob SysJob monitorModel.SysJob
} }

View File

@@ -1,10 +1,9 @@
package cron package cron
import ( import (
"log"
"testing" "testing"
"time" "time"
"be.ems/src/framework/logger"
) )
// 参考文章: // 参考文章:
@@ -14,13 +13,13 @@ func init() {
StartCron() StartCron()
} }
// 简单示例 队列任务处理 // NewSimple 简单示例 队列任务处理
var NewSimple = &Simple{} var NewSimple = &Simple{}
type Simple struct{} type Simple struct{}
func (s *Simple) Execute(data any) (any, error) { func (s *Simple) Execute(data any) (any, error) {
logger.Infof("执行=> %+v ", data) log.Printf("执行=> %+v ", data)
// 实现任务处理逻辑 // 实现任务处理逻辑
return data, nil return data, nil
} }
@@ -32,7 +31,7 @@ func TestSimple(t *testing.T) {
"ok": "ok", "ok": "ok",
"data": "data", "data": "data",
}, JobOptions{ }, JobOptions{
JobId: "101", JobId: 101,
}) })
simpleC := CreateQueue("simple", NewSimple) simpleC := CreateQueue("simple", NewSimple)
@@ -40,7 +39,7 @@ func TestSimple(t *testing.T) {
"corn": "*/5 * * * * *", "corn": "*/5 * * * * *",
"id": "102", "id": "102",
}, JobOptions{ }, JobOptions{
JobId: "102", JobId: 102,
Cron: "*/5 * * * * *", Cron: "*/5 * * * * *",
}) })
@@ -48,7 +47,7 @@ func TestSimple(t *testing.T) {
// "corn": "*/15 * * * * *", // "corn": "*/15 * * * * *",
// "id": "103", // "id": "103",
// }, JobOptions{ // }, JobOptions{
// JobId: "103", // JobId: 103,
// Cron: "*/15 * * * * *", // Cron: "*/15 * * * * *",
// }) // })
@@ -57,7 +56,7 @@ func TestSimple(t *testing.T) {
select {} select {}
} }
// Foo 队列任务处理 // NewFooProcessor 等待执行示例 队列任务处理
var NewFooProcessor = &FooProcessor{ var NewFooProcessor = &FooProcessor{
progress: 0, progress: 0,
count: 0, count: 0,
@@ -69,7 +68,7 @@ type FooProcessor struct {
} }
func (s *FooProcessor) Execute(data any) (any, error) { func (s *FooProcessor) Execute(data any) (any, error) {
logger.Infof("执行 %d %d => %+v ", s.count, s.progress, data) log.Printf("执行 %d %d => %+v ", s.count, s.progress, data)
s.count++ s.count++
// 实现任务处理逻辑 // 实现任务处理逻辑
@@ -78,7 +77,7 @@ func (s *FooProcessor) Execute(data any) (any, error) {
for i < 10 { for i < 10 {
// 获取任务进度 // 获取任务进度
progress := s.progress progress := s.progress
logger.Infof("data: %v => 任务进度:%d", data, progress) log.Printf("data: %v => 任务进度:%d", data, progress)
// 延迟响应 // 延迟响应
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
i++ i++
@@ -94,21 +93,21 @@ func TestFoo(t *testing.T) {
foo.RunJob(map[string]string{ foo.RunJob(map[string]string{
"data": "2", "data": "2",
}, JobOptions{ }, JobOptions{
JobId: "2", JobId: 2,
}) })
fooC := CreateQueue("foo", NewFooProcessor) fooC := CreateQueue("foo", NewFooProcessor)
fooC.RunJob(map[string]string{ fooC.RunJob(map[string]string{
"corn": "*/5 * * * * *", "corn": "*/5 * * * * *",
}, JobOptions{ }, JobOptions{
JobId: "3", JobId: 3,
Cron: "*/5 * * * * *", Cron: "*/5 * * * * *",
}) })
select {} select {}
} }
// Bar 队列任务处理 // NewBarProcessor 错误中断示例 队列任务处理
var NewBarProcessor = &BarProcessor{ var NewBarProcessor = &BarProcessor{
progress: 0, progress: 0,
count: 0, count: 0,
@@ -120,7 +119,7 @@ type BarProcessor struct {
} }
func (s *BarProcessor) Execute(data any) (any, error) { func (s *BarProcessor) Execute(data any) (any, error) {
logger.Infof("执行 %d %d => %+v ", s.count, s.progress, data) log.Printf("执行 %d %d => %+v ", s.count, s.progress, data)
s.count++ s.count++
// 实现任务处理逻辑 // 实现任务处理逻辑
@@ -129,7 +128,7 @@ func (s *BarProcessor) Execute(data any) (any, error) {
for i < 5 { for i < 5 {
// 获取任务进度 // 获取任务进度
progress := s.progress progress := s.progress
logger.Infof("data: %v => 任务进度:%d", data, progress) log.Printf("data: %v => 任务进度:%d", data, progress)
// 延迟响应 // 延迟响应
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
// 程序中途执行错误 // 程序中途执行错误
@@ -154,14 +153,14 @@ func TestBar(t *testing.T) {
bar.RunJob(map[string]string{ bar.RunJob(map[string]string{
"data": "wdf", "data": "wdf",
}, JobOptions{ }, JobOptions{
JobId: "81923", JobId: 752,
}) })
barC := CreateQueue("bar", NewBarProcessor) barC := CreateQueue("bar", NewBarProcessor)
barC.RunJob(map[string]string{ barC.RunJob(map[string]string{
"corn": "*/5 * * * * *", "corn": "*/5 * * * * *",
}, JobOptions{ }, JobOptions{
JobId: "789", JobId: 756,
Cron: "*/5 * * * * *", Cron: "*/5 * * * * *",
}) })

View File

@@ -124,7 +124,7 @@ func (s SysJob) insertQueueJob(sysJob model.SysJob, repeat bool) bool {
if !repeat { if !repeat {
// 执行单次任务 // 执行单次任务
status := queue.RunJob(options, cron.JobOptions{ status := queue.RunJob(options, cron.JobOptions{
JobId: fmt.Sprint(sysJob.JobId), JobId: sysJob.JobId,
}) })
// 执行中或等待中的都返回正常 // 执行中或等待中的都返回正常
return status == cron.Active || status == cron.Waiting return status == cron.Active || status == cron.Waiting
@@ -132,7 +132,7 @@ func (s SysJob) insertQueueJob(sysJob model.SysJob, repeat bool) bool {
// 执行重复任务 // 执行重复任务
queue.RunJob(options, cron.JobOptions{ queue.RunJob(options, cron.JobOptions{
JobId: fmt.Sprint(sysJob.JobId), JobId: sysJob.JobId,
Cron: sysJob.CronExpression, Cron: sysJob.CronExpression,
}) })
return true return true
@@ -145,7 +145,7 @@ func (s SysJob) deleteQueueJob(sysJob model.SysJob) bool {
if queue.Name != sysJob.InvokeTarget { if queue.Name != sysJob.InvokeTarget {
return false return false
} }
return queue.RemoveJob(fmt.Sprint(sysJob.JobId)) return queue.RemoveJob(sysJob.JobId)
} }
// Reset 重置初始调度任务 // Reset 重置初始调度任务