From c8c33d4dff8468ab7e6f6d409a847e8522191f79 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Tue, 19 Mar 2024 11:25:01 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=80=A7=E8=83=BD=E6=95=B0=E6=8D=AE=E4=B8=8A?= =?UTF-8?q?=E6=8A=A5/=E8=B5=84=E6=BA=90=E6=95=B0=E6=8D=AE=E4=B8=8A?= =?UTF-8?q?=E6=8A=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app.go | 4 +- src/modules/nms_cxy/README.md | 29 +++++++++ src/modules/nms_cxy/nms_cxy.go | 13 ++-- .../processor/performance/performance.go | 64 +++++++++++++++++++ src/modules/nms_cxy/processor/processor.go | 13 ++++ .../nms_cxy/processor/resource/resource.go | 56 ++++++++++++++++ 6 files changed, 172 insertions(+), 7 deletions(-) create mode 100644 src/modules/nms_cxy/processor/performance/performance.go create mode 100644 src/modules/nms_cxy/processor/processor.go create mode 100644 src/modules/nms_cxy/processor/resource/resource.go diff --git a/src/app.go b/src/app.go index 3d44056..9f834f9 100644 --- a/src/app.go +++ b/src/app.go @@ -124,6 +124,8 @@ func initModulesRoute(app *gin.Engine) { networkelement.Setup(app) // 网元数据模块 networkdata.Setup(app) + // 北向模块 - 中国星网 + nmscxy.Setup(app) // 跟踪模块 trace.Setup(app) // 图表模块 @@ -134,6 +136,4 @@ func initModulesRoute(app *gin.Engine) { crontask.Setup(app) // 监控模块 - 含调度处理加入队列,放最后 monitor.Setup(app) - // 北向模块 - 中国星网 - nmscxy.Setup(app) } diff --git a/src/modules/nms_cxy/README.md b/src/modules/nms_cxy/README.md index c4e876a..88e7bdc 100644 --- a/src/modules/nms_cxy/README.md +++ b/src/modules/nms_cxy/README.md @@ -25,6 +25,16 @@ nmsCXY: - "192.168.5.59:19092" - "192.168.5.59:29092" - "192.168.5.59:39092" + # 启用 Kerberos 认证 + krb5: + enable: false + config: + configPath: "/path/to/krb5.conf" + keyTabPath: "/path/to/keytab" + serviceName: "kafka" + realm: "EXAMPLE.COM" + username: "client" + # OSS配置 oss: bucketname: "omc-bucket" @@ -33,3 +43,22 @@ nmsCXY: accessKeyID: "aOW0r1gfw74G88Z3XZJ6" secretAccessKey: "9tDErvtCEuVox6LoQu5BOVtycQKcQjlXOGvjl1eD" ``` + +## 数据库添加定时任务 + +```sql +-- 性能数据文件的上报 +INSERT IGNORE INTO `sys_job` (`job_name`, `job_group`, `invoke_target`, `target_params`, `cron_expression`, `misfire_policy`, `concurrent`, `status`, `save_log`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES ('job.nms_cxy.performance_upload_oss', 'SYSTEM', 'performance_upload_oss', '{\"interval\":15,\"neTypes\":[\"AMF\",\"UPF\"]}', '* 0/15 * * * ?', '3', '0', '1', '1', 'supervisor', 1698478134839, 'supervisor', 1710817306404, 'job.nms_cxy.performance_upload_oss_remark'); +INSERT IGNORE INTO `sys_dict_data` VALUES (210, 210, 'job.nms_cxy.performance_upload_oss', '性能数据文件的上报', 'i18n_zh', '', '', '1', 'supervisor', 1705550000000, '', 0, ''); +INSERT IGNORE INTO `sys_dict_data` VALUES (211, 211, 'job.nms_cxy.performance_upload_oss_remark', 'interval周期为15分钟。neTypes为数组对应上报数据的网元类型。', 'i18n_zh', '', '', '1', 'supervisor', 1705550000000, '', 0, ''); +INSERT IGNORE INTO `sys_dict_data` VALUES (212, 212, 'job.nms_cxy.performance_upload_oss', 'Reporting of performance data files', 'i18n_en', '', '', '1', 'supervisor', 1705550000000, '', 0, ''); +INSERT IGNORE INTO `sys_dict_data` VALUES (213, 213, 'job.nms_cxy.performance_upload_oss_remark', 'The interval period is 15 minutes. neTypes is an array corresponding to the type of network element for which the data was reported.', 'i18n_en', '', '', '1', 'supervisor', 1705550000000, '', 0, ''); + +-- 资源数据上报接口 +INSERT IGNORE INTO `sys_job` (`job_name`, `job_group`, `invoke_target`, `target_params`, `cron_expression`, `misfire_policy`, `concurrent`, `status`, `save_log`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES ('job.nms_cxy.resource_upload_oss', 'SYSTEM', 'resource_upload_oss', '{\"interval\":15}', '* * 0,12 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134839, 'supervisor', 1710818082110, 'job.nms_cxy.resource_upload_oss_remark'); +INSERT IGNORE INTO `sys_dict_data` VALUES (215, 215, 'job.nms_cxy.resource_upload_oss', '资源数据上报接口', 'i18n_zh', '', '', '1', 'supervisor', 1705550000000, '', 0, ''); +INSERT IGNORE INTO `sys_dict_data` VALUES (216, 216, 'job.nms_cxy.resource_upload_oss_remark', '每天的 0 时及 12 时上报网元的全量资源数据文件至 OSS。', 'i18n_zh', '', '', '1', 'supervisor', 1705550000000, '', 0, ''); +INSERT IGNORE INTO `sys_dict_data` VALUES (217, 217, 'job.nms_cxy.resource_upload_oss', 'Resource data reporting interface', 'i18n_en', '', '', '1', 'supervisor', 1705550000000, '', 0, ''); +INSERT IGNORE INTO `sys_dict_data` VALUES (218, 218, 'job.nms_cxy.resource_upload_oss_remark', 'Full resource data files for network elements are reported to the OSS at 0000 and 1200 hours each day.', 'i18n_en', '', '', '1', 'supervisor', 1705550000000, '', 0, ''); + +``` diff --git a/src/modules/nms_cxy/nms_cxy.go b/src/modules/nms_cxy/nms_cxy.go index 301f9d2..ba08bf7 100644 --- a/src/modules/nms_cxy/nms_cxy.go +++ b/src/modules/nms_cxy/nms_cxy.go @@ -5,6 +5,7 @@ import ( "nms_nbi/src/framework/logger" "nms_nbi/src/framework/middleware" "nms_nbi/src/modules/nms_cxy/controller" + "nms_nbi/src/modules/nms_cxy/processor" "nms_nbi/src/modules/nms_cxy/utils/kafka" "nms_nbi/src/modules/nms_cxy/utils/oss" @@ -132,6 +133,11 @@ func Setup(router *gin.Engine) { // 测试管理 testManagementGroup := apiRest.Group("/testManagement") { + // 默认测试 + testManagementGroup.POST("/", + middleware.PreAuthorize(nil), + controller.NewTest.Demo, + ) // kafka发送消息 testManagementGroup.POST("/kafka/send", middleware.PreAuthorize(nil), @@ -152,11 +158,6 @@ func Setup(router *gin.Engine) { middleware.PreAuthorize(nil), controller.NewTest.OssUpload, ) - // OSS文件上传测试上报 - testManagementGroup.POST("/oss/testUp", - middleware.PreAuthorize(nil), - controller.NewTest.OssTestUp, - ) } } @@ -166,4 +167,6 @@ func InitLoad() { kafka.InitConfig() // 初始化连接OSS oss.InitConfig() + // 初始化定时任务处理 + processor.InitCronQueue() } diff --git a/src/modules/nms_cxy/processor/performance/performance.go b/src/modules/nms_cxy/processor/performance/performance.go new file mode 100644 index 0000000..f2d0d37 --- /dev/null +++ b/src/modules/nms_cxy/processor/performance/performance.go @@ -0,0 +1,64 @@ +package performance + +import ( + "encoding/json" + "fmt" + "nms_nbi/src/framework/cron" + "nms_nbi/src/framework/logger" + "nms_nbi/src/framework/utils/date" + "nms_nbi/src/modules/nms_cxy/service" + "time" +) + +var NewProcessor = &PerformanceProcessor{ + count: 0, + performanceService: service.NewPerformanceImpl, +} + +// 性能指标 队列任务处理 +type PerformanceProcessor struct { + // 执行次数 + count int + // 性能数据处理服务 + performanceService service.IPerformance +} + +func (s *PerformanceProcessor) Execute(data any) (any, error) { + logger.Infof("执行 %d 次", s.count) + s.count++ + + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID) + + // 读取参数值 + var params struct { + NeTypes []string `json:"neTypes"` + Interval int `json:"interval"` + } + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, fmt.Errorf("json params 'interval' err: %v", err) + } + + // 当前时间减去周期时间 + currentTime := time.Now() + currentTimeStr := date.ParseDateToStr(currentTime, date.YYYY_MM_DD_HH_MM_SS) + minus15Minutes := currentTime.Add(-time.Duration(params.Interval) * time.Minute) + minus15MinutesStr := date.ParseDateToStr(minus15Minutes, date.YYYY_MM_DD_HH_MM_SS) + // 遍历网元 + for _, neType := range params.NeTypes { + err = s.performanceService.PerformanceUploadOSS(neType, minus15MinutesStr, currentTimeStr) + if err != nil { + return nil, err + } + } + + // 返回结果,用于记录执行结果 + result := map[string]any{ + "repeat": options.Repeat, + "count": s.count, + "jobName": sysJob.JobName, + } + return result, nil +} diff --git a/src/modules/nms_cxy/processor/processor.go b/src/modules/nms_cxy/processor/processor.go new file mode 100644 index 0000000..3f04170 --- /dev/null +++ b/src/modules/nms_cxy/processor/processor.go @@ -0,0 +1,13 @@ +package processor + +import ( + "nms_nbi/src/framework/cron" + "nms_nbi/src/modules/nms_cxy/processor/performance" + "nms_nbi/src/modules/nms_cxy/processor/resource" +) + +// InitCronQueue 初始定时任务队列 +func InitCronQueue() { + cron.CreateQueue("performance_upload_oss", performance.NewProcessor) + cron.CreateQueue("resource_upload_oss", resource.NewProcessor) +} diff --git a/src/modules/nms_cxy/processor/resource/resource.go b/src/modules/nms_cxy/processor/resource/resource.go new file mode 100644 index 0000000..19d4d2b --- /dev/null +++ b/src/modules/nms_cxy/processor/resource/resource.go @@ -0,0 +1,56 @@ +package resource + +import ( + "encoding/json" + "fmt" + "nms_nbi/src/framework/cron" + "nms_nbi/src/framework/logger" + "nms_nbi/src/modules/nms_cxy/service" +) + +var NewProcessor = &ResourceProcessor{ + count: 0, + resourceService: service.NewResourceImpl, +} + +// 资源上报 队列任务处理 +type ResourceProcessor struct { + // 执行次数 + count int + // 资源数据处理服务 + resourceService service.IResource +} + +func (s *ResourceProcessor) Execute(data any) (any, error) { + logger.Infof("执行 %d 次", s.count) + s.count++ + + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID) + + // 读取参数值 + var params struct { + NeTypes []string `json:"neTypes"` + } + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, fmt.Errorf("json params 'interval' err: %v", err) + } + + // 遍历网元 + for _, neType := range params.NeTypes { + err = s.resourceService.ResourceeUploadOSS(neType) + if err != nil { + return nil, err + } + } + + // 返回结果,用于记录执行结果 + result := map[string]any{ + "repeat": options.Repeat, + "count": s.count, + "jobName": sysJob.JobName, + } + return result, nil +}