diff --git a/src/modules/nms_cxy/controller/test.go b/src/modules/nms_cxy/controller/test.go index 84a4c1c..f1241de 100644 --- a/src/modules/nms_cxy/controller/test.go +++ b/src/modules/nms_cxy/controller/test.go @@ -6,6 +6,7 @@ import ( "nms_nbi/src/framework/utils/ctx" "nms_nbi/src/framework/vo/result" neDataModel "nms_nbi/src/modules/network_data/model" + neDataService "nms_nbi/src/modules/network_data/service" neService "nms_nbi/src/modules/network_element/service" "nms_nbi/src/modules/nms_cxy/service" "nms_nbi/src/modules/nms_cxy/utils/kafka" @@ -17,6 +18,7 @@ import ( // 实例化控制层 TestController 结构体 var NewTest = &TestController{ neInfoService: neService.NewNeInfoImpl, + alarmDataService: neDataService.NewAlarmImpl, alarmService: service.NewAlarmImpl, configService: service.NewConfigImpl, performanceService: service.NewPerformanceImpl, @@ -29,6 +31,8 @@ var NewTest = &TestController{ type TestController struct { // 网元信息服务 neInfoService neService.INeInfo + // 告警信息服务 + alarmDataService neDataService.IAlarm // 告警数据服务 alarmService service.IAlarm // 配置数据处理服务 @@ -39,6 +43,100 @@ type TestController struct { resourceService service.IResource } +// 默认测试 +// +// POST / +func (s *TestController) Demo(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + OperType string `json:"operType" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + if body.OperType == "alarm-kafka" { + data := s.alarmDataService.SelectPage(neDataModel.AlarmQuery{ + PageNum: 1, + PageSize: 1, + SortField: "timestamp", + SortOrder: "desc", + }) + rows := data["rows"].([]neDataModel.Alarm) + if len(rows) > 0 { + err := s.alarmService.KafkaPush(rows[0]) + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operType": body.OperType, + "err": err, + }) + return + } + } + if body.OperType == "config" { + err := s.configService.ConfigUploadOSS("OMC") + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operType": body.OperType, + "err": err, + }) + return + } + if body.OperType == "performance" { + err := s.performanceService.PerformanceUploadOSS("AMF", "2024-02-15 10:16:30", "2024-03-07 10:20:08") + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operType": body.OperType, + "err": err, + }) + return + } + if body.OperType == "resource" { + err := s.resourceService.ResourceeUploadOSS("AMF") + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operType": body.OperType, + "err": err, + }) + return + } + if body.OperType == "resource-kafka" { + err := s.resourceService.KafkaPush("AMF", "", map[string]any{ + "MsgID": "1", + "Ruid": "CZZY001BRTabcdefgh12345678", + "ChangeTime": "2024-03-15 08:34:24", + "ChangeType": "add", + "EvpnT5.rd": "100:100", + "EvpnT5.ipversion": "ipv4", + "EvpnT5.prefix": "10.0.0.0/24", + "EvpnT5.ESI": 0, + "EvpnT5.ethertagid": 0, + "EvpnT5.gwipaddress": "0.0.0.0", + "EvpnT5.mplslable": 3, + "EvpnT5.originator": "192.168.1.1", + "EvpnT5.nexthop": "8000::1", + "EvpnT5.aspath": "", + "EvpnT5.localpreference": 10, + "EvpnT5.community": "", + "EvpnT5.extcommunity": "rt:100:100 color:10", + "EvpnT5.srv6sid": "8000::1:100", + }) + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operType": body.OperType, + "err": err, + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operType": body.OperType, + }) +} + // kafka发送消息 // // POST /kafka/send @@ -185,51 +283,3 @@ func (s *TestController) OssUpload(c *gin.Context) { "msg": msg, }) } - -// OSS文件上传测试上报 -// -// POST /oss/testUp -func (s *TestController) OssTestUp(c *gin.Context) { - language := ctx.AcceptLanguage(c) - var body struct { - RequestId string `json:"requestId" binding:"required"` - OperType string `json:"operType" binding:"required"` - } - if err := c.ShouldBindJSON(&body); err != nil { - c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) - return - } - - if body.OperType == "config" { - err := s.configService.ConfigUploadOSS("OMC") - c.JSON(200, map[string]any{ - "requestId": body.RequestId, - "operType": body.OperType, - "err": err, - }) - return - } - if body.OperType == "performance" { - err := s.performanceService.PerformanceUploadOSS("AMF") - c.JSON(200, map[string]any{ - "requestId": body.RequestId, - "operType": body.OperType, - "err": err, - }) - return - } - if body.OperType == "resource" { - err := s.resourceService.ResourceeUploadOSS("AMF") - c.JSON(200, map[string]any{ - "requestId": body.RequestId, - "operType": body.OperType, - "err": err, - }) - return - } - - c.JSON(200, map[string]any{ - "requestId": body.RequestId, - "operType": body.OperType, - }) -} diff --git a/src/modules/nms_cxy/service/performance.go b/src/modules/nms_cxy/service/performance.go index 1b56321..cb23a7a 100644 --- a/src/modules/nms_cxy/service/performance.go +++ b/src/modules/nms_cxy/service/performance.go @@ -3,5 +3,5 @@ package service // 性能数据处理服务 服务层接口 type IPerformance interface { // PerformanceUploadOSS 性能数据上报 - PerformanceUploadOSS(neType string) error + PerformanceUploadOSS(neType, startTime, endTime string) error } diff --git a/src/modules/nms_cxy/service/performance.impl.go b/src/modules/nms_cxy/service/performance.impl.go index b85659f..638dc66 100644 --- a/src/modules/nms_cxy/service/performance.impl.go +++ b/src/modules/nms_cxy/service/performance.impl.go @@ -21,29 +21,49 @@ type PerformanceImpl struct { } // PerformanceUploadOSS 性能数据上报 -func (s *PerformanceImpl) PerformanceUploadOSS(neType string) error { +// 性能数据文件的上报周期为15分钟。 +func (s *PerformanceImpl) PerformanceUploadOSS(neType, startTime, endTime string) error { + dataArr := [][]string{} + + // 标题行 + titleData := s.perfKPIService.SelectGoldKPITitle(neType) + titleArr := []string{} + for _, v := range titleData { + titleArr = append(titleArr, v.KPIID) + } + titleArr = append(titleArr, "neType", "neName", "startIndex", "timeGroup") + dataArr = append(dataArr, titleArr) + + // 数据行 query := neDataModel.GoldKPIQuery{ NeType: neType, - StartTime: "2024-02-15 10:16:30", - EndTime: "2024-03-07 10:20:08", + StartTime: startTime, + EndTime: endTime, Interval: 900, } - data := s.perfKPIService.SelectGoldKPI(query) - dataArr := [][]string{} - for i, v := range data { - if i == 0 { - dataItem := []string{} - dataItem[0] = fmt.Sprint(v["AMF.25"]) - dataArr = append(dataArr, dataItem) + for _, v := range data { + itemLen := len(v) + dataItem := make([]string, len(v)) + + for j, titleStr := range titleArr { + if strV, ok := v[titleStr]; ok && strV != nil { + fmt.Println("titleArr ", titleStr, strV) + dataItem[j] = fmt.Sprint(strV) + } } - dataItem := []string{} - dataItem[0] = fmt.Sprint(v["AMF.25"]) + + dataItem[itemLen-4] = fmt.Sprint(v["neType"]) + dataItem[itemLen-3] = fmt.Sprint(v["neName"]) + dataItem[itemLen-2] = fmt.Sprint(v["startIndex"]) + dataItem[itemLen-1] = fmt.Sprint(v["timeGroup"]) dataArr = append(dataArr, dataItem) } // 文件名 - dataSaveFileName := common.DataSaveFileName{} + dataSaveFileName := common.DataSaveFileName{ + ResCode: neType, + } fileName := dataSaveFileName.CSV() filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName) err := file.WriterFileCSV(dataArr, filePath) diff --git a/src/modules/nms_cxy/service/resource.go b/src/modules/nms_cxy/service/resource.go index 885583c..3d7a212 100644 --- a/src/modules/nms_cxy/service/resource.go +++ b/src/modules/nms_cxy/service/resource.go @@ -4,4 +4,9 @@ package service type IResource interface { // ResourceeUploadOSS 资源数据上报 ResourceeUploadOSS(neType string) error + + // KafkaPush 推送数据Kafka + // + // acronyms 资源对象类型简称,没有就填空字符串 + KafkaPush(neType string, acronyms string, data any) error } diff --git a/src/modules/nms_cxy/service/resource.impl.go b/src/modules/nms_cxy/service/resource.impl.go index aa07e56..e25dba7 100644 --- a/src/modules/nms_cxy/service/resource.impl.go +++ b/src/modules/nms_cxy/service/resource.impl.go @@ -1,10 +1,13 @@ package service import ( + "encoding/json" "fmt" "nms_nbi/src/framework/constants/uploadsubpath" + "nms_nbi/src/framework/logger" "nms_nbi/src/framework/utils/file" "nms_nbi/src/modules/nms_cxy/utils/common" + "nms_nbi/src/modules/nms_cxy/utils/kafka" ) // 实例化数据层 ResourceImpl 结构体 @@ -14,19 +17,60 @@ var NewResourceImpl = &ResourceImpl{} type ResourceImpl struct{} // ResourceeUploadOSS 资源数据上报 +// OMC在每天的0时及12时上报网元的全量资源数据文件至OSS func (s *ResourceImpl) ResourceeUploadOSS(neType string) error { - // 配置文件目录 - dir := "C:\\usr\\local\\omc\\upload\\export\\2023\\12" + // 资源数据 + dataArr := []any{} + dataArr = append(dataArr, map[string]any{ + "RUID": "WXZX001CLL0000115349200113", + "UserLabel": "cell1000", + "CellBarred": 0, + "UeInactiveTimer": 10000, + "SupportRRCNumbers": 200, + }) // 文件名 - dataSaveFileName := common.DataSaveFileName{} - fileName := dataSaveFileName.ZIP() + dataSaveFileName := common.DataSaveFileName{ + ResCode: neType, + } + fileName := dataSaveFileName.JSON() filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName) - - err := file.CompressZipByDir(filePath, dir) + // 数据写入json文件 + err := file.WriterFileJSONLine(dataArr, filePath) if err != nil { return err } - - return common.UploadOSSByZip(filePath, neType, "CM") + return common.UploadOSSByJSONToZip(filePath, neType, "NRM") +} + +// KafkaPush 推送数据Kafka +// +// acronyms 资源对象类型简称,没有就填空字符串 +func (s *ResourceImpl) KafkaPush(neType string, acronyms string, data any) error { + if neType == "" { + return nil + } + + // 数据序列化 + bytes, err := json.Marshal(data) + if err != nil { + logger.Errorf("KafkaPush ResourceChangeData err => %s", err.Error()) + return err + } + + // 订阅topic,名称为:专业编码-厂商编码-OMC编号-网元类型-数据类别(固定为NRM)-资源对象类型简称 + basePath := common.BasePath("-") + topic := fmt.Sprintf("%s-%s-NRM", basePath, neType) + if acronyms != "" { + topic = topic + "-" + acronyms + } + + // 发送消息 + partition, offset, err := kafka.KInitConm.MessageSyncSend(topic, 0, string(bytes)) + if err != nil { + logger.Errorf("KafkaPush MessageSyncSend err => %s", err.Error()) + return err + } + logger.Infof("KafkaPush MessageSyncSend Partition:%d, Offset:%d", partition, offset) + return err }