feat: 合并Gin_Vue
This commit is contained in:
202
src/framework/cron/cron.go
Normal file
202
src/framework/cron/cron.go
Normal file
@@ -0,0 +1,202 @@
|
||||
package cron
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
// 定义内部调度任务实例
|
||||
var c *cron.Cron
|
||||
|
||||
// 任务队列
|
||||
var queueMap map[string]Queue
|
||||
|
||||
// StartCron 启动调度任务实例
|
||||
func StartCron() {
|
||||
queueMap = make(map[string]Queue)
|
||||
c = cron.New(cron.WithSeconds())
|
||||
c.Start()
|
||||
}
|
||||
|
||||
// StopCron 停止调度任务实例
|
||||
func StopCron() {
|
||||
c.Stop()
|
||||
}
|
||||
|
||||
// CreateQueue 创建队列注册处理器
|
||||
func CreateQueue(name string, processor QueueProcessor) Queue {
|
||||
queue := Queue{
|
||||
Name: name,
|
||||
Processor: processor,
|
||||
Job: &[]*QueueJob{},
|
||||
}
|
||||
queueMap[name] = queue
|
||||
return queue
|
||||
}
|
||||
|
||||
// GetQueue 通过名称获取队列实例
|
||||
func GetQueue(name string) Queue {
|
||||
if v, ok := queueMap[name]; ok {
|
||||
return v
|
||||
}
|
||||
return Queue{}
|
||||
}
|
||||
|
||||
// QueueNames 获取注册的队列名称
|
||||
func QueueNames() []string {
|
||||
keys := make([]string, 0, len(queueMap))
|
||||
for k := range queueMap {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// Queue 任务队列
|
||||
type Queue struct {
|
||||
Name string // 队列名
|
||||
Processor QueueProcessor
|
||||
Job *[]*QueueJob
|
||||
}
|
||||
|
||||
// QueueProcessor 队列处理函数接口
|
||||
type QueueProcessor interface {
|
||||
// Execute 实际执行函数
|
||||
Execute(data any) any
|
||||
}
|
||||
|
||||
// RunJob 运行任务,data是传入的数据
|
||||
func (q *Queue) RunJob(data any, opts JobOptions) int {
|
||||
job := &QueueJob{
|
||||
Status: Waiting,
|
||||
Data: data,
|
||||
Opts: opts,
|
||||
queueName: q.Name,
|
||||
queueProcessor: &q.Processor,
|
||||
}
|
||||
|
||||
// 非重复任务立即执行
|
||||
if opts.Cron == "" {
|
||||
// 获取执行的任务
|
||||
currentJob := job.GetJob(false)
|
||||
if currentJob.Status == Active {
|
||||
return Active
|
||||
}
|
||||
// 从切片 jobs 中删除指定索引位置的元素
|
||||
for i, v := range *q.Job {
|
||||
if v.cid == 0 {
|
||||
jobs := *q.Job
|
||||
jobs = append(jobs[:i], jobs[i+1:]...)
|
||||
*q.Job = jobs
|
||||
break
|
||||
}
|
||||
}
|
||||
go job.Run()
|
||||
} else {
|
||||
// 移除已存的任务ID
|
||||
q.RemoveJob(opts.JobId)
|
||||
// 添加新任务
|
||||
cid, err := c.AddJob(opts.Cron, job)
|
||||
if err != nil {
|
||||
newLog.Error(err, "err")
|
||||
job.Status = Failed
|
||||
}
|
||||
job.cid = cid
|
||||
}
|
||||
|
||||
*q.Job = append(*q.Job, job)
|
||||
newLog.Info("RunJob", job.cid, opts.JobId, job.Status)
|
||||
return job.Status
|
||||
}
|
||||
|
||||
// RemoveJob 移除任务
|
||||
func (q *Queue) RemoveJob(jobId string) bool {
|
||||
for i, v := range *q.Job {
|
||||
if jobId == v.Opts.JobId {
|
||||
newLog.Info("RemoveJob", v.cid, jobId, v.Status)
|
||||
c.Remove(v.cid)
|
||||
// 从切片 jobs 中删除指定索引位置的元素
|
||||
jobs := *q.Job
|
||||
jobs = append(jobs[:i], jobs[i+1:]...)
|
||||
*q.Job = jobs
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Status 任务执行状态
|
||||
const (
|
||||
Waiting = iota
|
||||
Active
|
||||
Completed
|
||||
Failed
|
||||
)
|
||||
|
||||
// JobOptions 任务参数信息
|
||||
type JobOptions struct {
|
||||
JobId string // 执行任务编号
|
||||
Cron string // 重复任务cron表达式
|
||||
}
|
||||
|
||||
// QueueJob 队列内部执行任务
|
||||
type QueueJob struct {
|
||||
Status int // 任务执行状态
|
||||
Timestamp int64 // 执行时间
|
||||
Data any // 执行任务时传入的参数
|
||||
Opts JobOptions
|
||||
|
||||
cid cron.EntryID // 执行ID
|
||||
|
||||
queueName string //队列名
|
||||
queueProcessor *QueueProcessor
|
||||
}
|
||||
|
||||
// GetJob 获取当前执行任务
|
||||
func (job *QueueJob) GetJob(repeat bool) *QueueJob {
|
||||
q := GetQueue(job.queueName)
|
||||
for _, v := range *q.Job {
|
||||
if repeat && v.Opts.JobId == job.Opts.JobId {
|
||||
return v
|
||||
}
|
||||
if !repeat && v.cid == 0 {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return job
|
||||
}
|
||||
|
||||
// Run 实现的接口函数
|
||||
func (s QueueJob) Run() {
|
||||
// 检查当前任务
|
||||
job := s.GetJob(s.cid != 0)
|
||||
|
||||
// Active 状态不执行
|
||||
if job.Status == Active {
|
||||
return
|
||||
}
|
||||
|
||||
// panics 异常收集
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err, ok := r.(error)
|
||||
if !ok {
|
||||
err = fmt.Errorf("%v", r)
|
||||
}
|
||||
job.Status = Failed
|
||||
newLog.Error(err, "failed", job)
|
||||
}
|
||||
}()
|
||||
|
||||
// 开始执行
|
||||
job.Status = Active
|
||||
job.Timestamp = time.Now().UnixMilli()
|
||||
newLog.Info("run", job.cid, job.Opts.JobId)
|
||||
|
||||
// 获取队列处理器接口实现
|
||||
processor := *job.queueProcessor
|
||||
result := processor.Execute(job.Data)
|
||||
job.Status = Completed
|
||||
newLog.Completed(result, "completed", job)
|
||||
}
|
||||
179
src/framework/cron/cron_test.go
Normal file
179
src/framework/cron/cron_test.go
Normal file
@@ -0,0 +1,179 @@
|
||||
package cron
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"ems.agt/src/framework/logger"
|
||||
)
|
||||
|
||||
// 参考文章:
|
||||
// https://blog.csdn.net/zjbyough/article/details/113853582
|
||||
// https://mp.weixin.qq.com/s/Ak7RBv1NuS-VBeDNo8_fww
|
||||
func init() {
|
||||
StartCron()
|
||||
}
|
||||
|
||||
// 简单示例 队列任务处理
|
||||
var NewSimple = &Simple{}
|
||||
|
||||
type Simple struct{}
|
||||
|
||||
func (s *Simple) Execute(data any) any {
|
||||
logger.Infof("执行=> %+v ", data)
|
||||
// 实现任务处理逻辑
|
||||
return data
|
||||
}
|
||||
|
||||
func TestSimple(t *testing.T) {
|
||||
|
||||
simple := CreateQueue("simple", NewSimple)
|
||||
simple.RunJob(map[string]string{
|
||||
"ok": "ok",
|
||||
"data": "data",
|
||||
}, JobOptions{
|
||||
JobId: "101",
|
||||
})
|
||||
|
||||
simpleC := CreateQueue("simple", NewSimple)
|
||||
simpleC.RunJob(map[string]string{
|
||||
"corn": "*/5 * * * * *",
|
||||
"id": "102",
|
||||
}, JobOptions{
|
||||
JobId: "102",
|
||||
Cron: "*/5 * * * * *",
|
||||
})
|
||||
|
||||
// simpleC.RunJob(map[string]string{
|
||||
// "corn": "*/15 * * * * *",
|
||||
// "id": "103",
|
||||
// }, JobOptions{
|
||||
// JobId: "103",
|
||||
// Cron: "*/15 * * * * *",
|
||||
// })
|
||||
|
||||
// simpleC.RemoveJob("102")
|
||||
|
||||
select {}
|
||||
}
|
||||
|
||||
// Foo 队列任务处理
|
||||
var NewFooProcessor = &FooProcessor{
|
||||
progress: 0,
|
||||
count: 0,
|
||||
}
|
||||
|
||||
type FooProcessor struct {
|
||||
progress int
|
||||
count int
|
||||
}
|
||||
|
||||
func (s *FooProcessor) Execute(data any) any {
|
||||
logger.Infof("执行 %d %d => %+v ", s.count, s.progress, data)
|
||||
s.count++
|
||||
|
||||
// 实现任务处理逻辑
|
||||
i := 0
|
||||
s.progress = i
|
||||
for i < 10 {
|
||||
// 获取任务进度
|
||||
progress := s.progress
|
||||
logger.Infof("data: %v => 任务进度:%d", data, progress)
|
||||
// 延迟响应
|
||||
time.Sleep(time.Second * 2)
|
||||
i++
|
||||
// 改变任务进度
|
||||
s.progress = i
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func TestFoo(t *testing.T) {
|
||||
|
||||
foo := CreateQueue("foo", NewFooProcessor)
|
||||
foo.RunJob(map[string]string{
|
||||
"data": "2",
|
||||
}, JobOptions{
|
||||
JobId: "2",
|
||||
})
|
||||
|
||||
fooC := CreateQueue("foo", NewFooProcessor)
|
||||
fooC.RunJob(map[string]string{
|
||||
"corn": "*/5 * * * * *",
|
||||
}, JobOptions{
|
||||
JobId: "3",
|
||||
Cron: "*/5 * * * * *",
|
||||
})
|
||||
|
||||
select {}
|
||||
}
|
||||
|
||||
// Bar 队列任务处理
|
||||
var NewBarProcessor = &BarProcessor{
|
||||
progress: 0,
|
||||
count: 0,
|
||||
}
|
||||
|
||||
type BarProcessor struct {
|
||||
progress int
|
||||
count int
|
||||
}
|
||||
|
||||
func (s *BarProcessor) Execute(data any) any {
|
||||
logger.Infof("执行 %d %d => %+v ", s.count, s.progress, data)
|
||||
s.count++
|
||||
|
||||
// 实现任务处理逻辑
|
||||
i := 0
|
||||
s.progress = i
|
||||
for i < 5 {
|
||||
// 获取任务进度
|
||||
progress := s.progress
|
||||
logger.Infof("data: %v => 任务进度:%d", data, progress)
|
||||
// 延迟响应
|
||||
time.Sleep(time.Second * 2)
|
||||
// 程序中途执行错误
|
||||
if i == 3 {
|
||||
// arr := [1]int{1}
|
||||
// arr[i] = 3
|
||||
// fmt.Println(arr)
|
||||
// return "i = 3"
|
||||
panic("程序中途执行错误")
|
||||
}
|
||||
i++
|
||||
// 改变任务进度
|
||||
s.progress = i
|
||||
}
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
func TestBar(t *testing.T) {
|
||||
|
||||
bar := CreateQueue("bar", NewBarProcessor)
|
||||
bar.RunJob(map[string]string{
|
||||
"data": "wdf",
|
||||
}, JobOptions{
|
||||
JobId: "81923",
|
||||
})
|
||||
|
||||
barC := CreateQueue("bar", NewBarProcessor)
|
||||
barC.RunJob(map[string]string{
|
||||
"corn": "*/5 * * * * *",
|
||||
}, JobOptions{
|
||||
JobId: "789",
|
||||
Cron: "*/5 * * * * *",
|
||||
})
|
||||
|
||||
// barDB := CreateQueue("barDB", NewBarProcessor)
|
||||
// barDB.RunJob(JobData{
|
||||
// SysJob: model.SysJob{
|
||||
// JobID: "9123",
|
||||
// JobName: "测试任务",
|
||||
// },
|
||||
// }, JobOptions{
|
||||
// JobId: "9123",
|
||||
// })
|
||||
|
||||
select {}
|
||||
}
|
||||
112
src/framework/cron/log.go
Normal file
112
src/framework/cron/log.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package cron
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"ems.agt/src/framework/constants/common"
|
||||
"ems.agt/src/modules/monitor/model"
|
||||
"ems.agt/src/modules/monitor/repository"
|
||||
)
|
||||
|
||||
// 实例任务执行日志收集
|
||||
var newLog = cronlog{}
|
||||
|
||||
// cronlog 任务执行日志收集
|
||||
type cronlog struct{}
|
||||
|
||||
// Info 任务普通信息收集
|
||||
func (s cronlog) Info(msg string, keysAndValues ...any) {
|
||||
// logger.Infof("Info msg: %v ====> kv: %v", msg, keysAndValues)
|
||||
|
||||
}
|
||||
|
||||
// Error 任务异常错误收集
|
||||
func (s cronlog) Error(err error, msg string, keysAndValues ...any) {
|
||||
// logger.Errorf("Error: %v -> msg: %v ====> kv: %v", err, msg, keysAndValues)
|
||||
// logger.Errorf("k0: %v", keysAndValues[0].(*QueueJob))
|
||||
|
||||
// 指定的错误收集
|
||||
if msg == "failed" {
|
||||
// 任务对象
|
||||
job := keysAndValues[0].(*QueueJob)
|
||||
|
||||
// 结果信息序列化字符串
|
||||
jsonByte, _ := json.Marshal(map[string]any{
|
||||
"name": "failed",
|
||||
"message": err.Error(),
|
||||
})
|
||||
jobMsg := string(jsonByte)
|
||||
if len(jobMsg) > 500 {
|
||||
jobMsg = jobMsg[:500]
|
||||
}
|
||||
|
||||
// 读取任务信息创建日志对象
|
||||
if data, ok := job.Data.(JobData); ok {
|
||||
duration := time.Since(time.UnixMilli(job.Timestamp))
|
||||
sysJob := data.SysJob
|
||||
if sysJob.JobID == job.Opts.JobId {
|
||||
sysJobLog := model.SysJobLog{
|
||||
JobName: sysJob.JobName,
|
||||
JobGroup: sysJob.JobGroup,
|
||||
InvokeTarget: sysJob.InvokeTarget,
|
||||
TargetParams: sysJob.TargetParams,
|
||||
Status: common.STATUS_NO,
|
||||
JobMsg: jobMsg,
|
||||
CostTime: duration.Milliseconds(),
|
||||
}
|
||||
// 插入数据
|
||||
repository.NewSysJobLogImpl.InsertJobLog(sysJobLog)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Completed 任务完成return的结果收集
|
||||
func (s cronlog) Completed(result any, msg string, keysAndValues ...any) {
|
||||
// logger.Infof("Completed: %v -> msg: %v ====> kv: %v", result, msg, keysAndValues)
|
||||
// logger.Infof("k0: %v", keysAndValues[0].(*QueueJob))
|
||||
|
||||
// 指定的完成收集
|
||||
if msg == "completed" {
|
||||
// 任务对象
|
||||
job := keysAndValues[0].(*QueueJob)
|
||||
|
||||
// 结果信息序列化字符串
|
||||
jsonByte, _ := json.Marshal(map[string]any{
|
||||
"name": "completed",
|
||||
"message": result,
|
||||
})
|
||||
jobMsg := string(jsonByte)
|
||||
if len(jobMsg) > 500 {
|
||||
jobMsg = jobMsg[:500]
|
||||
}
|
||||
|
||||
// 读取任务信息创建日志对象
|
||||
if data, ok := job.Data.(JobData); ok {
|
||||
duration := time.Since(time.UnixMilli(job.Timestamp))
|
||||
sysJob := data.SysJob
|
||||
if sysJob.JobID == job.Opts.JobId {
|
||||
sysJobLog := model.SysJobLog{
|
||||
JobName: sysJob.JobName,
|
||||
JobGroup: sysJob.JobGroup,
|
||||
InvokeTarget: sysJob.InvokeTarget,
|
||||
TargetParams: sysJob.TargetParams,
|
||||
Status: common.STATUS_YES,
|
||||
JobMsg: jobMsg,
|
||||
CostTime: duration.Milliseconds(),
|
||||
}
|
||||
// 插入数据
|
||||
repository.NewSysJobLogImpl.InsertJobLog(sysJobLog)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// JobData 调度任务日志收集结构体,执行任务时传入的接收参数
|
||||
type JobData struct {
|
||||
// 触发执行cron重复多次
|
||||
Repeat bool
|
||||
// 定时任务调度表记录信息
|
||||
SysJob model.SysJob
|
||||
}
|
||||
Reference in New Issue
Block a user