From 0a2d83fd7dd605c009e6d0c9362af8081b0be1ec Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Fri, 27 Jun 2025 11:38:28 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=85=B3=E9=97=AD=E4=B8=8D=E7=94=9F=E6=95=88?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/framework/cron/cron.go | 79 ++++++++++------------ src/framework/cron/{log.go => cron_log.go} | 66 +++++++++--------- src/framework/cron/cron_test.go | 33 +++++---- src/modules/monitor/service/sys_job.go | 7 +- 4 files changed, 90 insertions(+), 95 deletions(-) rename src/framework/cron/{log.go => cron_log.go} (53%) diff --git a/src/framework/cron/cron.go b/src/framework/cron/cron.go index e835d375..ce2bc811 100644 --- a/src/framework/cron/cron.go +++ b/src/framework/cron/cron.go @@ -11,11 +11,11 @@ import ( var c *cron.Cron // 任务队列 -var queueMap map[string]Queue +var queueMap map[string]*Queue // StartCron 启动调度任务实例 func StartCron() { - queueMap = make(map[string]Queue) + queueMap = make(map[string]*Queue) c = cron.New(cron.WithSeconds()) c.Start() } @@ -26,22 +26,22 @@ func StopCron() { } // CreateQueue 创建队列注册处理器 -func CreateQueue(name string, processor QueueProcessor) Queue { +func CreateQueue(name string, processor QueueProcessor) *Queue { queue := Queue{ Name: name, - Processor: processor, - Job: &[]*QueueJob{}, + Processor: &processor, + Job: []QueueJob{}, } - queueMap[name] = queue - return queue + queueMap[name] = &queue + return &queue } // GetQueue 通过名称获取队列实例 -func GetQueue(name string) Queue { +func GetQueue(name string) *Queue { if v, ok := queueMap[name]; ok { return v } - return Queue{} + return nil } // QueueNames 获取注册的队列名称 @@ -56,8 +56,8 @@ func QueueNames() []string { // Queue 任务队列 type Queue struct { Name string // 队列名 - Processor QueueProcessor - Job *[]*QueueJob + Processor *QueueProcessor + Job []QueueJob } // QueueProcessor 队列处理函数接口 @@ -70,12 +70,12 @@ type QueueProcessor interface { // RunJob 运行任务,data是传入的数据 func (q *Queue) RunJob(data any, opts JobOptions) int { - job := &QueueJob{ + job := QueueJob{ Status: Waiting, Data: data, Opts: opts, queueName: q.Name, - queueProcessor: &q.Processor, + queueProcessor: q.Processor, } // 非重复任务立即执行 @@ -86,11 +86,9 @@ func (q *Queue) RunJob(data any, opts JobOptions) int { return Active } // 从切片 jobs 中删除指定索引位置的元素 - for i, v := range *q.Job { + for i, v := range q.Job { if v.cid == 0 { - jobs := *q.Job - jobs = append(jobs[:i], jobs[i+1:]...) - *q.Job = jobs + q.Job = append(q.Job[:i], q.Job[i+1:]...) break } } @@ -101,27 +99,25 @@ func (q *Queue) RunJob(data any, opts JobOptions) int { // 添加新任务 cid, err := c.AddJob(opts.Cron, job) if err != nil { - newLog.Error(err, "err") + cronLog.Error(err, "err") job.Status = Failed } job.cid = cid } - *q.Job = append(*q.Job, job) - newLog.Info("RunJob", job.cid, opts.JobId, job.Status) + q.Job = append(q.Job, job) + cronLog.Info("RunJob", job.cid, opts.JobId, job.Status) return job.Status } // RemoveJob 移除任务 -func (q *Queue) RemoveJob(jobId string) bool { - for i, v := range *q.Job { +func (q *Queue) RemoveJob(jobId int64) bool { + for i, v := range q.Job { if jobId == v.Opts.JobId { - newLog.Info("RemoveJob", v.cid, jobId, v.Status) + cronLog.Info("RemoveJob", v.cid, jobId, v.Status) c.Remove(v.cid) // 从切片 jobs 中删除指定索引位置的元素 - jobs := *q.Job - jobs = append(jobs[:i], jobs[i+1:]...) - *q.Job = jobs + q.Job = append(q.Job[:i], q.Job[i+1:]...) return true } } @@ -138,7 +134,7 @@ const ( // JobOptions 任务参数信息 type JobOptions struct { - JobId string // 执行任务编号 + JobId int64 // 执行任务编号 Cron string // 重复任务cron表达式 } @@ -149,30 +145,29 @@ type QueueJob struct { Data any // 执行任务时传入的参数 Opts JobOptions - cid cron.EntryID // 执行ID - - queueName string //队列名 - queueProcessor *QueueProcessor + cid cron.EntryID // 执行ID + queueName string // 队列名 + queueProcessor *QueueProcessor // 队列处理器接口实现 } // GetJob 获取当前执行任务 -func (job *QueueJob) GetJob(repeat bool) *QueueJob { - q := GetQueue(job.queueName) - for _, v := range *q.Job { - if repeat && v.Opts.JobId == job.Opts.JobId { +func (qj QueueJob) GetJob(repeat bool) QueueJob { + q := GetQueue(qj.queueName) + for _, v := range q.Job { + if repeat && v.Opts.JobId == qj.Opts.JobId { return v } if !repeat && v.cid == 0 { return v } } - return job + return qj } // Run 实现的接口函数 -func (s QueueJob) Run() { +func (qj QueueJob) Run() { // 检查当前任务 - job := s.GetJob(s.cid != 0) + job := qj.GetJob(qj.cid != 0) // Active 状态不执行 if job.Status == Active { @@ -187,23 +182,23 @@ func (s QueueJob) Run() { err = fmt.Errorf("%v", r) } job.Status = Failed - newLog.Error(err, "failed", job) + cronLog.Error(err, "failed", job) } }() // 开始执行 job.Status = Active job.Timestamp = time.Now().UnixMilli() - newLog.Info("run", job.cid, job.Opts.JobId) + cronLog.Info("run", job.cid, job.Opts.JobId) // 获取队列处理器接口实现 processor := *job.queueProcessor result, err := processor.Execute(job.Data) if err != nil { job.Status = Failed - newLog.Error(err, "failed", job) + cronLog.Error(err, "failed", job) } else { job.Status = Completed - newLog.Completed(result, "completed", job) + cronLog.Completed(result, "completed", job) } } diff --git a/src/framework/cron/log.go b/src/framework/cron/cron_log.go similarity index 53% rename from src/framework/cron/log.go rename to src/framework/cron/cron_log.go index 22140a2d..6f2efd49 100644 --- a/src/framework/cron/log.go +++ b/src/framework/cron/cron_log.go @@ -4,32 +4,31 @@ import ( "encoding/json" "time" - "be.ems/src/framework/constants/common" - "be.ems/src/modules/monitor/model" - "be.ems/src/modules/monitor/repository" + "be.ems/src/framework/constants" + monitorModel "be.ems/src/modules/monitor/model" + monitorRepository "be.ems/src/modules/monitor/repository" ) -// 实例任务执行日志收集 -var newLog = cronlog{} +// cronLog 实例任务执行日志收集 +var cronLog = clog{} -// cronlog 任务执行日志收集 -type cronlog struct{} +// clog 任务执行日志收集 +type clog struct{} // Info 任务普通信息收集 -func (s cronlog) Info(msg string, keysAndValues ...any) { - // logger.Infof("Info msg: %v ====> kv: %v", msg, keysAndValues) - +func (clog) Info(msg string, keysAndValues ...any) { + //log.Printf("Info msg: %v ====> kv: %v", msg, keysAndValues) } // Error 任务异常错误收集 -func (s cronlog) Error(err error, msg string, keysAndValues ...any) { - // logger.Errorf("Error: %v -> msg: %v ====> kv: %v", err, msg, keysAndValues) - // logger.Errorf("k0: %v", keysAndValues[0].(*QueueJob)) +func (clog) Error(err error, msg string, keysAndValues ...any) { + //log.Printf("Error: %v -> msg: %v ====> kv: %v", err, msg, keysAndValues) + //log.Printf("k0: %v", keysAndValues[0].(QueueJob)) // 指定的错误收集 if msg == "failed" { // 任务对象 - job := keysAndValues[0].(*QueueJob) + job := keysAndValues[0].(QueueJob) // 读取任务信息进行保存日志 if data, ok := job.Data.(JobData); ok { @@ -39,20 +38,20 @@ func (s cronlog) Error(err error, msg string, keysAndValues ...any) { Data: data, Result: err.Error(), } - jobLog.SaveLog(common.STATUS_NO) + jobLog.SaveLog(constants.STATUS_NO) } } } // Completed 任务完成return的结果收集 -func (s cronlog) Completed(result any, msg string, keysAndValues ...any) { - // logger.Infof("Completed: %v -> msg: %v ====> kv: %v", result, msg, keysAndValues) - // logger.Infof("k0: %v", keysAndValues[0].(*QueueJob)) +func (clog) Completed(result any, msg string, keysAndValues ...any) { + //log.Printf("Completed: %v -> msg: %v ====> kv: %v", result, msg, keysAndValues) + //log.Printf("k0: %v", keysAndValues[0].(QueueJob)) // 指定的完成收集 if msg == "completed" { // 任务对象 - job := keysAndValues[0].(*QueueJob) + job := keysAndValues[0].(QueueJob) // 读取任务信息进行保存日志 if data, ok := job.Data.(JobData); ok { @@ -62,7 +61,7 @@ func (s cronlog) Completed(result any, msg string, keysAndValues ...any) { Data: data, Result: result, } - jobLog.SaveLog(common.STATUS_YES) + jobLog.SaveLog(constants.STATUS_YES) } } } @@ -75,45 +74,46 @@ type jobLogData struct { } // SaveLog 日志记录保存 -func (jl *jobLogData) SaveLog(status string) { +func (jl *jobLogData) SaveLog(statusFlag string) { // 读取任务信息 sysJob := jl.Data.SysJob // 任务日志不需要记录 - if sysJob.SaveLog == "" || sysJob.SaveLog == common.STATUS_NO { + if sysJob.SaveLog == "" || sysJob.SaveLog == constants.STATUS_NO { return } // 结果信息key的Name - resultNmae := "failed" - if status == common.STATUS_YES { - resultNmae = "completed" + resultName := "failed" + if statusFlag == constants.STATUS_YES { + resultName = "completed" } // 结果信息序列化字符串 jsonByte, _ := json.Marshal(map[string]any{ - "name": resultNmae, - "crom": jl.Data.Repeat, + "cron": jl.Data.Repeat, + "name": resultName, "message": jl.Result, }) jobMsg := string(jsonByte) - if len(jobMsg) > 500 { - jobMsg = jobMsg[:500] + if len(jobMsg) > 2000 { + jobMsg = jobMsg[:2000] } // 创建日志对象 duration := time.Since(time.UnixMilli(jl.Timestamp)) - sysJobLog := model.SysJobLog{ + sysJobLog := monitorModel.SysJobLog{ JobName: sysJob.JobName, JobGroup: sysJob.JobGroup, InvokeTarget: sysJob.InvokeTarget, TargetParams: sysJob.TargetParams, - Status: status, + Status: statusFlag, JobMsg: jobMsg, CostTime: duration.Milliseconds(), + CreateTime: time.Now().UnixMilli(), } // 插入数据 - repository.NewSysJobLogImpl.InsertJobLog(sysJobLog) + monitorRepository.NewSysJobLogImpl.InsertJobLog(sysJobLog) } // JobData 调度任务日志收集结构体,执行任务时传入的接收参数 @@ -121,5 +121,5 @@ type JobData struct { // 触发执行cron重复多次 Repeat bool // 定时任务调度表记录信息 - SysJob model.SysJob + SysJob monitorModel.SysJob } diff --git a/src/framework/cron/cron_test.go b/src/framework/cron/cron_test.go index 53af81a6..d21b7238 100644 --- a/src/framework/cron/cron_test.go +++ b/src/framework/cron/cron_test.go @@ -1,10 +1,9 @@ package cron import ( + "log" "testing" "time" - - "be.ems/src/framework/logger" ) // 参考文章: @@ -14,13 +13,13 @@ func init() { StartCron() } -// 简单示例 队列任务处理 +// NewSimple 简单示例 队列任务处理 var NewSimple = &Simple{} type Simple struct{} func (s *Simple) Execute(data any) (any, error) { - logger.Infof("执行=> %+v ", data) + log.Printf("执行=> %+v ", data) // 实现任务处理逻辑 return data, nil } @@ -32,7 +31,7 @@ func TestSimple(t *testing.T) { "ok": "ok", "data": "data", }, JobOptions{ - JobId: "101", + JobId: 101, }) simpleC := CreateQueue("simple", NewSimple) @@ -40,7 +39,7 @@ func TestSimple(t *testing.T) { "corn": "*/5 * * * * *", "id": "102", }, JobOptions{ - JobId: "102", + JobId: 102, Cron: "*/5 * * * * *", }) @@ -48,7 +47,7 @@ func TestSimple(t *testing.T) { // "corn": "*/15 * * * * *", // "id": "103", // }, JobOptions{ - // JobId: "103", + // JobId: 103, // Cron: "*/15 * * * * *", // }) @@ -57,7 +56,7 @@ func TestSimple(t *testing.T) { select {} } -// Foo 队列任务处理 +// NewFooProcessor 等待执行示例 队列任务处理 var NewFooProcessor = &FooProcessor{ progress: 0, count: 0, @@ -69,7 +68,7 @@ type FooProcessor struct { } func (s *FooProcessor) Execute(data any) (any, error) { - logger.Infof("执行 %d %d => %+v ", s.count, s.progress, data) + log.Printf("执行 %d %d => %+v ", s.count, s.progress, data) s.count++ // 实现任务处理逻辑 @@ -78,7 +77,7 @@ func (s *FooProcessor) Execute(data any) (any, error) { for i < 10 { // 获取任务进度 progress := s.progress - logger.Infof("data: %v => 任务进度:%d", data, progress) + log.Printf("data: %v => 任务进度:%d", data, progress) // 延迟响应 time.Sleep(time.Second * 2) i++ @@ -94,21 +93,21 @@ func TestFoo(t *testing.T) { foo.RunJob(map[string]string{ "data": "2", }, JobOptions{ - JobId: "2", + JobId: 2, }) fooC := CreateQueue("foo", NewFooProcessor) fooC.RunJob(map[string]string{ "corn": "*/5 * * * * *", }, JobOptions{ - JobId: "3", + JobId: 3, Cron: "*/5 * * * * *", }) select {} } -// Bar 队列任务处理 +// NewBarProcessor 错误中断示例 队列任务处理 var NewBarProcessor = &BarProcessor{ progress: 0, count: 0, @@ -120,7 +119,7 @@ type BarProcessor struct { } func (s *BarProcessor) Execute(data any) (any, error) { - logger.Infof("执行 %d %d => %+v ", s.count, s.progress, data) + log.Printf("执行 %d %d => %+v ", s.count, s.progress, data) s.count++ // 实现任务处理逻辑 @@ -129,7 +128,7 @@ func (s *BarProcessor) Execute(data any) (any, error) { for i < 5 { // 获取任务进度 progress := s.progress - logger.Infof("data: %v => 任务进度:%d", data, progress) + log.Printf("data: %v => 任务进度:%d", data, progress) // 延迟响应 time.Sleep(time.Second * 2) // 程序中途执行错误 @@ -154,14 +153,14 @@ func TestBar(t *testing.T) { bar.RunJob(map[string]string{ "data": "wdf", }, JobOptions{ - JobId: "81923", + JobId: 752, }) barC := CreateQueue("bar", NewBarProcessor) barC.RunJob(map[string]string{ "corn": "*/5 * * * * *", }, JobOptions{ - JobId: "789", + JobId: 756, Cron: "*/5 * * * * *", }) diff --git a/src/modules/monitor/service/sys_job.go b/src/modules/monitor/service/sys_job.go index af1ca91e..9111ed97 100644 --- a/src/modules/monitor/service/sys_job.go +++ b/src/modules/monitor/service/sys_job.go @@ -5,6 +5,7 @@ import ( "be.ems/src/framework/constants/common" "be.ems/src/framework/cron" + "be.ems/src/framework/utils/parse" "be.ems/src/modules/monitor/model" "be.ems/src/modules/monitor/repository" ) @@ -143,7 +144,7 @@ func (r *SysJob) insertQueueJob(sysJob model.SysJob, repeat bool) bool { if !repeat { // 执行单次任务 status := queue.RunJob(options, cron.JobOptions{ - JobId: sysJob.JobID, + JobId: parse.Number(sysJob.JobID), }) // 执行中或等待中的都返回正常 return status == cron.Active || status == cron.Waiting @@ -151,7 +152,7 @@ func (r *SysJob) insertQueueJob(sysJob model.SysJob, repeat bool) bool { // 执行重复任务 queue.RunJob(options, cron.JobOptions{ - JobId: sysJob.JobID, + JobId: parse.Number(sysJob.JobID), Cron: sysJob.CronExpression, }) @@ -165,5 +166,5 @@ func (r *SysJob) deleteQueueJob(sysJob model.SysJob) bool { if queue.Name != sysJob.InvokeTarget { return false } - return queue.RemoveJob(sysJob.JobID) + return queue.RemoveJob(parse.Number(sysJob.JobID)) }