diff --git a/src/modules/nms_cxy/processor/config/config.go b/src/modules/nms_cxy/processor/config/config.go new file mode 100644 index 0000000..9e0179c --- /dev/null +++ b/src/modules/nms_cxy/processor/config/config.go @@ -0,0 +1,56 @@ +package config + +import ( + "encoding/json" + "fmt" + "nms_cxy/src/framework/cron" + "nms_cxy/src/framework/logger" + "nms_cxy/src/modules/nms_cxy/service" +) + +var NewProcessor = &ConfigProcessor{ + count: 0, + configService: service.NewConfigImpl, +} + +// 配置上报 队列任务处理 +type ConfigProcessor struct { + // 执行次数 + count int + // 配置数据处理服务 + configService service.IConfig +} + +func (s *ConfigProcessor) Execute(data any) (any, error) { + logger.Infof("执行 %d 次", s.count) + s.count++ + + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复 %v 任务ID %s", options.Repeat, sysJob.JobID) + + // 读取参数值 + var params struct { + NeTypes []string `json:"neTypes"` + } + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, fmt.Errorf("json params 'neTypes' err: %v", err) + } + + // 遍历网元 + for _, neType := range params.NeTypes { + err = s.configService.ConfigUploadOSS(neType) + if err != nil { + return nil, err + } + } + + // 返回结果,用于记录执行结果 + result := map[string]any{ + "repeat": options.Repeat, + "count": s.count, + "jobName": sysJob.JobName, + } + return result, nil +} diff --git a/src/modules/nms_cxy/processor/performance/performance.go b/src/modules/nms_cxy/processor/performance/performance.go index 19c9a00..e101e85 100644 --- a/src/modules/nms_cxy/processor/performance/performance.go +++ b/src/modules/nms_cxy/processor/performance/performance.go @@ -38,7 +38,7 @@ func (s *PerformanceProcessor) Execute(data any) (any, error) { } err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) if err != nil { - return nil, fmt.Errorf("json params 'interval' err: %v", err) + return nil, fmt.Errorf("json params 'neTypes' err: %v", err) } // 当前时间减去周期时间 diff --git a/src/modules/nms_cxy/processor/processor.go b/src/modules/nms_cxy/processor/processor.go index 7c65612..fe23ff9 100644 --- a/src/modules/nms_cxy/processor/processor.go +++ b/src/modules/nms_cxy/processor/processor.go @@ -2,6 +2,7 @@ package processor import ( "nms_cxy/src/framework/cron" + "nms_cxy/src/modules/nms_cxy/processor/config" "nms_cxy/src/modules/nms_cxy/processor/performance" "nms_cxy/src/modules/nms_cxy/processor/resource" ) @@ -10,4 +11,5 @@ import ( func InitCronQueue() { cron.CreateQueue("performance_upload_oss", performance.NewProcessor) cron.CreateQueue("resource_upload_oss", resource.NewProcessor) + cron.CreateQueue("config_upload_oss", config.NewProcessor) } diff --git a/src/modules/nms_cxy/processor/resource/resource.go b/src/modules/nms_cxy/processor/resource/resource.go index 03b0438..d27acc1 100644 --- a/src/modules/nms_cxy/processor/resource/resource.go +++ b/src/modules/nms_cxy/processor/resource/resource.go @@ -35,7 +35,7 @@ func (s *ResourceProcessor) Execute(data any) (any, error) { } err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) if err != nil { - return nil, fmt.Errorf("json params 'interval' err: %v", err) + return nil, fmt.Errorf("json params 'neTypes' err: %v", err) } // 遍历网元