From 88a6375b1816f34e9453c849e5ad15d688df4c36 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Tue, 15 Jul 2025 15:07:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9Eoam=E5=AF=B9=E5=A4=96?= =?UTF-8?q?=E5=BC=80=E6=94=BE=E6=97=A0=E9=99=90=E5=88=B6=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/modules/oam/controller/api_rest.go | 672 +++++++++++++++++++++++++ src/modules/oam/oam.go | 40 ++ src/modules/oam/service/kpi.go | 236 +++++++++ src/modules/oam/service/nb_state.go | 67 +++ 4 files changed, 1015 insertions(+) create mode 100644 src/modules/oam/controller/api_rest.go create mode 100644 src/modules/oam/oam.go create mode 100644 src/modules/oam/service/kpi.go create mode 100644 src/modules/oam/service/nb_state.go diff --git a/src/modules/oam/controller/api_rest.go b/src/modules/oam/controller/api_rest.go new file mode 100644 index 00000000..1da4e89a --- /dev/null +++ b/src/modules/oam/controller/api_rest.go @@ -0,0 +1,672 @@ +package controller + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/gin-gonic/gin" + "github.com/tsmask/go-oam" + + "be.ems/src/framework/logger" + "be.ems/src/framework/resp" + "be.ems/src/framework/utils/date" + "be.ems/src/framework/utils/parse" + neFetchlink "be.ems/src/modules/network_element/fetch_link" + neModel "be.ems/src/modules/network_element/model" + neService "be.ems/src/modules/network_element/service" + oamService "be.ems/src/modules/oam/service" +) + +// NewAPIRest 实例化控制层 +var NewAPIRest = &APIRestController{} + +// APIRestController 北向定义 控制层处理 +// +// PATH /api/rest +type APIRestController struct{} + +// ResolveAlarm 接收告警 +// +// POST /faultManagement/v1/elementType/:elementTypeValue/objectType/alarms +func (s APIRestController) ResolveAlarm(c *gin.Context) { + var body []struct { + AlarmSeq int `json:"alarmSeq"` + AlarmId string `json:"alarmId"` + NeId string `json:"neId"` // 收到实际是rmUID + AlarmCode int `json:"alarmCode"` + AlarmTitle string `json:"alarmTitle"` + EventTime string `json:"eventTime"` + AlarmType string `json:"alarmType"` + OrigSeverity string `json:"origSeverity"` + PerceivedSeverity string `json:"perceivedSeverity"` + PVFlag string `json:"pvFlag"` + NeName string `json:"neName"` + NeType string `json:"neType"` + ObjectUid string `json:"objectUid"` + ObjectName string `json:"objectName"` + ObjectType string `json:"objectType"` + LocationInfo string `json:"locationInfo"` + Province string `json:"province"` + AlarmStatus int `json:"alarmStatus"` + SpecificProblem string `json:"specificProblem"` + SpecificProblemID string `json:"specificProblemID"` + AddInfo string `json:"addInfo"` + } + if err := c.ShouldBindBodyWithJSON(&body); err != nil { + errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err)) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs)) + return + } + elementTypeValue := c.Param("elementTypeValue") + + // alarmTypeValue 映射值 + alarmTypeValue := func(str string) string { + arr := []string{ + oam.ALARM_TYPE_COMMUNICATION_ALARM, + oam.ALARM_TYPE_EQUIPMENT_ALARM, + oam.ALARM_TYPE_PROCESSING_FAILURE, + oam.ALARM_TYPE_ENVIRONMENTAL_ALARM, + oam.ALARM_TYPE_QUALITY_OF_SERVICE_ALARM, + } + for k, v := range arr { + if v == str { + return v + } + if fmt.Sprint(k+1) == str { + return v + } + } + return str + } + + // origSeverityValue 映射值 + origSeverityValue := func(str string) string { + arr := []string{ + oam.ALARM_SEVERITY_CRITICAL, + oam.ALARM_SEVERITY_MAJOR, + oam.ALARM_SEVERITY_MINOR, + oam.ALARM_SEVERITY_WARNING, + oam.ALARM_SEVERITY_EVENT, + } + for k, v := range arr { + if v == str { + return v + } + if fmt.Sprint(k+1) == str { + return v + } + } + return str + } + + // alarmStatusValue 映射值 + alarmStatusValue := func(value int) string { + arr := []string{ + oam.ALARM_STATUS_CLEAR, + oam.ALARM_STATUS_ACTIVE, + } + for k, v := range arr { + if k == value { + return v + } + } + return oam.ALARM_STATUS_ACTIVE + } + + alarmArr := make([]oam.Alarm, 0) + for _, v := range body { + if !strings.EqualFold(v.NeType, elementTypeValue) { + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType")) + return + } + // 产生时间 + eventTime := date.ParseStrToDate(v.EventTime, time.RFC3339) + // 创建告警 + alarm := oam.Alarm{ + NeUid: v.NeId, // 网元唯一标识 + AlarmTime: eventTime.UnixMilli(), // 事件产生时间 + AlarmId: v.AlarmId, // 告警ID 唯一,清除时对应 + AlarmCode: v.AlarmCode, // 告警状态码 + AlarmType: alarmTypeValue(v.AlarmType), // 告警类型 + AlarmTitle: v.AlarmTitle, // 告警标题 + PerceivedSeverity: origSeverityValue(v.OrigSeverity), // 告警级别 + AlarmStatus: alarmStatusValue(v.AlarmStatus), // 告警状态 + SpecificProblem: v.SpecificProblem, // 告警问题原因 + SpecificProblemID: v.SpecificProblemID, // 告警问题原因ID + AddInfo: v.AddInfo, // 告警辅助信息 + LocationInfo: v.LocationInfo, // 告警定位信息 + } + alarmArr = append(alarmArr, alarm) + } + + errArr := make([]string, 0) + for _, alarm := range alarmArr { + if err := oamService.NewAlarm.Resolve(alarm); err != nil { + errArr = append(errArr, err.Error()) + } + } + + if len(errArr) > 0 { + c.JSON(200, resp.ErrData(errArr)) + return + } + c.JSON(200, resp.Ok(nil)) +} + +// ResolveCDR 接收话单 +// +// POST /cdrManagement/v1/elementType/:elementTypeValue/objectType/cdrEvent +func (s APIRestController) ResolveCDR(c *gin.Context) { + var body struct { + NeType string `json:"neType" ` + NeName string `json:"neName" ` + RmUID string `json:"rmUID" ` + Timestamp int `json:"timestamp" ` + CDR map[string]any `json:"CDR" ` + } + if err := c.ShouldBindBodyWithJSON(&body); err != nil { + errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err)) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs)) + return + } + elementTypeValue := c.Param("elementTypeValue") + if !strings.EqualFold(body.NeType, elementTypeValue) { + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType")) + return + } + + recordTime := time.Now() + if body.Timestamp > 1e12 { + recordTime = time.UnixMilli(int64(body.Timestamp)) + } else if body.Timestamp > 1e9 { + recordTime = time.Unix(int64(body.Timestamp), 0) + } + // 创建CDR + cdr := oam.CDR{ + NeUid: body.RmUID, // 网元唯一标识 + RecordTime: recordTime.UnixMilli(), // 记录时间 时间戳毫秒,Push时自动填充 + Data: body.CDR, // 话单信息 + } + if err := oamService.NewCDR.Resolve(cdr); err != nil { + c.JSON(200, resp.ErrMsg(err.Error())) + return + } + c.JSON(200, resp.Ok(nil)) +} + +// ResolveKPI 接收KPI +// +// POST /performanceManagement/v1/elementType/:elementTypeValue/objectType/kpiReport/:index +func (s APIRestController) ResolveKPI(c *gin.Context) { + var body struct { + Timestamp string `json:"TimeStamp" binding:"required"` + Task struct { + Period struct { + StartTime string `json:"StartTime"` + EndTime string `json:"EndTime"` + } `json:"Period" binding:"required"` + NE struct { + NEName string `json:"NEName"` + RmUID string `json:"rmUID"` + NeType string `json:"NeType"` + KPIs []struct { + KPIID string `json:"KPIID"` + Value int64 `json:"Value"` + Err string `json:"Err"` + } `json:"KPIs" binding:"required"` + } `json:"NE" binding:"required"` + } `json:"Task" binding:"required"` + } + if err := c.ShouldBindBodyWithJSON(&body); err != nil { + errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err)) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs)) + return + } + elementTypeValue := c.Param("elementTypeValue") + if !strings.EqualFold(body.Task.NE.NeType, elementTypeValue) { + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType")) + return + } + // index := c.Param("index") + + timestamp := body.Timestamp + taskPeriod := body.Task.Period + taskNeKPIs := body.Task.NE.KPIs + // 时间数据处理 + receiverTime := date.ParseStrToDate(timestamp, date.YYYY_MM_DDTHH_MM_SSZ) + startTime := date.ParseStrToDate(taskPeriod.StartTime, date.YYYY_MM_DDTHH_MM_SSZ) + endTime := date.ParseStrToDate(taskPeriod.EndTime, date.YYYY_MM_DDTHH_MM_SSZ) + granularity := parse.Number(endTime.Sub(startTime).Seconds()) + // kpi data数据 + KpiValues := make(map[string]float64, 0) + for _, v := range taskNeKPIs { + KpiValues[v.KPIID] = float64(v.Value) + } + + // 创建KPI + kpi := oam.KPI{ + NeUid: body.Task.NE.RmUID, // 网元唯一标识 + RecordTime: receiverTime.UnixMilli(), // 记录时间 时间戳毫秒,Push时自动填充 + Granularity: granularity, // 时间间隔 5/10/.../60/300 (秒) + Data: KpiValues, // 指标信息 + } + if err := oamService.NewKPI.Resolve(kpi); err != nil { + c.JSON(200, resp.ErrMsg(err.Error())) + return + } + c.JSON(200, resp.Ok(nil)) +} + +// ResolveNBState 接收基站状态变更 +// +// POST /ueManagement/v1/elementType/:elementTypeValue/objectType/nbState +func (s APIRestController) ResolveNBState(c *gin.Context) { + var body struct { + NeType string `json:"neType" ` + NeName string `json:"neName" ` + RmUID string `json:"rmUID"` + StateList []struct { + Address string `json:"address" ` + Name string `json:"name" ` + Position string `json:"position" ` + NbName string `json:"nbName" ` + State string `json:"state" ` // "OFF" or "ON" + OffTime string `json:"offTime" ` //if State=OFF, will set it + OnTime string `json:"onTime" ` //if State=ON , will set it + } + } + if err := c.ShouldBindBodyWithJSON(&body); err != nil { + errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err)) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs)) + return + } + elementTypeValue := c.Param("elementTypeValue") + if !strings.EqualFold(body.NeType, elementTypeValue) { + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType")) + return + } + + if len(body.StateList) == 0 { + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "no stateList")) + return + } + + nbStateArr := make([]oam.NBState, 0) + for _, v := range body.StateList { + if v.Address == "" || v.State == "" { + continue + } + stateTime := date.ParseStrToDate(v.OffTime, time.RFC3339) + stateStr := oam.NB_STATE_OFF + if v.State == "ON" { + stateTime = date.ParseStrToDate(v.OnTime, time.RFC3339) + stateStr = oam.NB_STATE_ON + } + + // 创建NbState + nbState := oam.NBState{ + NeUid: body.RmUID, // 网元唯一标识 + RecordTime: time.Now().UnixMilli(), // 记录时间 时间戳毫秒,Push时自动填充 + Address: v.Address, // 基站地址 + DeviceName: v.NbName, // 基站设备名称 + State: stateStr, // 基站状态 ON/OFF + StateTime: stateTime.UnixMilli(), // 基站状态时间 时间戳毫秒 + Name: v.Name, // 基站名称 网元标记 + Position: v.Position, // 基站位置 网元标记 + } + nbStateArr = append(nbStateArr, nbState) + } + + errArr := make([]string, 0) + for _, nbState := range nbStateArr { + if err := oamService.NewNBState.Resolve(nbState); err != nil { + errArr = append(errArr, err.Error()) + } + } + + if len(errArr) > 0 { + c.JSON(200, resp.ErrData(errArr)) + return + } + c.JSON(200, resp.Ok(nil)) +} + +// ResolveUENB 接收终端接入基站 +// +// POST /logManagement/v1/elementType/:elementTypeValue/objectType/ueEvent +func (s APIRestController) ResolveUENB(c *gin.Context) { + var body struct { + NeType string `json:"neType" ` + NeName string `json:"neName" ` + RmUID string `json:"rmUID" ` + Timestamp int64 `json:"timestamp" ` + EventType string `json:"eventType" ` + EventJson map[string]any `json:"eventJSON" ` + } + if err := c.ShouldBindBodyWithJSON(&body); err != nil { + errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err)) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs)) + return + } + elementTypeValue := c.Param("elementTypeValue") + if !strings.EqualFold(body.NeType, elementTypeValue) { + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType")) + return + } + + // 记录时间 + recordTime := time.Now() + if body.Timestamp > 1e12 { + recordTime = time.UnixMilli(int64(body.Timestamp)) + } else if body.Timestamp > 1e9 { + recordTime = time.Unix(int64(body.Timestamp), 0) + } + + // 创建UENB + uenb := oam.UENB{ + NeUid: body.RmUID, // 网元唯一标识 + RecordTime: recordTime.UnixMilli(), // 记录时间 + NBId: "0", // 基站ID + CellId: "0", // 小区ID + TAC: "", // TAC + IMSI: "", // IMSI + Result: oam.UENB_RESULT_AUTH_SUCCESS, // 结果值 + Type: oam.UENB_TYPE_DETACH, // 终端接入基站类型 + } + + // 基站ID + if v, ok := body.EventJson["eNBID"]; ok && v != nil { + uenb.NBId = fmt.Sprint(v) + } + if v, ok := body.EventJson["gNBID"]; ok && v != nil { + uenb.NBId = fmt.Sprint(v) + } + // 小区ID + if v, ok := body.EventJson["cellID"]; ok && v != nil { + uenb.CellId = fmt.Sprint(v) + } + // TAC + if v, ok := body.EventJson["tacID"]; ok && v != nil { + uenb.TAC = fmt.Sprint(v) + } + // IMSI + if v, ok := body.EventJson["imsi"]; ok && v != nil { + uenb.IMSI = fmt.Sprint(v) + } + // 结果值 + if v, ok := body.EventJson["result"]; ok && v != nil { + uenb.Result = fmt.Sprint(v) + } + // 终端接入基站类型 + if v, ok := body.EventJson["type"]; ok && v != nil { + switch v := fmt.Sprint(v); v { + case "detach": + uenb.Type = oam.UENB_TYPE_DETACH + case "auth-result": + uenb.Type = oam.UENB_TYPE_AUTH + case "cm-state": + uenb.Type = oam.UENB_TYPE_CM + } + } + + if err := oamService.NewUENB.Resolve(uenb); err != nil { + c.JSON(200, resp.ErrMsg(err.Error())) + return + } + c.JSON(200, resp.Ok(nil)) +} + +// ResolveUENBByAMF 接收终端接入基站-AMF +// +// POST /upload-ue/v1/:eventType +func (s APIRestController) ResolveUENBByAMF(c *gin.Context) { + var body map[string]any + if err := c.ShouldBindBodyWithJSON(&body); err != nil { + errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err)) + c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs)) + return + } + + // 创建UENB + uenb := oam.UENB{ + NeUid: "4400HXAMF001", // 网元唯一标识 + RecordTime: 0, // 记录时间 + NBId: "0", // 基站ID + CellId: "0", // 小区ID + TAC: "", // TAC + IMSI: "", // IMSI + Result: oam.UENB_RESULT_AUTH_SUCCESS, // 结果值 + Type: oam.UENB_TYPE_DETACH, // 终端接入基站类型 + } + + // 从eventJson中获取rmUID + if v, ok := body["rmUID"]; ok { + uenb.NeUid = fmt.Sprint(v) + } + + // 统一格式 + eventType := c.Param("eventType") + switch eventType { + case "auth-result": + // {"authCode":"200","authMessage":"成功","authTime":"2024-12-07 16:48:37","cellID":"3","gNBID":"1","imsi":"460002082100000","onlineNumber":1,"tacID":"81"} + if v, ok := body["imsi"]; ok { + uenb.IMSI = fmt.Sprint(v) + } + if v, ok := body["cellID"]; ok { + uenb.CellId = fmt.Sprint(v) + } + if v, ok := body["gNBID"]; ok { + uenb.NBId = fmt.Sprint(v) + } + if v, ok := body["tacID"]; ok { + uenb.TAC = fmt.Sprint(v) + } + + if v, ok := body["authCode"]; ok { + uenb.Result = fmt.Sprint(v) + } + if v, ok := body["authTime"]; ok { + authTime := date.ParseStrToDate(fmt.Sprint(v), date.YYYY_MM_DD_HH_MM_SS) + uenb.RecordTime = authTime.UnixMilli() + } + uenb.Type = oam.UENB_TYPE_AUTH + case "detach": + // {"detachResult":0,"detachTime":"2024-12-07 18:00:47","imsi":"460002082100000"} + if v, ok := body["imsi"]; ok { + uenb.IMSI = fmt.Sprint(v) + } + if v, ok := body["detachResult"]; ok { + if v == "0" { + uenb.Result = oam.UENB_RESULT_AUTH_SUCCESS + } else { + uenb.Result = fmt.Sprint(v) + } + } + if v, ok := body["detachTime"]; ok { + detachTime := date.ParseStrToDate(fmt.Sprint(v), date.YYYY_MM_DD_HH_MM_SS) + uenb.RecordTime = detachTime.UnixMilli() + } + uenb.Type = oam.UENB_TYPE_DETACH + case "cm-state": + // {"changeTime":"2024-12-07 17:07:52","imsi":"460002082100000","onlineNumber":1,"status":2} + if v, ok := body["imsi"]; ok { + uenb.IMSI = fmt.Sprint(v) + } + if v, ok := body["status"]; ok { + uenb.Result = fmt.Sprint(v) + } + if v, ok := body["changeTime"]; ok { + changeTime := date.ParseStrToDate(fmt.Sprint(v), date.YYYY_MM_DD_HH_MM_SS) + uenb.RecordTime = changeTime.UnixMilli() + } + uenb.Type = oam.UENB_TYPE_CM + } + + if err := oamService.NewUENB.Resolve(uenb); err != nil { + c.JSON(200, resp.ErrMsg(err.Error())) + return + } + c.JSON(200, resp.Ok(nil)) +} + +// ResolveAlarmHistory 拉取告警历史 +// +// GET /faultManagement/v1/elementType/:elementTypeValue/objectType/alarms +func (s APIRestController) ResolveAlarmHistory(c *gin.Context) { + elementTypeValue := c.Param("elementTypeValue") + + // Get alarms from OMC return 204 + if strings.ToLower(elementTypeValue) == "omc" { + c.JSON(200, resp.OkMsg("omc alarms no content")) + return + } + + // alarmTypeValue 映射值 + alarmTypeValue := func(str string) string { + arr := []string{ + oam.ALARM_TYPE_COMMUNICATION_ALARM, + oam.ALARM_TYPE_EQUIPMENT_ALARM, + oam.ALARM_TYPE_PROCESSING_FAILURE, + oam.ALARM_TYPE_ENVIRONMENTAL_ALARM, + oam.ALARM_TYPE_QUALITY_OF_SERVICE_ALARM, + } + for k, v := range arr { + if v == str { + return v + } + if fmt.Sprint(k+1) == str { + return v + } + } + return str + } + + // origSeverityValue 映射值 + origSeverityValue := func(str string) string { + arr := []string{ + oam.ALARM_SEVERITY_CRITICAL, + oam.ALARM_SEVERITY_MAJOR, + oam.ALARM_SEVERITY_MINOR, + oam.ALARM_SEVERITY_WARNING, + oam.ALARM_SEVERITY_EVENT, + } + for k, v := range arr { + if v == str { + return v + } + if fmt.Sprint(k+1) == str { + return v + } + } + return str + } + + // alarmStatusValue 映射值 + alarmStatusValue := func(value int) string { + arr := []string{ + oam.ALARM_STATUS_CLEAR, + oam.ALARM_STATUS_ACTIVE, + } + for k, v := range arr { + if k == value { + return v + } + } + return oam.ALARM_STATUS_ACTIVE + } + + alarmArr := make([]oam.Alarm, 0) + type body struct { + AlarmSeq int `json:"alarmSeq"` + AlarmId string `json:"alarmId"` + NeId string `json:"neId"` // 收到实际是rmUID + AlarmCode int `json:"alarmCode"` + AlarmTitle string `json:"alarmTitle"` + EventTime string `json:"eventTime"` + AlarmType string `json:"alarmType"` + OrigSeverity string `json:"origSeverity"` + PerceivedSeverity string `json:"perceivedSeverity"` + PVFlag string `json:"pvFlag"` + NeName string `json:"neName"` + NeType string `json:"neType"` + ObjectUid string `json:"objectUid"` + ObjectName string `json:"objectName"` + ObjectType string `json:"objectType"` + LocationInfo string `json:"locationInfo"` + Province string `json:"province"` + AlarmStatus int `json:"alarmStatus"` + SpecificProblem string `json:"specificProblem"` + SpecificProblemID string `json:"specificProblemID"` + AddInfo string `json:"addInfo"` + } + parseItem := func(v body) oam.Alarm { + // 产生时间 + eventTime := date.ParseStrToDate(v.EventTime, time.RFC3339) + // 创建告警 + alarm := oam.Alarm{ + NeUid: v.NeId, // 网元唯一标识 + AlarmTime: eventTime.UnixMilli(), // 事件产生时间 + AlarmId: v.AlarmId, // 告警ID 唯一,清除时对应 + AlarmCode: v.AlarmCode, // 告警状态码 + AlarmType: alarmTypeValue(v.AlarmType), // 告警类型 + AlarmTitle: v.AlarmTitle, // 告警标题 + PerceivedSeverity: origSeverityValue(v.OrigSeverity), // 告警级别 + AlarmStatus: alarmStatusValue(v.AlarmStatus), // 告警状态 + SpecificProblem: v.SpecificProblem, // 告警问题原因 + SpecificProblemID: v.SpecificProblemID, // 告警问题原因ID + AddInfo: v.AddInfo, // 告警辅助信息 + LocationInfo: v.LocationInfo, // 告警定位信息 + } + return alarm + } + var neInfos []neModel.NeInfo + if elementTypeValue == "all" { + neInfos = neService.NewNeInfo.Find(neModel.NeInfo{}, false, false) + } else { + neInfos = neService.NewNeInfo.FindByNeType(strings.ToUpper(elementTypeValue)) + } + for _, neInfo := range neInfos { + data, err := neFetchlink.AlarmHistory(neInfo) + if err != nil { + logger.Errorf("failed to fetch alarm history:%s", err.Error()) + continue + } + if len(data) == 0 { + logger.Warnf("not found sync alarms %s", neInfo.RmUID) + continue + } + + bodyArr := make([]body, 0) + // 将 []map[string]any 序列化为 JSON 字符串 + jsonData, err := json.Marshal(data) + if err != nil { + logger.Errorf("marshal error: %s", err.Error()) + continue + } + // 反序列化到结构体 + err = json.Unmarshal(jsonData, &bodyArr) + if err != nil { + logger.Errorf("Error unmarshal error: %s", err.Error()) + continue + } + + for _, v := range bodyArr { + alarmArr = append(alarmArr, parseItem(v)) + } + } + + errArr := make([]string, 0) + for _, alarm := range alarmArr { + if err := oamService.NewAlarm.Resolve(alarm); err != nil { + errArr = append(errArr, err.Error()) + } + } + + if len(errArr) > 0 { + c.JSON(200, resp.ErrData(errArr)) + return + } + c.JSON(200, resp.Ok(nil)) +} diff --git a/src/modules/oam/oam.go b/src/modules/oam/oam.go new file mode 100644 index 00000000..5a116ecf --- /dev/null +++ b/src/modules/oam/oam.go @@ -0,0 +1,40 @@ +package oam + +import ( + "github.com/gin-gonic/gin" + "github.com/tsmask/go-oam" + + "be.ems/src/framework/logger" + "be.ems/src/modules/oam/controller" + "be.ems/src/modules/oam/service" +) + +// Setup 模块路由注册 +func Setup(router *gin.Engine) { + logger.Infof("开始加载 ====> oam 模块路由") + + // 网管接收端收告警 + oam.AlarmReceiveRoute(router, service.NewAlarm.Resolve) + // 网管接收端收终端接入基站 + oam.UENBReceiveRoute(router, service.NewUENB.Resolve) + // 网管接收端收基站状态 + oam.NBStateReceiveRoute(router, service.NewNBState.Resolve) + // 网管接收端收话单 + oam.CDRReceiveRoute(router, service.NewCDR.Resolve) + // 网管接收端收KPI + oam.KPIReceiveRoute(router, service.NewKPI.Resolve) + + // APIRest 北向接收 + aprRest := controller.NewAPIRest + aprRestGroup := router.Group("/api/rest") + { + aprRestGroup.GET("/faultManagement/v1/elementType/:elementTypeValue/objectType/alarms", aprRest.ResolveAlarmHistory) + aprRestGroup.POST("/faultManagement/v1/elementType/:elementTypeValue/objectType/alarms", aprRest.ResolveAlarm) + aprRestGroup.POST("/cdrManagement/v1/elementType/:elementTypeValue/objectType/cdrEvent", aprRest.ResolveCDR) + aprRestGroup.POST("/performanceManagement/v1/elementType/:elementTypeValue/objectType/kpiReport/:index", aprRest.ResolveKPI) + aprRestGroup.POST("/ueManagement/v1/elementType/:elementTypeValue/objectType/nbState", aprRest.ResolveNBState) + aprRestGroup.POST("/logManagement/v1/elementType/:elementTypeValue/objectType/ueEvent", aprRest.ResolveUENB) + router.POST("/upload-ue/v1/:eventType", aprRest.ResolveUENBByAMF) // AMF特殊上报 + } + +} diff --git a/src/modules/oam/service/kpi.go b/src/modules/oam/service/kpi.go new file mode 100644 index 00000000..3d73d8c4 --- /dev/null +++ b/src/modules/oam/service/kpi.go @@ -0,0 +1,236 @@ +package service + +import ( + "encoding/json" + "fmt" + "time" + + "be.ems/src/framework/logger" + "be.ems/src/framework/utils/date" + "be.ems/src/framework/utils/expr" + "be.ems/src/framework/utils/parse" + "github.com/tsmask/go-oam" + + neDataModel "be.ems/src/modules/network_data/model" + neDataService "be.ems/src/modules/network_data/service" + neModel "be.ems/src/modules/network_element/model" + neService "be.ems/src/modules/network_element/service" + wsService "be.ems/src/modules/ws/service" +) + +// 实例化服务层 KPI 结构体 +var NewKPI = &KPI{ + neInfoService: neService.NewNeInfo, + wsService: wsService.NewWSSend, + kpiReportService: neDataService.NewKpiReport, + kpiCReportService: neDataService.NewKpiCReport, +} + +// KPI 消息处理 +type KPI struct { + neInfoService *neService.NeInfo + wsService *wsService.WSSend + kpiReportService *neDataService.KpiReport + kpiCReportService *neDataService.KpiCReport + neInfo neModel.NeInfo +} + +// Resolve 接收处理 +func (s *KPI) Resolve(k oam.KPI) error { + if len(k.Data) == 0 { + return fmt.Errorf("kpi data is nil") + } + // 是否存在网元 + s.neInfo = s.neInfoService.FindByRmuid(k.NeUid) + if s.neInfo.NeType == "" || s.neInfo.RmUID != k.NeUid { + logger.Warnf("resolve kpi network element does not exist %s", k.NeUid) + return fmt.Errorf("resolve kpi network element does not exist %s", k.NeUid) + } + + // 时间片 + curTime := time.Now() + curSeconds := curTime.Hour()*3600 + curTime.Minute()*60 + curTime.Second() + index := int64(curSeconds) / k.Granularity + + if err := s.saveKPIData(k, index); err != nil { + logger.Warnf("resolve kpi data fail %s", k.NeUid) + return err + } + if err := s.saveKPIDataC(k, index); err != nil { + logger.Warnf("resolve kpic data fail %s", k.NeUid) + return err + } + return nil +} + +// saveKPIData 存储KPI数据并推送到ws订阅组 +func (s *KPI) saveKPIData(k oam.KPI, index int64) error { + // 时间数据处理 + recordTime := time.Now() + if k.RecordTime > 1e12 { + recordTime = time.UnixMilli(k.RecordTime) + } else if k.RecordTime > 1e9 { + recordTime = time.Unix(k.RecordTime, 0) + } + recordDate := date.ParseDateToStr(recordTime, "2006-01-02") + recordEndTime := date.ParseDateToStr(recordTime, "15:04:05") + startTime := recordTime.Add(-time.Duration(k.Granularity) * time.Second) + recordStartTime := date.ParseDateToStr(startTime, "15:04:05") + + // kpi data数据json + kpiTitles := s.kpiReportService.FindTitle(s.neInfo.NeType) + KpiValues := make([]map[string]any, 0) + for _, kt := range kpiTitles { + item := map[string]any{ + "kpiId": kt.KpiId, + "value": 0, + "err": "", + } + // 匹配指标记录 + for k, v := range k.Data { + if k == kt.KpiId { + item["value"] = v + } + } + KpiValues = append(KpiValues, item) + } + + KpiValuesByte, _ := json.Marshal(KpiValues) + + // KPI 信息 + kpiData := neDataModel.KpiReport{ + NeType: s.neInfo.NeType, + NeName: s.neInfo.NeName, + RmUid: s.neInfo.RmUID, + Date: recordDate, + StartTime: recordStartTime, + EndTime: recordEndTime, + Index: index, + Granularity: k.Granularity, + KpiValues: string(KpiValuesByte), + CreatedAt: k.RecordTime, + } + insertId := s.kpiReportService.Insert(kpiData) + if insertId <= 0 { + return fmt.Errorf("add kpi data fail") + } + kpiData.ID = insertId + + // 指标事件对象 + data := map[string]any{ + "neType": kpiData.NeType, + "neName": kpiData.NeName, + "rmUID": kpiData.RmUid, + "startIndex": kpiData.Index, + "timeGroup": kpiData.CreatedAt, + // kip_id ... + } + for _, v := range KpiValues { + data[fmt.Sprint(v["kpiId"])] = v["value"] + } + + // 推送到ws订阅组 + s.wsService.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, s.neInfo.NeType, s.neInfo.NeId), data) + // 更新UPF总流量 + if s.neInfo.NeType == "UPF" { + upValue := parse.Number(data["UPF.03"]) + downValue := parse.Number(data["UPF.06"]) + s.kpiReportService.UPFTodayFlowUpdate(s.neInfo.RmUID, upValue, downValue) + } + return nil +} + +// saveKPIDataC 存储自定义KPI数据并推送到ws订阅组 +func (s *KPI) saveKPIDataC(k oam.KPI, index int64) error { + // 时间数据处理 + recordTime := time.Now() + if k.RecordTime > 1e12 { + recordTime = time.UnixMilli(k.RecordTime) + } else if k.RecordTime > 1e9 { + recordTime = time.Unix(k.RecordTime, 0) + } + recordDate := date.ParseDateToStr(recordTime, "2006-01-02") + recordEndTime := date.ParseDateToStr(recordTime, "15:04:05") + startTime := recordTime.Add(-time.Duration(k.Granularity) * time.Second) + recordStartTime := date.ParseDateToStr(startTime, "15:04:05") + + // kpi data数据json + kpiCTitles := s.kpiCReportService.FindTitle(s.neInfo.NeType) + KpiValues := make([]map[string]any, 0) + // 自定义指标的表达式环境变量 + KpiExprEnv := make(map[string]any, 0) + for k, v := range k.Data { + KpiExprEnv[k] = v + } + // 自定义指标的计算 + for _, v := range kpiCTitles { + item := map[string]any{ + "kpiId": v.KpiId, + "value": 0, + "err": "", + } + + // 匹配指标记录 + if envValue, envOk := KpiExprEnv[v.KpiId]; envOk { + item["value"] = envValue + } + + // 计算结果 + exprStr, exprEnv := expr.ParseExprEnv(v.Expression, KpiExprEnv) + result, err := expr.Eval(exprStr, exprEnv) + if err != nil { + item["value"] = 0 + item["err"] = err.Error() + } else { + if v.Unit == "%" { + resultInt64 := parse.Number(result) + if resultInt64 > 100 { + result = 100 + } + if resultInt64 < 0 { + result = 0 + } + } + + item["value"] = result + } + KpiValues = append(KpiValues, item) + } + KpiValuesByte, _ := json.Marshal(KpiValues) + + // KPI 信息 + kpiCData := neDataModel.KpiCReport{ + NeType: s.neInfo.NeType, + NeName: s.neInfo.NeName, + RmUid: s.neInfo.RmUID, + Date: recordDate, + StartTime: recordStartTime, + EndTime: recordEndTime, + Index: index, + Granularity: k.Granularity, + KpiValues: string(KpiValuesByte), + CreatedAt: k.RecordTime, + } + insertId := s.kpiCReportService.Insert(kpiCData) + if insertId <= 0 { + return fmt.Errorf("add kpic data fail") + } + kpiCData.ID = insertId + + // 指标事件对象 + data := map[string]any{ + "neType": kpiCData.NeType, + "neName": kpiCData.NeName, + "rmUID": kpiCData.RmUid, + "startIndex": kpiCData.Index, + "timeGroup": kpiCData.CreatedAt, + // kip_id ... + } + for _, v := range KpiValues { + data[fmt.Sprint(v["kpiId"])] = v["value"] + } + + // 推送到ws订阅组 + s.wsService.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, s.neInfo.NeType, s.neInfo.NeId), data) + return nil +} diff --git a/src/modules/oam/service/nb_state.go b/src/modules/oam/service/nb_state.go new file mode 100644 index 00000000..bc283a54 --- /dev/null +++ b/src/modules/oam/service/nb_state.go @@ -0,0 +1,67 @@ +package service + +import ( + "fmt" + "time" + + "be.ems/src/framework/logger" + "be.ems/src/framework/utils/date" + "github.com/tsmask/go-oam" + + neDataModel "be.ems/src/modules/network_data/model" + neDataService "be.ems/src/modules/network_data/service" + neService "be.ems/src/modules/network_element/service" + wsService "be.ems/src/modules/ws/service" +) + +// 实例化服务层 NBState 结构体 +var NewNBState = &NBState{ + neInfoService: neService.NewNeInfo, + wsService: wsService.NewWSSend, + nbStateService: neDataService.NewNBState, +} + +// NBState 消息处理 +type NBState struct { + neInfoService *neService.NeInfo + wsService *wsService.WSSend + nbStateService *neDataService.NBState +} + +// Resolve 接收处理 +func (s *NBState) Resolve(n oam.NBState) error { + // 是否存在网元 + neInfo := s.neInfoService.FindByRmuid(n.NeUid) + if neInfo.NeType == "" || neInfo.RmUID != n.NeUid { + logger.Warnf("resolve nb_state network element does not exist %s", n.NeUid) + return fmt.Errorf("resolve nb_state network element does not exist %s", n.NeUid) + } + + nbState := neDataModel.NBState{ + NeType: neInfo.NeType, + NeId: neInfo.NeId, + RmUid: neInfo.RmUID, + Address: n.Address, + Name: n.Name, + Position: n.Position, + NbName: n.DeviceName, + State: n.State, + Time: date.ParseDateToStr(n.StateTime, time.RFC3339), + } + insertId := s.nbStateService.Insert(nbState) + if insertId <= 0 { + return fmt.Errorf("add nb_state data fail") + } + nbState.ID = insertId + + // 推送到ws订阅组 + switch neInfo.NeType { + case "AMF": + s.wsService.ByGroupID(wsService.GROUP_AMF_NB, nbState) + s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_AMF_NB, neInfo.NeId), nbState) + case "MME": + s.wsService.ByGroupID(wsService.GROUP_MME_NB, nbState) + s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_MME_NB, neInfo.NeId), nbState) + } + return nil +}