fix: add new crontask module

This commit is contained in:
2023-10-21 16:15:08 +08:00
parent bdc6fdaa89
commit 7ddc5fa937
7 changed files with 238 additions and 11 deletions

10
.vscode/launch.json vendored
View File

@@ -37,6 +37,14 @@
"program": "d:/local.git/ems.agt/tools/loadpconf",
"args": ["-p","../../config/param/upf_param_config.yaml"],
"console": "integratedTerminal"
}
},
{
"name": "debug crontask",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "d:/local.git/ems.agt/crontask",
"console": "integratedTerminal"
}
]
}

View File

@@ -20,6 +20,15 @@ tasks:
unit: Seconds #Seconds/Minutes/Hours/Days/Weeks, Monday/Tuesday/.../Sunday,
at: 00:10:00 # do at time such as xx:xx:xx when unit such as Day/Days/Mondays...
do: TaskHelloWorld # (Do what: callback function)
- name: Cron user login OMC as startup
status: Inactive
uri: /login
params:
body: '{"username":"cronuser","password":"tcu@1000OMC!","code":"","uuid":""}'
interval: 0
unit: Startup
at: 00:00:00
do: TaskCronUserLoginOMC
- name: clear expired history alarm
uri: /api/rest/databaseManagement/v1/omc_db/alarm
params: WHERE=now()+>+ADDDATE(event_time,+interval+(SELECT+`value`+FROM+config+WHERE+config_tag='historyDuration')+day)+and+alarm_status='0'
@@ -46,8 +55,8 @@ tasks:
params: WHERE=now()+>+ADDDATE(`create_time`,+interval+IFNULL((SELECT+`value`+FROM+config+WHERE+config_tag='BackUpSaveTime'),30)+day)
interval: 1
unit: Days
at: 20:12:00
do: TaskDeleteExpiredRecord
at: 15:02:00
do: TaskRemoveExpiredFile
- name: update expired user session
uri: /api/rest/databaseManagement/v1/omc_db/session
params: WHERE=NOW()+>+ADDDATE(shake_time,+interval+expires+second)+and+status='online'

View File

