feat: 添加定时任务性能数据上报/资源数据上报

This commit is contained in:
TsMask
2024-03-19 11:25:01 +08:00
parent ba6824e623
commit c8c33d4dff
6 changed files with 172 additions and 7 deletions

View File

@@ -124,6 +124,8 @@ func initModulesRoute(app *gin.Engine) {
networkelement.Setup(app)
// 网元数据模块
networkdata.Setup(app)
// 北向模块 - 中国星网
nmscxy.Setup(app)
// 跟踪模块
trace.Setup(app)
// 图表模块
@@ -134,6 +136,4 @@ func initModulesRoute(app *gin.Engine) {
crontask.Setup(app)
// 监控模块 - 含调度处理加入队列,放最后
monitor.Setup(app)
// 北向模块 - 中国星网
nmscxy.Setup(app)
}

View File

@@ -25,6 +25,16 @@ nmsCXY:
- "192.168.5.59:19092"
- "192.168.5.59:29092"
- "192.168.5.59:39092"
# 启用 Kerberos 认证
krb5:
enable: false
config:
configPath: "/path/to/krb5.conf"
keyTabPath: "/path/to/keytab"
serviceName: "kafka"
realm: "EXAMPLE.COM"
username: "client"
# OSS配置
oss:
bucketname: "omc-bucket"
@@ -33,3 +43,22 @@ nmsCXY:
accessKeyID: "aOW0r1gfw74G88Z3XZJ6"
secretAccessKey: "9tDErvtCEuVox6LoQu5BOVtycQKcQjlXOGvjl1eD"
```
## 数据库添加定时任务
```sql
-- 性能数据文件的上报
INSERT IGNORE INTO `sys_job` (`job_name`, `job_group`, `invoke_target`, `target_params`, `cron_expression`, `misfire_policy`, `concurrent`, `status`, `save_log`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES ('job.nms_cxy.performance_upload_oss', 'SYSTEM', 'performance_upload_oss', '{\"interval\":15,\"neTypes\":[\"AMF\",\"UPF\"]}', '* 0/15 * * * ?', '3', '0', '1', '1', 'supervisor', 1698478134839, 'supervisor', 1710817306404, 'job.nms_cxy.performance_upload_oss_remark');
INSERT IGNORE INTO `sys_dict_data` VALUES (210, 210, 'job.nms_cxy.performance_upload_oss', '性能数据文件的上报', 'i18n_zh', '', '', '1', 'supervisor', 1705550000000, '', 0, '');
INSERT IGNORE INTO `sys_dict_data` VALUES (211, 211, 'job.nms_cxy.performance_upload_oss_remark', 'interval周期为15分钟。neTypes为数组对应上报数据的网元类型。', 'i18n_zh', '', '', '1', 'supervisor', 1705550000000, '', 0, '');
INSERT IGNORE INTO `sys_dict_data` VALUES (212, 212, 'job.nms_cxy.performance_upload_oss', 'Reporting of performance data files', 'i18n_en', '', '', '1', 'supervisor', 1705550000000, '', 0, '');
INSERT IGNORE INTO `sys_dict_data` VALUES (213, 213, 'job.nms_cxy.performance_upload_oss_remark', 'The interval period is 15 minutes. neTypes is an array corresponding to the type of network element for which the data was reported.', 'i18n_en', '', '', '1', 'supervisor', 1705550000000, '', 0, '');
-- 资源数据上报接口
INSERT IGNORE INTO `sys_job` (`job_name`, `job_group`, `invoke_target`, `target_params`, `cron_expression`, `misfire_policy`, `concurrent`, `status`, `save_log`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES ('job.nms_cxy.resource_upload_oss', 'SYSTEM', 'resource_upload_oss', '{\"interval\":15}', '* * 0,12 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134839, 'supervisor', 1710818082110, 'job.nms_cxy.resource_upload_oss_remark');
INSERT IGNORE INTO `sys_dict_data` VALUES (215, 215, 'job.nms_cxy.resource_upload_oss', '资源数据上报接口', 'i18n_zh', '', '', '1', 'supervisor', 1705550000000, '', 0, '');
INSERT IGNORE INTO `sys_dict_data` VALUES (216, 216, 'job.nms_cxy.resource_upload_oss_remark', '每天的 0 时及 12 时上报网元的全量资源数据文件至 OSS。', 'i18n_zh', '', '', '1', 'supervisor', 1705550000000, '', 0, '');
INSERT IGNORE INTO `sys_dict_data` VALUES (217, 217, 'job.nms_cxy.resource_upload_oss', 'Resource data reporting interface', 'i18n_en', '', '', '1', 'supervisor', 1705550000000, '', 0, '');
INSERT IGNORE INTO `sys_dict_data` VALUES (218, 218, 'job.nms_cxy.resource_upload_oss_remark', 'Full resource data files for network elements are reported to the OSS at 0000 and 1200 hours each day.', 'i18n_en', '', '', '1', 'supervisor', 1705550000000, '', 0, '');
```

View File

@@ -5,6 +5,7 @@ import (
"nms_nbi/src/framework/logger"
"nms_nbi/src/framework/middleware"
"nms_nbi/src/modules/nms_cxy/controller"
"nms_nbi/src/modules/nms_cxy/processor"
"nms_nbi/src/modules/nms_cxy/utils/kafka"
"nms_nbi/src/modules/nms_cxy/utils/oss"
@@ -132,6 +133,11 @@ func Setup(router *gin.Engine) {
// 测试管理
testManagementGroup := apiRest.Group("/testManagement")
{
// 默认测试
testManagementGroup.POST("/",
middleware.PreAuthorize(nil),
controller.NewTest.Demo,
)
// kafka发送消息
testManagementGroup.POST("/kafka/send",
middleware.PreAuthorize(nil),
@@ -152,11 +158,6 @@ func Setup(router *gin.Engine) {
middleware.PreAuthorize(nil),
controller.NewTest.OssUpload,
)
// OSS文件上传测试上报
testManagementGroup.POST("/oss/testUp",
middleware.PreAuthorize(nil),
controller.NewTest.OssTestUp,
)
}
}
@@ -166,4 +167,6 @@ func InitLoad() {
kafka.InitConfig()
// 初始化连接OSS
oss.InitConfig()
// 初始化定时任务处理
processor.InitCronQueue()
}

View File

@@ -0,0 +1,64 @@
package performance
import (
"encoding/json"
"fmt"
"nms_nbi/src/framework/cron"
"nms_nbi/src/framework/logger"
"nms_nbi/src/framework/utils/date"
"nms_nbi/src/modules/nms_cxy/service"
"time"
)
// NewProcessor is the shared processor instance registered with the cron
// queue (see processor.InitCronQueue); the framework calls its Execute
// method for each scheduled run of the performance-upload job.
var NewProcessor = &PerformanceProcessor{
	count:              0,
	performanceService: service.NewPerformanceImpl,
}

// PerformanceProcessor handles queued cron jobs that upload performance
// metric data files to OSS ("performance_upload_oss").
type PerformanceProcessor struct {
	// count is the number of times Execute has run since process start.
	// NOTE(review): incremented without synchronization — assumes the cron
	// queue never runs this job concurrently; confirm against the scheduler.
	count int
	// performanceService performs the actual performance-data upload.
	performanceService service.IPerformance
}
// Execute runs one scheduled pass of the performance-upload job.
//
// data must be a cron.JobData whose SysJob.TargetParams is a JSON object
// with "interval" (window length in minutes) and "neTypes" (network-element
// types to report). For each NE type it uploads the performance data
// covering [now-interval, now] to OSS, stopping at the first failure.
// It returns a summary map that the framework records as the execution
// result, or an error if the payload/params are invalid or an upload fails.
func (s *PerformanceProcessor) Execute(data any) (any, error) {
	logger.Infof("执行 %d 次", s.count)
	s.count++
	// Guard the type assertion so a mis-typed payload returns an error
	// instead of panicking inside the cron worker.
	options, ok := data.(cron.JobData)
	if !ok {
		return nil, fmt.Errorf("unexpected job data type %T", data)
	}
	sysJob := options.SysJob
	logger.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID)
	// Parse the job's JSON parameters.
	var params struct {
		NeTypes  []string `json:"neTypes"`
		Interval int      `json:"interval"`
	}
	if err := json.Unmarshal([]byte(sysJob.TargetParams), &params); err != nil {
		return nil, fmt.Errorf("parsing target params %q: %w", sysJob.TargetParams, err)
	}
	// Reporting window: the last params.Interval minutes, ending now.
	currentTime := time.Now()
	currentTimeStr := date.ParseDateToStr(currentTime, date.YYYY_MM_DD_HH_MM_SS)
	windowStart := currentTime.Add(-time.Duration(params.Interval) * time.Minute)
	windowStartStr := date.ParseDateToStr(windowStart, date.YYYY_MM_DD_HH_MM_SS)
	// Upload the window for every configured NE type; abort on first failure.
	for _, neType := range params.NeTypes {
		if err := s.performanceService.PerformanceUploadOSS(neType, windowStartStr, currentTimeStr); err != nil {
			return nil, err
		}
	}
	// The result map is persisted in the job execution log.
	return map[string]any{
		"repeat":  options.Repeat,
		"count":   s.count,
		"jobName": sysJob.JobName,
	}, nil
}

View File

@@ -0,0 +1,13 @@
package processor
import (
"nms_nbi/src/framework/cron"
"nms_nbi/src/modules/nms_cxy/processor/performance"
"nms_nbi/src/modules/nms_cxy/processor/resource"
)
// InitCronQueue registers the module's cron job queues at startup,
// binding each invoke_target name (as stored in sys_job) to its
// processor implementation. Called from the module's InitLoad.
func InitCronQueue() {
	// Periodic performance-data file upload to OSS.
	cron.CreateQueue("performance_upload_oss", performance.NewProcessor)
	// Periodic full resource-data upload to OSS.
	cron.CreateQueue("resource_upload_oss", resource.NewProcessor)
}

View File

@@ -0,0 +1,56 @@
package resource
import (
"encoding/json"
"fmt"
"nms_nbi/src/framework/cron"
"nms_nbi/src/framework/logger"
"nms_nbi/src/modules/nms_cxy/service"
)
// NewProcessor is the shared processor instance registered with the cron
// queue (see processor.InitCronQueue); the framework calls its Execute
// method for each scheduled run of the resource-upload job.
var NewProcessor = &ResourceProcessor{
	count:           0,
	resourceService: service.NewResourceImpl,
}

// ResourceProcessor handles queued cron jobs that upload full network-element
// resource data files to OSS ("resource_upload_oss").
type ResourceProcessor struct {
	// count is the number of times Execute has run since process start.
	// NOTE(review): incremented without synchronization — assumes the cron
	// queue never runs this job concurrently; confirm against the scheduler.
	count int
	// resourceService performs the actual resource-data upload.
	resourceService service.IResource
}
// Execute runs one scheduled pass of the resource-upload job.
//
// data must be a cron.JobData whose SysJob.TargetParams is a JSON object
// with "neTypes" (network-element types to report). For each NE type it
// uploads the full resource data to OSS, stopping at the first failure.
// It returns a summary map that the framework records as the execution
// result, or an error if the payload/params are invalid or an upload fails.
//
// NOTE(review): the seeded sys_job row for this job only sets
// {"interval":15} — without "neTypes" the loop below does nothing;
// confirm the intended target_params.
func (s *ResourceProcessor) Execute(data any) (any, error) {
	logger.Infof("执行 %d 次", s.count)
	s.count++
	// Guard the type assertion so a mis-typed payload returns an error
	// instead of panicking inside the cron worker.
	options, ok := data.(cron.JobData)
	if !ok {
		return nil, fmt.Errorf("unexpected job data type %T", data)
	}
	sysJob := options.SysJob
	logger.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID)
	// Parse the job's JSON parameters.
	var params struct {
		NeTypes []string `json:"neTypes"`
	}
	if err := json.Unmarshal([]byte(sysJob.TargetParams), &params); err != nil {
		// Fixed: the original message referenced 'interval', a field this
		// struct does not parse (copy-paste from the performance processor).
		return nil, fmt.Errorf("parsing target params %q: %w", sysJob.TargetParams, err)
	}
	// Upload full resource data for every configured NE type;
	// abort on first failure.
	for _, neType := range params.NeTypes {
		// NOTE(review): "ResourceeUploadOSS" looks like a typo for
		// "ResourceUploadOSS" — the call must match the service definition,
		// so it is left unchanged here; confirm and rename at the service.
		if err := s.resourceService.ResourceeUploadOSS(neType); err != nil {
			return nil, err
		}
	}
	// The result map is persisted in the job execution log.
	return map[string]any{
		"repeat":  options.Repeat,
		"count":   s.count,
		"jobName": sysJob.JobName,
	}, nil
}