1
0

feat: 合并代码

This commit is contained in:
TsMask
2023-10-26 09:33:51 +08:00
parent d63db9dd6c
commit 08a908c3f4
38 changed files with 780 additions and 203 deletions

View File

@@ -63,7 +63,9 @@ type Queue struct {
// QueueProcessor 队列处理函数接口
type QueueProcessor interface {
// Execute 实际执行函数
Execute(data any) any
// any 返回有效值最终序列化为字符串,记录为成功
// error 存在错误,记录为失败
Execute(data any) (any, error)
}
// RunJob 运行任务data是传入的数据
@@ -196,7 +198,12 @@ func (s QueueJob) Run() {
// 获取队列处理器接口实现
processor := *job.queueProcessor
result := processor.Execute(job.Data)
job.Status = Completed
newLog.Completed(result, "completed", job)
result, err := processor.Execute(job.Data)
if err != nil {
job.Status = Failed
newLog.Error(err, "failed", job)
} else {
job.Status = Completed
newLog.Completed(result, "completed", job)
}
}

View File

@@ -19,10 +19,10 @@ var NewSimple = &Simple{}
type Simple struct{}
func (s *Simple) Execute(data any) any {
func (s *Simple) Execute(data any) (any, error) {
logger.Infof("执行=> %+v ", data)
// 实现任务处理逻辑
return data
return data, nil
}
func TestSimple(t *testing.T) {
@@ -68,7 +68,7 @@ type FooProcessor struct {
count int
}
func (s *FooProcessor) Execute(data any) any {
func (s *FooProcessor) Execute(data any) (any, error) {
logger.Infof("执行 %d %d => %+v ", s.count, s.progress, data)
s.count++
@@ -85,7 +85,7 @@ func (s *FooProcessor) Execute(data any) any {
// 改变任务进度
s.progress = i
}
return data
return data, nil
}
func TestFoo(t *testing.T) {
@@ -119,7 +119,7 @@ type BarProcessor struct {
count int
}
func (s *BarProcessor) Execute(data any) any {
func (s *BarProcessor) Execute(data any) (any, error) {
logger.Infof("执行 %d %d => %+v ", s.count, s.progress, data)
s.count++
@@ -145,7 +145,7 @@ func (s *BarProcessor) Execute(data any) any {
s.progress = i
}
return data
return data, nil
}
func TestBar(t *testing.T) {

View File

@@ -31,33 +31,16 @@ func (s cronlog) Error(err error, msg string, keysAndValues ...any) {
// 任务对象
job := keysAndValues[0].(*QueueJob)
// 结果信息序列化字符串
jsonByte, _ := json.Marshal(map[string]any{
"name": "failed",
"message": err.Error(),
})
jobMsg := string(jsonByte)
if len(jobMsg) > 500 {
jobMsg = jobMsg[:500]
}
// 读取任务信息创建日志对象
// 读取任务信息进行保存日志
if data, ok := job.Data.(JobData); ok {
duration := time.Since(time.UnixMilli(job.Timestamp))
sysJob := data.SysJob
if sysJob.JobID == job.Opts.JobId {
sysJobLog := model.SysJobLog{
JobName: sysJob.JobName,
JobGroup: sysJob.JobGroup,
InvokeTarget: sysJob.InvokeTarget,
TargetParams: sysJob.TargetParams,
Status: common.STATUS_NO,
JobMsg: jobMsg,
CostTime: duration.Milliseconds(),
}
// 插入数据
repository.NewSysJobLogImpl.InsertJobLog(sysJobLog)
// 日志数据
jobLog := jobLogData{
JobID: job.Opts.JobId,
Timestamp: job.Timestamp,
Data: data,
Result: err.Error(),
}
jobLog.SaveLog(common.STATUS_NO)
}
}
}
@@ -72,37 +55,75 @@ func (s cronlog) Completed(result any, msg string, keysAndValues ...any) {
// 任务对象
job := keysAndValues[0].(*QueueJob)
// 结果信息序列化字符串
jsonByte, _ := json.Marshal(map[string]any{
"name": "completed",
"message": result,
})
jobMsg := string(jsonByte)
if len(jobMsg) > 500 {
jobMsg = jobMsg[:500]
}
// 读取任务信息创建日志对象
// 读取任务信息进行保存日志
if data, ok := job.Data.(JobData); ok {
duration := time.Since(time.UnixMilli(job.Timestamp))
sysJob := data.SysJob
if sysJob.JobID == job.Opts.JobId {
sysJobLog := model.SysJobLog{
JobName: sysJob.JobName,
JobGroup: sysJob.JobGroup,
InvokeTarget: sysJob.InvokeTarget,
TargetParams: sysJob.TargetParams,
Status: common.STATUS_YES,
JobMsg: jobMsg,
CostTime: duration.Milliseconds(),
}
// 插入数据
repository.NewSysJobLogImpl.InsertJobLog(sysJobLog)
// 日志数据
jobLog := jobLogData{
JobID: job.Opts.JobId,
Timestamp: job.Timestamp,
Data: data,
Result: result,
}
jobLog.SaveLog(common.STATUS_YES)
}
}
}
// jobLogData 日志记录数据
type jobLogData struct {
JobID string
Timestamp int64
Data JobData
Result any
}
// SaveLog 日志记录保存
func (jl *jobLogData) SaveLog(status string) {
// 读取任务信息
sysJob := jl.Data.SysJob
// 任务ID与任务信息ID不相同
if jl.JobID == "" || jl.JobID != sysJob.JobID {
return
}
// 任务日志不需要记录
if sysJob.SaveLog == "" || sysJob.SaveLog == common.STATUS_NO {
return
}
// 结果信息key的Name
resultNmae := "failed"
if status == common.STATUS_YES {
resultNmae = "completed"
}
// 结果信息序列化字符串
jsonByte, _ := json.Marshal(map[string]any{
"name": resultNmae,
"crom": jl.Data.Repeat,
"message": jl.Result,
})
jobMsg := string(jsonByte)
if len(jobMsg) > 500 {
jobMsg = jobMsg[:500]
}
// 创建日志对象
duration := time.Since(time.UnixMilli(jl.Timestamp))
sysJobLog := model.SysJobLog{
JobName: sysJob.JobName,
JobGroup: sysJob.JobGroup,
InvokeTarget: sysJob.InvokeTarget,
TargetParams: sysJob.TargetParams,
Status: status,
JobMsg: jobMsg,
CostTime: duration.Milliseconds(),
}
// 插入数据
repository.NewSysJobLogImpl.InsertJobLog(sysJobLog)
}
// JobData 调度任务日志收集结构体,执行任务时传入的接收参数
type JobData struct {
// 触发执行cron重复多次

View File

@@ -47,6 +47,10 @@ func Create(loginUser *vo.LoginUser, ilobArgs ...string) string {
// 设置用户令牌有效期并存入缓存
Cache(loginUser)
// 设置登录IP和登录时间
loginUser.User.LoginIP = loginUser.IPAddr
loginUser.User.LoginDate = loginUser.LoginTime
// 令牌算法 HS256 HS384 HS512
algorithm := config.Get("jwt.algorithm").(string)
var method *jwt.SigningMethodHMAC