feat: 性能数据上报/资源数据上报

This commit is contained in:
TsMask
2024-03-19 11:24:18 +08:00
parent 66e0f1358d
commit ba6824e623
5 changed files with 189 additions and 70 deletions

View File

@@ -6,6 +6,7 @@ import (
"nms_nbi/src/framework/utils/ctx"
"nms_nbi/src/framework/vo/result"
neDataModel "nms_nbi/src/modules/network_data/model"
neDataService "nms_nbi/src/modules/network_data/service"
neService "nms_nbi/src/modules/network_element/service"
"nms_nbi/src/modules/nms_cxy/service"
"nms_nbi/src/modules/nms_cxy/utils/kafka"
@@ -17,6 +18,7 @@ import (
// 实例化控制层 TestController 结构体
var NewTest = &TestController{
neInfoService: neService.NewNeInfoImpl,
alarmDataService: neDataService.NewAlarmImpl,
alarmService: service.NewAlarmImpl,
configService: service.NewConfigImpl,
performanceService: service.NewPerformanceImpl,
@@ -29,6 +31,8 @@ var NewTest = &TestController{
type TestController struct {
// 网元信息服务
neInfoService neService.INeInfo
// 告警信息服务
alarmDataService neDataService.IAlarm
// 告警数据服务
alarmService service.IAlarm
// 配置数据处理服务
@@ -39,6 +43,100 @@ type TestController struct {
resourceService service.IResource
}
// 默认测试
//
// POST /
func (s *TestController) Demo(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
OperType string `json:"operType" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if body.OperType == "alarm-kafka" {
data := s.alarmDataService.SelectPage(neDataModel.AlarmQuery{
PageNum: 1,
PageSize: 1,
SortField: "timestamp",
SortOrder: "desc",
})
rows := data["rows"].([]neDataModel.Alarm)
if len(rows) > 0 {
err := s.alarmService.KafkaPush(rows[0])
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
}
if body.OperType == "config" {
err := s.configService.ConfigUploadOSS("OMC")
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
if body.OperType == "performance" {
err := s.performanceService.PerformanceUploadOSS("AMF", "2024-02-15 10:16:30", "2024-03-07 10:20:08")
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
if body.OperType == "resource" {
err := s.resourceService.ResourceeUploadOSS("AMF")
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
if body.OperType == "resource-kafka" {
err := s.resourceService.KafkaPush("AMF", "", map[string]any{
"MsgID": "1",
"Ruid": "CZZY001BRTabcdefgh12345678",
"ChangeTime": "2024-03-15 08:34:24",
"ChangeType": "add",
"EvpnT5.rd": "100:100",
"EvpnT5.ipversion": "ipv4",
"EvpnT5.prefix": "10.0.0.0/24",
"EvpnT5.ESI": 0,
"EvpnT5.ethertagid": 0,
"EvpnT5.gwipaddress": "0.0.0.0",
"EvpnT5.mplslable": 3,
"EvpnT5.originator": "192.168.1.1",
"EvpnT5.nexthop": "8000::1",
"EvpnT5.aspath": "",
"EvpnT5.localpreference": 10,
"EvpnT5.community": "",
"EvpnT5.extcommunity": "rt:100:100 color:10",
"EvpnT5.srv6sid": "8000::1:100",
})
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
})
}
// kafka发送消息
//
// POST /kafka/send
@@ -185,51 +283,3 @@ func (s *TestController) OssUpload(c *gin.Context) {
"msg": msg,
})
}
// OSS文件上传测试上报
//
// POST /oss/testUp
func (s *TestController) OssTestUp(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
OperType string `json:"operType" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if body.OperType == "config" {
err := s.configService.ConfigUploadOSS("OMC")
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
if body.OperType == "performance" {
err := s.performanceService.PerformanceUploadOSS("AMF")
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
if body.OperType == "resource" {
err := s.resourceService.ResourceeUploadOSS("AMF")
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
})
}

View File

@@ -3,5 +3,5 @@ package service
// 性能数据处理服务 服务层接口
type IPerformance interface {
// PerformanceUploadOSS 性能数据上报
PerformanceUploadOSS(neType string) error
PerformanceUploadOSS(neType, startTime, endTime string) error
}

View File

@@ -21,29 +21,49 @@ type PerformanceImpl struct {
}
// PerformanceUploadOSS 性能数据上报
func (s *PerformanceImpl) PerformanceUploadOSS(neType string) error {
// 性能数据文件的上报周期为15分钟。
func (s *PerformanceImpl) PerformanceUploadOSS(neType, startTime, endTime string) error {
dataArr := [][]string{}
// 标题行
titleData := s.perfKPIService.SelectGoldKPITitle(neType)
titleArr := []string{}
for _, v := range titleData {
titleArr = append(titleArr, v.KPIID)
}
titleArr = append(titleArr, "neType", "neName", "startIndex", "timeGroup")
dataArr = append(dataArr, titleArr)
// 数据行
query := neDataModel.GoldKPIQuery{
NeType: neType,
StartTime: "2024-02-15 10:16:30",
EndTime: "2024-03-07 10:20:08",
StartTime: startTime,
EndTime: endTime,
Interval: 900,
}
data := s.perfKPIService.SelectGoldKPI(query)
dataArr := [][]string{}
for i, v := range data {
if i == 0 {
dataItem := []string{}
dataItem[0] = fmt.Sprint(v["AMF.25"])
dataArr = append(dataArr, dataItem)
for _, v := range data {
itemLen := len(v)
dataItem := make([]string, len(v))
for j, titleStr := range titleArr {
if strV, ok := v[titleStr]; ok && strV != nil {
fmt.Println("titleArr ", titleStr, strV)
dataItem[j] = fmt.Sprint(strV)
}
}
dataItem := []string{}
dataItem[0] = fmt.Sprint(v["AMF.25"])
dataItem[itemLen-4] = fmt.Sprint(v["neType"])
dataItem[itemLen-3] = fmt.Sprint(v["neName"])
dataItem[itemLen-2] = fmt.Sprint(v["startIndex"])
dataItem[itemLen-1] = fmt.Sprint(v["timeGroup"])
dataArr = append(dataArr, dataItem)
}
// 文件名
dataSaveFileName := common.DataSaveFileName{}
dataSaveFileName := common.DataSaveFileName{
ResCode: neType,
}
fileName := dataSaveFileName.CSV()
filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName)
err := file.WriterFileCSV(dataArr, filePath)

View File

@@ -4,4 +4,9 @@ package service
type IResource interface {
// ResourceeUploadOSS 资源数据上报
ResourceeUploadOSS(neType string) error
// KafkaPush 推送数据Kafka
//
// acronyms 资源对象类型简称,没有就填空字符串
KafkaPush(neType string, acronyms string, data any) error
}

View File

@@ -1,10 +1,13 @@
package service
import (
"encoding/json"
"fmt"
"nms_nbi/src/framework/constants/uploadsubpath"
"nms_nbi/src/framework/logger"
"nms_nbi/src/framework/utils/file"
"nms_nbi/src/modules/nms_cxy/utils/common"
"nms_nbi/src/modules/nms_cxy/utils/kafka"
)
// 实例化数据层 ResourceImpl 结构体
@@ -14,19 +17,60 @@ var NewResourceImpl = &ResourceImpl{}
type ResourceImpl struct{}
// ResourceeUploadOSS 资源数据上报
// OMC在每天的0时及12时上报网元的全量资源数据文件至OSS
func (s *ResourceImpl) ResourceeUploadOSS(neType string) error {
// 配置文件目录
dir := "C:\\usr\\local\\omc\\upload\\export\\2023\\12"
// 资源数据
dataArr := []any{}
dataArr = append(dataArr, map[string]any{
"RUID": "WXZX001CLL0000115349200113",
"UserLabel": "cell1000",
"CellBarred": 0,
"UeInactiveTimer": 10000,
"SupportRRCNumbers": 200,
})
// 文件名
dataSaveFileName := common.DataSaveFileName{}
fileName := dataSaveFileName.ZIP()
dataSaveFileName := common.DataSaveFileName{
ResCode: neType,
}
fileName := dataSaveFileName.JSON()
filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName)
err := file.CompressZipByDir(filePath, dir)
// 数据写入json文件
err := file.WriterFileJSONLine(dataArr, filePath)
if err != nil {
return err
}
return common.UploadOSSByZip(filePath, neType, "CM")
return common.UploadOSSByJSONToZip(filePath, neType, "NRM")
}
// KafkaPush 推送数据Kafka
//
// acronyms 资源对象类型简称,没有就填空字符串
func (s *ResourceImpl) KafkaPush(neType string, acronyms string, data any) error {
if neType == "" {
return nil
}
// 数据序列化
bytes, err := json.Marshal(data)
if err != nil {
logger.Errorf("KafkaPush ResourceChangeData err => %s", err.Error())
return err
}
// 订阅topic名称为专业编码-厂商编码-OMC编号-网元类型-数据类别固定为NRM-资源对象类型简称
basePath := common.BasePath("-")
topic := fmt.Sprintf("%s-%s-NRM", basePath, neType)
if acronyms != "" {
topic = topic + "-" + acronyms
}
// 发送消息
partition, offset, err := kafka.KInitConm.MessageSyncSend(topic, 0, string(bytes))
if err != nil {
logger.Errorf("KafkaPush MessageSyncSend err => %s", err.Error())
return err
}
logger.Infof("KafkaPush MessageSyncSend Partition:%d, Offset:%d", partition, offset)
return err
}