@@ -127,6 +127,8 @@ func initCronTasks() {
} else {
gocron.Every(t.Interval).Sunday().At(t.At).DoSafely(taskFunc, t.Uri, t.Params, t.Body)
}
case "Startup":
gocron.Every(0).DoSafely(taskFunc, t.Uri, t.Params, t.Body)
default:
log.Error("Error config:", t)
}
@@ -161,6 +163,14 @@ func initCronTabs() {
job.Start()
}
type LoginRespone struct {
Code int `json:"code"`
Data struct {
AccessToken string `json:"access_token"`
} `json:"data"`
Msg string `json:"msg"`
}
type TaskJob struct {
Do interface{}
Uri string
@@ -200,6 +210,43 @@ func (t *TaskFunc) TaskWithParams(a int, b string) {
log.Trace(a, b)
}
func (t *TaskFunc) TaskCronUserLoginOMC(uri, params, body string) {
log.Debug("TaskCronUserLoginOMC processing... ")
var response *resty.Response
requestURI := fmt.Sprintf("%s?%s", uri, params)
requestURL := fmt.Sprintf("%s%s", yamlConfig.OMC.HostUri, requestURI)
log.Debug("requestURL: POST ", requestURL)
var loginBody string
if body != "" {
loginBody = body
} else {
loginBody = "{\"username\": \"cronuser\",\"password\": \"tcu@1000OMC!\",\"code\": \"\", \"uuid\": \"\"}"
}
client := resty.New()
response, err := client.R().
EnableTrace().
SetHeaders(map[string]string{"User-Agent": GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
SetBody(loginBody).
Post(requestURL)
if err != nil {
log.Error("Failed to post:", err)
}
log.Debug("StatusCode: ", response.StatusCode())
switch response.StatusCode() {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
log.Debug("response body:", string(response.Body()))
body := new(map[string]interface{})
_ = json.Unmarshal(response.Body(), &body)
default:
log.Debug("response body:", string(response.Body()))
body := new(map[string]interface{})
_ = json.Unmarshal(response.Body(), &body)
}
}
func (t *TaskFunc) TaskDeleteExpiredRecord(uri, params, body string) {
log.Debug("TaskDeleteExpiredRecord processing... ")
@@ -267,6 +314,77 @@ func (t *TaskFunc) TaskUpdateTable(uri, params, body string) {
}
}
func (t *TaskFunc) TaskRemoveExpiredFile(uri, params, body string) {
log.Debug("TaskRemoveExpiredFile processing... ")
var response *resty.Response
loginUri := "/login"
loginBody := "{\"username\": \"cronuser\",\"password\": \"tcu@1000OMC!\",\"code\": \"\", \"uuid\": \"\"}"
t.TaskCronUserLoginOMC(loginUri, "", loginBody)
loginURI := fmt.Sprintf("%s?%s", loginUri, "")
loginURL := fmt.Sprintf("%s%s", yamlConfig.OMC.HostUri, loginURI)
log.Debug("requestURL: Post ", loginURL)
client := resty.New()
loginResponse, err := client.R().
EnableTrace().
SetHeaders(map[string]string{"User-Agent": GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
SetBody(loginBody).
Post(loginURL)
if err != nil {
log.Error("Failed to post:", err)
return
}
var accessToken string
log.Debug("StatusCode: ", loginResponse.StatusCode())
switch loginResponse.StatusCode() {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
log.Debug("response body:", string(loginResponse.Body()))
var loginResp LoginRespone
err = json.Unmarshal(loginResponse.Body(), &loginResp)
if err != nil {
log.Error("Failed to unmarshal:", err)
return
}
if loginResp.Code == 1 {
accessToken = loginResp.Data.AccessToken
} else {
log.Error("Failed to login: %s", loginResp.Msg)
return
}
default:
log.Debug("response body:", string(response.Body()))
return
}
requestURI := fmt.Sprintf("%s?%s", uri, params)
requestURL := fmt.Sprintf("%s%s", yamlConfig.OMC.HostUri, requestURI)
log.Debug("requestURL: DELETE ", requestURL)
response, err = client.R().
EnableTrace().
SetHeaders(map[string]string{"Authorization": "Bearer " + accessToken}).
SetHeaders(map[string]string{"User-Agent": GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
Delete(requestURL)
if err != nil {
log.Error("Failed to delete:", err)
return
}
log.Debug("StatusCode: ", response.StatusCode())
switch response.StatusCode() {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
log.Debug("response body:", string(response.Body()))
body := new(map[string]interface{})
_ = json.Unmarshal(response.Body(), &body)
default:
log.Debug("response body:", string(response.Body()))
body := new(map[string]interface{})
_ = json.Unmarshal(response.Body(), &body)
}
}
func (t *TaskFunc) GetTableNameFromUri(uri string) string {
sa := global.SplitString(uri, "/")
n := len(sa)

View File

@@ -143,8 +143,6 @@ UPDATE `omc_db`.`sys_config` SET `config_name` = '账号自助-是否开启用
UPDATE `omc_db`.`sys_config` SET `config_name` = '用户管理-密码最大错误次数', `config_key` = 'sys.user.maxRetryCount', `config_value` = '5', `config_type` = 'Y', `create_by` = 'admin', `create_time` = 1693908079680, `update_by` = '', `update_time` = 0, `remark` = '密码最大错误次数' WHERE `config_id` = 4;
UPDATE `omc_db`.`sys_config` SET `config_name` = '测试', `config_key` = 'test', `config_value` = 'test', `config_type` = 'Y', `create_by` = 'admin', `create_time` = 1693911541269, `update_by` = 'admin', `update_time` = 1693911586418, `remark` = '测试' WHERE `config_id` = 100;
INSERT IGNORE INTO `omc_db`.`sys_dept` (`dept_id`, `parent_id`, `ancestors`, `dept_name`, `order_num`, `leader`, `phone`, `email`, `status`, `del_flag`, `create_by`, `create_time`, `update_by`, `update_time`) VALUES (100, 0, '0', '运维部', 0, 'admin', '', '', '1', '0', 'admin', 1697091866188, '', NULL);
INSERT IGNORE INTO `omc_db`.`sys_dict_data` (`dict_code`, `dict_sort`, `dict_label`, `dict_value`, `dict_type`, `tag_class`, `tag_type`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (29, 0, '接口跟踪', 'Interface', 'trace_type', NULL, 'blue ', '1', 'admin', 1697443290808, '', 0, '接口跟踪');
INSERT IGNORE INTO `omc_db`.`sys_dict_data` (`dict_code`, `dict_sort`, `dict_label`, `dict_value`, `dict_type`, `tag_class`, `tag_type`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (30, 1, '设备跟踪', 'Device', 'trace_type', NULL, 'gold', '1', 'admin', 1697443307336, 'admin', 1697443541082, '设备跟踪');
INSERT IGNORE INTO `omc_db`.`sys_dict_data` (`dict_code`, `dict_sort`, `dict_label`, `dict_value`, `dict_type`, `tag_class`, `tag_type`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (31, 2, '用户跟踪', 'UE', 'trace_type', NULL, 'green', '1', 'admin', 1697443562042, 'admin', 1697443566327, '用户跟踪');
@@ -222,6 +220,11 @@ INSERT IGNORE INTO `omc_db`.`sys_job` (`job_id`, `job_name`, `job_group`, `invok
INSERT IGNORE INTO `omc_db`.`sys_job` (`job_id`, `job_name`, `job_group`, `invoke_target`, `target_params`, `cron_expression`, `misfire_policy`, `concurrent`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (3, '异常执行', 'SYSTEM', 'bar', '{\"t\":20}', '0/20 * * * * ?', '3', '0', '0', 'admin', 1697091151529, '', 0, '');
-- -- add default dept
INSERT IGNORE INTO `omc_db`.`sys_dept` (`dept_id`, `parent_id`, `ancestors`, `dept_name`, `order_num`, `leader`, `phone`, `email`, `status`, `del_flag`, `create_by`, `create_time`, `update_by`, `update_time`) VALUES (100, 0, '0', '运维部', 0, 'admin', '', '', '1', '0', 'admin', 1697091866188, '', NULL);
INSERT IGNORE INTO `omc_db`.`sys_dept` (`dept_id`, `parent_id`, `ancestors`, `dept_name`, `order_num`, `leader`, `phone`, `email`, `status`, `del_flag`, `create_by`, `create_time`, `update_by`, `update_time`) VALUES (101, 100, '0,100', '运维一部', 1, 'admin', '', '', '1', '0', 'admin', 1697091866192, '', NULL);
-- add default post
INSERT IGNORE INTO `omc_db`.`sys_post` (`post_id`, `post_code`, `post_name`, `post_sort`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (1, 'administrator', '管理人员', 1, '1', 'admin', 1697110106499, '', 0, '');
INSERT IGNORE INTO `omc_db`.`sys_post` (`post_id`, `post_code`, `post_name`, `post_sort`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (2, 'operator', '运维人员', 2, '1', 'admin', 1697110106502, '', 0, '');
INSERT IGNORE INTO `omc_db`.`sys_post` (`post_id`, `post_code`, `post_name`, `post_sort`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (3, 'auditor', '审计员', 3, '1', 'admin', 1697110106504, '', 0, '');
@@ -230,16 +233,17 @@ INSERT IGNORE INTO `omc_db`.`sys_post` (`post_id`, `post_code`, `post_name`, `po
DELETE FROM `omc_db`.`sys_role` WHERE `role_id` = 3;
UPDATE `omc_db`.`sys_role` SET `role_name` = '管理员', `role_key` = 'admin', `role_sort` = 1, `data_scope` = '1', `menu_check_strictly` = '1', `dept_check_strictly` = '1', `status` = '1', `del_flag` = '0', `create_by` = 'admin', `create_time` = 1697091437683, `update_by` = '', `update_time` = 0, `remark` = '管理员' WHERE `role_id` = 1;
UPDATE `omc_db`.`sys_role` SET `role_name` = '普通角色', `role_key` = 'user', `role_sort` = 100, `data_scope` = '2', `menu_check_strictly` = '1', `dept_check_strictly` = '1', `status` = '1', `del_flag` = '0', `create_by` = 'admin', `create_time` = 1697091437686, `update_by` = 'admin', `update_time` = 1697511172013, `remark` = '普通角色' WHERE `role_id` = 2;
UPDATE `omc_db`.`sys_role` SET `role_name` = '普通角色', `role_key` = 'user', `role_sort` = 2, `data_scope` = '2', `menu_check_strictly` = '1', `dept_check_strictly` = '1', `status` = '1', `del_flag` = '0', `create_by` = 'admin', `create_time` = 1697091437686, `update_by` = 'admin', `update_time` = 1697511172013, `remark` = '普通角色' WHERE `role_id` = 2;
INSERT IGNORE INTO `omc_db`.`sys_role_menu` (`role_id`, `menu_id`) VALUES (100, 1);
INSERT IGNORE INTO `omc_db`.`sys_role_menu` (`role_id`, `menu_id`) VALUES (100, 100);
INSERT IGNORE INTO `omc_db`.`sys_role_menu` (`role_id`, `menu_id`) VALUES (100, 1000);
INSERT IGNORE INTO `omc_db`.`sys_role_menu` (`role_id`, `menu_id`) VALUES (2, 1);
INSERT IGNORE INTO `omc_db`.`sys_role_menu` (`role_id`, `menu_id`) VALUES (2, 100);
INSERT IGNORE INTO `omc_db`.`sys_role_menu` (`role_id`, `menu_id`) VALUES (2, 1000);
INSERT IGNORE INTO `omc_db`.`sys_user` (`user_id`, `dept_id`, `user_name`, `nick_name`, `user_type`, `email`, `phonenumber`, `sex`, `avatar`, `password`, `status`, `del_flag`, `login_ip`, `login_date`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (1, 103, 'admin', '管理员', 'sys', '', '', '0', '', '$2a$10$QgIcp6yuOEGrEU0TNU12K.uQRLbcufesEU7hiRYlRSSdUO7OAkoTq', '1', '0', '127.0.0.1', 1697091656500, 'admin', 1697091656500, 'admin', 1697525176913, '管理员');
INSERT IGNORE INTO `omc_db`.`sys_user` (`user_id`, `dept_id`, `user_name`, `nick_name`, `user_type`, `email`, `phonenumber`, `sex`, `avatar`, `password`, `status`, `del_flag`, `login_ip`, `login_date`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (1, 100, 'admin', '管理员', 'sys', '', '', '0', '', '$2a$10$QgIcp6yuOEGrEU0TNU12K.uQRLbcufesEU7hiRYlRSSdUO7OAkoTq', '1', '0', '127.0.0.1', 1697091656500, '-', 1697091656500, '-', 1697525176913, '管理员');
INSERT IGNORE INTO `omc_db`.`sys_user` (`user_id`, `dept_id`, `user_name`, `nick_name`, `user_type`, `email`, `phonenumber`, `sex`, `avatar`, `password`, `status`, `del_flag`, `login_ip`, `login_date`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (2, 100, 'cronuser', '管理员', 'sys', '', '', '0', '', '$2a$10$QgIcp6yuOEGrEU0TNU12K.uQRLbcufesEU7hiRYlRSSdUO7OAkoTq', '1', '0', '127.0.0.1', 1697091656500, 'admin', 1697091656500, 'admin', 1697525176913, '计划任务管理员');
INSERT IGNORE INTO `omc_db`.`sys_user_post` (`user_id`, `post_id`) VALUES (1, 1);
INSERT IGNORE INTO `omc_db`.`sys_user_post` (`user_id`, `post_id`) VALUES (2, 1);
DELETE FROM `omc_db`.`sys_user_role` WHERE `user_id` = 176 AND `role_id` = 2;
DELETE FROM `omc_db`.`sys_user_role` WHERE `user_id` = 177 AND `role_id` = 2;

View File

@@ -8,6 +8,7 @@ import (
"ems.agt/src/framework/middleware"
"ems.agt/src/framework/middleware/security"
"ems.agt/src/modules/common"
"ems.agt/src/modules/crontask"
"ems.agt/src/modules/monitor"
"ems.agt/src/modules/system"
@@ -116,4 +117,5 @@ func initModulesRoute(app *gin.Engine) {
common.Setup(app)
monitor.Setup(app)
system.Setup(app)
crontask.Setup(app)
}

View File

@@ -0,0 +1,23 @@
package crontask
import (
"ems.agt/src/framework/cron"
"ems.agt/src/framework/logger"
"ems.agt/src/modules/crontask/tasks"
"github.com/gin-gonic/gin"
)
// Setup 模块路由注册
func Setup(router *gin.Engine) {
logger.Infof("开始加载 ====> monitor 模块路由")
// 启动时需要的初始参数
InitCronQueue()
}
// InitCronQueue 初始定时任务队列
func InitCronQueue() {
cron.CreateQueue("tasks", tasks.NewProcessor)
}

View File

@@ -0,0 +1,63 @@
package tasks
import (
"ems.agt/lib/dborm"
"ems.agt/lib/log"
"ems.agt/src/framework/cron"
)
var NewProcessor = &BarProcessor{
progress: 0,
count: 0,
}
// bar 队列任务处理
type BarProcessor struct {
// 任务进度
progress int
// 执行次数
count int
}
func (s *BarProcessor) Execute(data any) (any, error) {
log.Infof("执行 %d 次,上次进度: %d ", s.count, s.progress)
s.count++
options := data.(cron.JobData)
sysJob := options.SysJob
log.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID)
// // 实现任务处理逻辑
// i := 0
// s.progress = i
// for i < 5 {
// // 获取任务进度
// progress := s.progress
// log.Infof("jonId: %s => 任务进度:%d", sysJob.JobID, progress)
// // 延迟响应
// time.Sleep(time.Second * 2)
// // 程序中途执行错误
// if i == 3 {
// // arr := [1]int{1}
// // arr[i] = 3
// // fmt.Println(arr)
// // return "i = 3"
// panic("程序中途执行错误")
// }
// i++
// // 改变任务进度
// s.progress = i
// }
where := "NOW()>ADDDATE(`create_time`,interval IFNULL((SELECT `value` FROM config WHERE config_tag='BackUpSaveTime'),30) day)"
affected, err := dborm.XormDeleteDataByWhere(where, "ne_backup")
if err != nil {
// panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err))
return nil, err
}
// 返回结果,用于记录执行结果
return map[string]any{
"err": err.Error(),
"affected": affected,
}, nil
}