feat: 添加oam模块

This commit is contained in:
TsMask
2025-09-25 15:20:49 +08:00
parent 1f0e0cfce2
commit 960a31645b
39 changed files with 4810 additions and 124 deletions

View File

@@ -0,0 +1,721 @@
package controller
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/tsmask/go-oam"
goOamState "github.com/tsmask/go-oam/modules/state/service"
"be.ems/src/framework/logger"
"be.ems/src/framework/resp"
"be.ems/src/framework/utils/date"
"be.ems/src/framework/utils/parse"
neFetchlink "be.ems/src/modules/network_element/fetch_link"
neModel "be.ems/src/modules/network_element/model"
neService "be.ems/src/modules/network_element/service"
oamService "be.ems/src/modules/oam/service"
)
// NewAPIRest 实例化控制层
var NewAPIRest = &APIRestController{}
// APIRestController 北向定义 控制层处理
//
// PATH /api/rest
type APIRestController struct{}
// ResolveAlarm 接收告警
//
// POST /faultManagement/v1/elementType/:elementTypeValue/objectType/alarms
func (s APIRestController) ResolveAlarm(c *gin.Context) {
var body []struct {
AlarmSeq int `json:"alarmSeq"`
AlarmId string `json:"alarmId"`
NeId string `json:"neId"` // 收到实际是rmUID
AlarmCode int `json:"alarmCode"`
AlarmTitle string `json:"alarmTitle"`
EventTime string `json:"eventTime"`
AlarmType string `json:"alarmType"`
OrigSeverity string `json:"origSeverity"`
PerceivedSeverity string `json:"perceivedSeverity"`
PVFlag string `json:"pvFlag"`
NeName string `json:"neName"`
NeType string `json:"neType"`
ObjectUid string `json:"objectUid"`
ObjectName string `json:"objectName"`
ObjectType string `json:"objectType"`
LocationInfo string `json:"locationInfo"`
Province string `json:"province"`
AlarmStatus int `json:"alarmStatus"`
SpecificProblem string `json:"specificProblem"`
SpecificProblemID string `json:"specificProblemID"`
AddInfo string `json:"addInfo"`
}
if err := c.ShouldBindBodyWithJSON(&body); err != nil {
errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
return
}
elementTypeValue := c.Param("elementTypeValue")
// alarmTypeValue 映射值
alarmTypeValue := func(str string) string {
arr := []string{
oam.ALARM_TYPE_COMMUNICATION_ALARM,
oam.ALARM_TYPE_EQUIPMENT_ALARM,
oam.ALARM_TYPE_PROCESSING_FAILURE,
oam.ALARM_TYPE_ENVIRONMENTAL_ALARM,
oam.ALARM_TYPE_QUALITY_OF_SERVICE_ALARM,
}
for k, v := range arr {
if v == str {
return v
}
if fmt.Sprint(k+1) == str {
return v
}
}
return str
}
// origSeverityValue 映射值
origSeverityValue := func(str string) string {
arr := []string{
oam.ALARM_SEVERITY_CRITICAL,
oam.ALARM_SEVERITY_MAJOR,
oam.ALARM_SEVERITY_MINOR,
oam.ALARM_SEVERITY_WARNING,
oam.ALARM_SEVERITY_EVENT,
}
for k, v := range arr {
if v == str {
return v
}
if fmt.Sprint(k+1) == str {
return v
}
}
return str
}
// alarmStatusValue 映射值
alarmStatusValue := func(value int) string {
arr := []string{
oam.ALARM_STATUS_CLEAR,
oam.ALARM_STATUS_ACTIVE,
}
for k, v := range arr {
if k == value {
return v
}
}
return oam.ALARM_STATUS_ACTIVE
}
alarmArr := make([]oam.Alarm, 0)
for _, v := range body {
if !strings.EqualFold(v.NeType, elementTypeValue) {
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType"))
return
}
// 产生时间
eventTime := date.ParseStrToDate(v.EventTime, time.RFC3339)
// 创建告警
alarm := oam.Alarm{
NeUid: v.NeId, // 网元唯一标识
AlarmTime: eventTime.UnixMilli(), // 事件产生时间
AlarmId: v.AlarmId, // 告警ID 唯一,清除时对应
AlarmCode: v.AlarmCode, // 告警状态码
AlarmType: alarmTypeValue(v.AlarmType), // 告警类型
AlarmTitle: v.AlarmTitle, // 告警标题
PerceivedSeverity: origSeverityValue(v.OrigSeverity), // 告警级别
AlarmStatus: alarmStatusValue(v.AlarmStatus), // 告警状态
SpecificProblem: v.SpecificProblem, // 告警问题原因
SpecificProblemID: v.SpecificProblemID, // 告警问题原因ID
AddInfo: v.AddInfo, // 告警辅助信息
LocationInfo: v.LocationInfo, // 告警定位信息
}
alarmArr = append(alarmArr, alarm)
}
errArr := make([]string, 0)
for _, alarm := range alarmArr {
if err := oamService.NewAlarm.Resolve(alarm); err != nil {
errArr = append(errArr, err.Error())
}
}
if len(errArr) > 0 {
c.JSON(200, resp.ErrData(errArr))
return
}
c.JSON(200, resp.Ok(nil))
}
// ResolveCDR 接收话单
//
// POST /cdrManagement/v1/elementType/:elementTypeValue/objectType/cdrEvent
func (s APIRestController) ResolveCDR(c *gin.Context) {
var body struct {
NeType string `json:"neType" `
NeName string `json:"neName" `
RmUID string `json:"rmUID" `
Timestamp int `json:"timestamp" `
CDR map[string]any `json:"CDR" `
}
if err := c.ShouldBindBodyWithJSON(&body); err != nil {
errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
return
}
elementTypeValue := c.Param("elementTypeValue")
if !strings.EqualFold(body.NeType, elementTypeValue) {
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType"))
return
}
recordTime := time.Now()
if body.Timestamp > 1e12 {
recordTime = time.UnixMilli(int64(body.Timestamp))
} else if body.Timestamp > 1e9 {
recordTime = time.Unix(int64(body.Timestamp), 0)
}
// 创建CDR
cdr := oam.CDR{
NeUid: body.RmUID, // 网元唯一标识
RecordTime: recordTime.UnixMilli(), // 记录时间 时间戳毫秒Push时自动填充
Data: body.CDR, // 话单信息
}
if err := oamService.NewCDR.Resolve(cdr); err != nil {
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
c.JSON(200, resp.Ok(nil))
}
// ResolveKPI 接收KPI
//
// POST /performanceManagement/v1/elementType/:elementTypeValue/objectType/kpiReport/:index
func (s APIRestController) ResolveKPI(c *gin.Context) {
var body struct {
Timestamp string `json:"TimeStamp" binding:"required"`
Task struct {
Period struct {
StartTime string `json:"StartTime"`
EndTime string `json:"EndTime"`
} `json:"Period" binding:"required"`
NE struct {
NEName string `json:"NEName"`
RmUID string `json:"rmUID"`
NeType string `json:"NeType"`
KPIs []struct {
KPIID string `json:"KPIID"`
Value int64 `json:"Value"`
Err string `json:"Err"`
} `json:"KPIs" binding:"required"`
} `json:"NE" binding:"required"`
} `json:"Task" binding:"required"`
}
if err := c.ShouldBindBodyWithJSON(&body); err != nil {
errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
return
}
elementTypeValue := c.Param("elementTypeValue")
if !strings.EqualFold(body.Task.NE.NeType, elementTypeValue) {
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType"))
return
}
// index := c.Param("index")
timestamp := body.Timestamp
taskPeriod := body.Task.Period
taskNeKPIs := body.Task.NE.KPIs
// 时间数据处理
receiverTime := date.ParseStrToDate(timestamp, date.YYYY_MM_DDTHH_MM_SSZ)
startTime := date.ParseStrToDate(taskPeriod.StartTime, date.YYYY_MM_DDTHH_MM_SSZ)
endTime := date.ParseStrToDate(taskPeriod.EndTime, date.YYYY_MM_DDTHH_MM_SSZ)
granularity := parse.Number(endTime.Sub(startTime).Seconds())
// kpi data数据
KpiValues := make(map[string]float64, 0)
for _, v := range taskNeKPIs {
KpiValues[v.KPIID] = float64(v.Value)
}
// 创建KPI
kpi := oam.KPI{
NeUid: body.Task.NE.RmUID, // 网元唯一标识
RecordTime: receiverTime.UnixMilli(), // 记录时间 时间戳毫秒Push时自动填充
Granularity: granularity, // 时间间隔 5/10/.../60/300 (秒)
Data: KpiValues, // 指标信息
}
if err := oamService.NewKPI.Resolve(kpi); err != nil {
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
c.JSON(200, resp.Ok(nil))
}
// ResolveNBState 接收基站状态变更
//
// POST /ueManagement/v1/elementType/:elementTypeValue/objectType/nbState
func (s APIRestController) ResolveNBState(c *gin.Context) {
var body struct {
NeType string `json:"neType" `
NeName string `json:"neName" `
RmUID string `json:"rmUID"`
StateList []struct {
Address string `json:"address" `
Name string `json:"name" `
Position string `json:"position" `
NbName string `json:"nbName" `
State string `json:"state" ` // "OFF" or "ON"
OffTime string `json:"offTime" ` //if State=OFF, will set it
OnTime string `json:"onTime" ` //if State=ON , will set it
}
}
if err := c.ShouldBindBodyWithJSON(&body); err != nil {
errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
return
}
elementTypeValue := c.Param("elementTypeValue")
if !strings.EqualFold(body.NeType, elementTypeValue) {
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType"))
return
}
if len(body.StateList) == 0 {
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "no stateList"))
return
}
nbStateArr := make([]oam.NBState, 0)
for _, v := range body.StateList {
if v.Address == "" || v.State == "" {
continue
}
stateTime := date.ParseStrToDate(v.OffTime, time.RFC3339)
stateStr := oam.NB_STATE_OFF
if v.State == "ON" {
stateTime = date.ParseStrToDate(v.OnTime, time.RFC3339)
stateStr = oam.NB_STATE_ON
}
// 创建NbState
nbState := oam.NBState{
NeUid: body.RmUID, // 网元唯一标识
RecordTime: time.Now().UnixMilli(), // 记录时间 时间戳毫秒Push时自动填充
Address: v.Address, // 基站地址
DeviceName: v.NbName, // 基站设备名称
State: stateStr, // 基站状态 ON/OFF
StateTime: stateTime.UnixMilli(), // 基站状态时间 时间戳毫秒
Name: v.Name, // 基站名称 网元标记
Position: v.Position, // 基站位置 网元标记
}
nbStateArr = append(nbStateArr, nbState)
}
errArr := make([]string, 0)
for _, nbState := range nbStateArr {
if err := oamService.NewNBState.Resolve(nbState); err != nil {
errArr = append(errArr, err.Error())
}
}
if len(errArr) > 0 {
c.JSON(200, resp.ErrData(errArr))
return
}
c.JSON(200, resp.Ok(nil))
}
// ResolveUENB 接收终端接入基站
//
// POST /logManagement/v1/elementType/:elementTypeValue/objectType/ueEvent
func (s APIRestController) ResolveUENB(c *gin.Context) {
var body struct {
NeType string `json:"neType" `
NeName string `json:"neName" `
RmUID string `json:"rmUID" `
Timestamp int64 `json:"timestamp" `
EventType string `json:"eventType" `
EventJson map[string]any `json:"eventJSON" `
}
if err := c.ShouldBindBodyWithJSON(&body); err != nil {
errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
return
}
elementTypeValue := c.Param("elementTypeValue")
if !strings.EqualFold(body.NeType, elementTypeValue) {
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_CHEACK, "elementType is inconsistent with neType"))
return
}
// 记录时间
recordTime := time.Now()
if body.Timestamp > 1e12 {
recordTime = time.UnixMilli(int64(body.Timestamp))
} else if body.Timestamp > 1e9 {
recordTime = time.Unix(int64(body.Timestamp), 0)
}
// 创建UENB
uenb := oam.UENB{
NeUid: body.RmUID, // 网元唯一标识
RecordTime: recordTime.UnixMilli(), // 记录时间
NBId: "0", // 基站ID
CellId: "0", // 小区ID
TAC: "", // TAC
IMSI: "", // IMSI
Result: oam.UENB_RESULT_AUTH_SUCCESS, // 结果值
Type: oam.UENB_TYPE_DETACH, // 终端接入基站类型
}
// 基站ID
if v, ok := body.EventJson["eNBID"]; ok && v != nil {
uenb.NBId = fmt.Sprint(v)
}
if v, ok := body.EventJson["gNBID"]; ok && v != nil {
uenb.NBId = fmt.Sprint(v)
}
// 小区ID
if v, ok := body.EventJson["cellID"]; ok && v != nil {
uenb.CellId = fmt.Sprint(v)
}
// TAC
if v, ok := body.EventJson["tacID"]; ok && v != nil {
uenb.TAC = fmt.Sprint(v)
}
// IMSI
if v, ok := body.EventJson["imsi"]; ok && v != nil {
uenb.IMSI = fmt.Sprint(v)
}
// 结果值
if v, ok := body.EventJson["result"]; ok && v != nil {
uenb.Result = fmt.Sprint(v)
}
// 终端接入基站类型
if v, ok := body.EventJson["type"]; ok && v != nil {
switch v := fmt.Sprint(v); v {
case "detach":
uenb.Type = oam.UENB_TYPE_DETACH
case "auth-result":
uenb.Type = oam.UENB_TYPE_AUTH
case "cm-state":
uenb.Type = oam.UENB_TYPE_CM
}
}
if err := oamService.NewUENB.Resolve(uenb); err != nil {
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
c.JSON(200, resp.Ok(nil))
}
// ResolveUENBByAMF 接收终端接入基站-AMF
//
// POST /upload-ue/v1/:eventType
func (s APIRestController) ResolveUENBByAMF(c *gin.Context) {
var body map[string]any
if err := c.ShouldBindBodyWithJSON(&body); err != nil {
errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
return
}
// 创建UENB
uenb := oam.UENB{
NeUid: "4400HXAMF001", // 网元唯一标识
RecordTime: 0, // 记录时间
NBId: "0", // 基站ID
CellId: "0", // 小区ID
TAC: "", // TAC
IMSI: "", // IMSI
Result: oam.UENB_RESULT_AUTH_SUCCESS, // 结果值
Type: oam.UENB_TYPE_DETACH, // 终端接入基站类型
}
// 从eventJson中获取rmUID
if v, ok := body["rmUID"]; ok {
uenb.NeUid = fmt.Sprint(v)
}
// 统一格式
eventType := c.Param("eventType")
switch eventType {
case "auth-result":
// {"authCode":"200","authMessage":"成功","authTime":"2024-12-07 16:48:37","cellID":"3","gNBID":"1","imsi":"460002082100000","onlineNumber":1,"tacID":"81"}
if v, ok := body["imsi"]; ok {
uenb.IMSI = fmt.Sprint(v)
}
if v, ok := body["cellID"]; ok {
uenb.CellId = fmt.Sprint(v)
}
if v, ok := body["gNBID"]; ok {
uenb.NBId = fmt.Sprint(v)
}
if v, ok := body["tacID"]; ok {
uenb.TAC = fmt.Sprint(v)
}
if v, ok := body["authCode"]; ok {
uenb.Result = fmt.Sprint(v)
}
if v, ok := body["authTime"]; ok {
authTime := date.ParseStrToDate(fmt.Sprint(v), date.YYYY_MM_DD_HH_MM_SS)
uenb.RecordTime = authTime.UnixMilli()
}
uenb.Type = oam.UENB_TYPE_AUTH
case "detach":
// {"detachResult":0,"detachTime":"2024-12-07 18:00:47","imsi":"460002082100000"}
if v, ok := body["imsi"]; ok {
uenb.IMSI = fmt.Sprint(v)
}
if v, ok := body["detachResult"]; ok {
if v == "0" {
uenb.Result = oam.UENB_RESULT_AUTH_SUCCESS
} else {
uenb.Result = fmt.Sprint(v)
}
}
if v, ok := body["detachTime"]; ok {
detachTime := date.ParseStrToDate(fmt.Sprint(v), date.YYYY_MM_DD_HH_MM_SS)
uenb.RecordTime = detachTime.UnixMilli()
}
uenb.Type = oam.UENB_TYPE_DETACH
case "cm-state":
// {"changeTime":"2024-12-07 17:07:52","imsi":"460002082100000","onlineNumber":1,"status":2}
if v, ok := body["imsi"]; ok {
uenb.IMSI = fmt.Sprint(v)
}
if v, ok := body["status"]; ok {
uenb.Result = fmt.Sprint(v)
}
if v, ok := body["changeTime"]; ok {
changeTime := date.ParseStrToDate(fmt.Sprint(v), date.YYYY_MM_DD_HH_MM_SS)
uenb.RecordTime = changeTime.UnixMilli()
}
uenb.Type = oam.UENB_TYPE_CM
}
if err := oamService.NewUENB.Resolve(uenb); err != nil {
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
c.JSON(200, resp.Ok(nil))
}
// ResolveAlarmHistory 拉取告警历史
//
// GET /faultManagement/v1/elementType/:elementTypeValue/objectType/alarms
func (s APIRestController) ResolveAlarmHistory(c *gin.Context) {
elementTypeValue := c.Param("elementTypeValue")
// Get alarms from OMC return 204
if strings.ToLower(elementTypeValue) == "omc" {
c.JSON(200, resp.OkMsg("omc alarms no content"))
return
}
// alarmTypeValue 映射值
alarmTypeValue := func(str string) string {
arr := []string{
oam.ALARM_TYPE_COMMUNICATION_ALARM,
oam.ALARM_TYPE_EQUIPMENT_ALARM,
oam.ALARM_TYPE_PROCESSING_FAILURE,
oam.ALARM_TYPE_ENVIRONMENTAL_ALARM,
oam.ALARM_TYPE_QUALITY_OF_SERVICE_ALARM,
}
for k, v := range arr {
if v == str {
return v
}
if fmt.Sprint(k+1) == str {
return v
}
}
return str
}
// origSeverityValue 映射值
origSeverityValue := func(str string) string {
arr := []string{
oam.ALARM_SEVERITY_CRITICAL,
oam.ALARM_SEVERITY_MAJOR,
oam.ALARM_SEVERITY_MINOR,
oam.ALARM_SEVERITY_WARNING,
oam.ALARM_SEVERITY_EVENT,
}
for k, v := range arr {
if v == str {
return v
}
if fmt.Sprint(k+1) == str {
return v
}
}
return str
}
// alarmStatusValue 映射值
alarmStatusValue := func(value int) string {
arr := []string{
oam.ALARM_STATUS_CLEAR,
oam.ALARM_STATUS_ACTIVE,
}
for k, v := range arr {
if k == value {
return v
}
}
return oam.ALARM_STATUS_ACTIVE
}
alarmArr := make([]oam.Alarm, 0)
type body struct {
AlarmSeq int `json:"alarmSeq"`
AlarmId string `json:"alarmId"`
NeId string `json:"neId"` // 收到实际是rmUID
AlarmCode int `json:"alarmCode"`
AlarmTitle string `json:"alarmTitle"`
EventTime string `json:"eventTime"`
AlarmType string `json:"alarmType"`
OrigSeverity string `json:"origSeverity"`
PerceivedSeverity string `json:"perceivedSeverity"`
PVFlag string `json:"pvFlag"`
NeName string `json:"neName"`
NeType string `json:"neType"`
ObjectUid string `json:"objectUid"`
ObjectName string `json:"objectName"`
ObjectType string `json:"objectType"`
LocationInfo string `json:"locationInfo"`
Province string `json:"province"`
AlarmStatus int `json:"alarmStatus"`
SpecificProblem string `json:"specificProblem"`
SpecificProblemID string `json:"specificProblemID"`
AddInfo string `json:"addInfo"`
}
parseItem := func(v body) oam.Alarm {
// 产生时间
eventTime := date.ParseStrToDate(v.EventTime, time.RFC3339)
// 创建告警
alarm := oam.Alarm{
NeUid: v.NeId, // 网元唯一标识
AlarmTime: eventTime.UnixMilli(), // 事件产生时间
AlarmId: v.AlarmId, // 告警ID 唯一,清除时对应
AlarmCode: v.AlarmCode, // 告警状态码
AlarmType: alarmTypeValue(v.AlarmType), // 告警类型
AlarmTitle: v.AlarmTitle, // 告警标题
PerceivedSeverity: origSeverityValue(v.OrigSeverity), // 告警级别
AlarmStatus: alarmStatusValue(v.AlarmStatus), // 告警状态
SpecificProblem: v.SpecificProblem, // 告警问题原因
SpecificProblemID: v.SpecificProblemID, // 告警问题原因ID
AddInfo: v.AddInfo, // 告警辅助信息
LocationInfo: v.LocationInfo, // 告警定位信息
}
return alarm
}
var neInfos []neModel.NeInfo
if elementTypeValue == "all" {
neInfos = neService.NewNeInfo.Find(neModel.NeInfo{}, false, false)
} else {
neInfos = neService.NewNeInfo.FindByNeType(strings.ToUpper(elementTypeValue))
}
for _, neInfo := range neInfos {
data, err := neFetchlink.AlarmHistory(neInfo)
if err != nil {
logger.Errorf("failed to fetch alarm history:%s", err.Error())
continue
}
if len(data) == 0 {
logger.Warnf("not found sync alarms %s", neInfo.RmUID)
continue
}
bodyArr := make([]body, 0)
// 将 []map[string]any 序列化为 JSON 字符串
jsonData, err := json.Marshal(data)
if err != nil {
logger.Errorf("marshal error: %s", err.Error())
continue
}
// 反序列化到结构体
err = json.Unmarshal(jsonData, &bodyArr)
if err != nil {
logger.Errorf("Error unmarshal error: %s", err.Error())
continue
}
for _, v := range bodyArr {
alarmArr = append(alarmArr, parseItem(v))
}
}
errArr := make([]string, 0)
for _, alarm := range alarmArr {
if err := oamService.NewAlarm.Resolve(alarm); err != nil {
errArr = append(errArr, err.Error())
}
}
if len(errArr) > 0 {
c.JSON(200, resp.OkData(errArr))
return
}
c.JSON(200, resp.Ok(nil))
}
// QuerySystemState 查询系统状态
//
// GET /systemManagement/v1/elementType/:elementTypeValue/objectType/systemState
func (s APIRestController) QuerySystemState(c *gin.Context) {
elementTypeValue := c.Param("elementTypeValue")
if strings.ToLower(elementTypeValue) != "omc" {
c.JSON(200, resp.ErrMsg("elementType only omc"))
return
}
info := goOamState.NewState.Info()
info.SerialNum = "-"
info.ExpiryDate = "-"
info.Capability = 50
info.Version = "config.Version"
c.JSON(200, info)
}
// NeConfigOMC 网元配置对端网管信息
//
// PUT /systemManagement/v1/elementType/:elementTypeValue/objectType/config/omcNeConfig
func (s APIRestController) NeConfigOMC(c *gin.Context) {
c.JSON(204, nil)
}
// @Description CBSManagement CB消息
type CBSState struct {
NeName string `json:"neName"` // 网元名称
RmUID string `json:"rmUID"` // 网元唯一标识
EventData []oamService.CBSEventData `json:"eventData"` // 事件数据
}
func (s APIRestController) ResolveCBSState(c *gin.Context) {
var state CBSState
if err := c.ShouldBindBodyWithJSON(&state); err != nil {
errMsgs := fmt.Sprintf("bind err: %s", resp.FormatBindError(err))
c.JSON(422, resp.CodeMsg(resp.CODE_PARAM_PARSER, errMsgs))
return
}
for _, eventData := range state.EventData {
if err := oamService.NewCBS.Resolve(eventData); err != nil {
c.JSON(200, resp.ErrMsg(err.Error()))
return
}
}
c.JSON(200, resp.Ok(nil))
}

42
src/modules/oam/oam.go Normal file
View File

@@ -0,0 +1,42 @@
package oam
import (
"github.com/gin-gonic/gin"
"github.com/tsmask/go-oam"
"be.ems/src/framework/logger"
"be.ems/src/modules/oam/service"
)
// Setup 模块路由注册
func Setup(router *gin.Engine) {
logger.Infof("开始加载 ====> oam 模块路由")
// 网管接收端收告警
oam.AlarmReceiveRoute(router, service.NewAlarm.Resolve)
// 网管接收端收终端接入基站
oam.UENBReceiveRoute(router, service.NewUENB.Resolve)
// 网管接收端收基站状态
oam.NBStateReceiveRoute(router, service.NewNBState.Resolve)
// 网管接收端收话单
oam.CDRReceiveRoute(router, service.NewCDR.Resolve)
// 网管接收端收KPI
oam.KPIReceiveRoute(router, service.NewKPI.Resolve)
// APIRest 北向定义
// aprRest := controller.NewAPIRest
// aprRestGroup := router.Group("/api/rest")
// {
// aprRestGroup.GET("/faultManagement/v1/elementType/:elementTypeValue/objectType/alarms", aprRest.ResolveAlarmHistory)
// aprRestGroup.POST("/faultManagement/v1/elementType/:elementTypeValue/objectType/alarms", aprRest.ResolveAlarm)
// aprRestGroup.POST("/cdrManagement/v1/elementType/:elementTypeValue/objectType/cdrEvent", aprRest.ResolveCDR)
// aprRestGroup.POST("/performanceManagement/v1/elementType/:elementTypeValue/objectType/kpiReport/:index", aprRest.ResolveKPI)
// aprRestGroup.POST("/ueManagement/v1/elementType/:elementTypeValue/objectType/nbState", aprRest.ResolveNBState)
// aprRestGroup.POST("/ueManagement/v1/elementType/:elementTypeValue/objectType/cbsState", aprRest.ResolveCBSState)
// aprRestGroup.POST("/logManagement/v1/elementType/:elementTypeValue/objectType/ueEvent", aprRest.ResolveUENB)
// router.POST("/upload-ue/v1/:eventType", aprRest.ResolveUENBByAMF) // AMF特殊上报
// aprRestGroup.GET("/systemManagement/v1/elementType/:elementTypeValue/objectType/systemState", aprRest.QuerySystemState)
// aprRestGroup.PUT("/systemManagement/v1/elementType/:elementTypeValue/objectType/config/omcNeConfig", aprRest.NeConfigOMC)
// }
}

View File

@@ -0,0 +1,301 @@
package service
import (
"fmt"
"time"
"be.ems/src/framework/config"
"be.ems/src/framework/constants"
"be.ems/src/framework/utils/parse"
"github.com/tsmask/go-oam"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
notificationService "be.ems/src/modules/notification/service"
traceService "be.ems/src/modules/trace/service"
wsService "be.ems/src/modules/ws/service"
)
// 实例化服务层 Alarm 结构体
var NewAlarm = &Alarm{
neInfoService: neService.NewNeInfo,
wsService: wsService.NewWSSend,
alarmService: neDataService.NewAlarm,
alarmEventService: neDataService.NewAlarmEvent,
alarmLogService: neDataService.NewAlarmLog,
alarmForwardLogService: neDataService.NewAlarmForwardLog,
}
// Alarm 消息处理
type Alarm struct {
neInfoService *neService.NeInfo
wsService *wsService.WSSend
alarmService *neDataService.Alarm
alarmEventService *neDataService.AlarmEvent
alarmLogService *neDataService.AlarmLog
alarmForwardLogService *neDataService.AlarmForwardLog
}
// Resolve 接收处理
func (s *Alarm) Resolve(a oam.Alarm) error {
// 是否存在网元
neInfo := s.neInfoService.FindByRmuid(a.NeUid)
if neInfo.NeType == "" || neInfo.RmUID != a.NeUid {
return fmt.Errorf("resolve alarm network element does not exist %s", a.NeUid)
}
// seq 告警序号
lastSeq := neDataService.NewAlarm.FindAlarmSeqLast(neInfo.NeType, neInfo.NeId)
alarmTime := time.UnixMilli(a.AlarmTime)
// 告警信息
alarm := neDataModel.Alarm{
NeType: neInfo.NeType,
NeId: neInfo.NeId,
NeName: neInfo.NeName,
Province: neInfo.Province,
PvFlag: neInfo.PvFlag,
AlarmSeq: fmt.Sprintf("%d", lastSeq+1),
AlarmId: a.AlarmId,
AlarmTitle: a.AlarmTitle,
AlarmCode: fmt.Sprintf("%d", a.AlarmCode),
EventTime: alarmTime,
AlarmType: a.AlarmType,
OrigSeverity: a.PerceivedSeverity,
PerceivedSeverity: a.PerceivedSeverity,
ObjectUid: neInfo.RmUID,
ObjectName: neInfo.NeName,
ObjectType: neInfo.NeType,
LocationInfo: a.LocationInfo,
AlarmStatus: a.AlarmStatus,
SpecificProblem: a.SpecificProblem,
SpecificProblemId: a.SpecificProblemID,
AddInfo: a.AddInfo,
}
// 进行清除
if a.AlarmStatus == oam.ALARM_STATUS_CLEAR {
if a.PerceivedSeverity == oam.ALARM_SEVERITY_EVENT {
if err := s.clearEvent(alarm); err != nil {
return err
}
} else {
if err := s.clear(alarm); err != nil {
return err
}
}
}
// 进行新增
if a.AlarmStatus == oam.ALARM_STATUS_ACTIVE {
if a.PerceivedSeverity == oam.ALARM_SEVERITY_EVENT {
if err := s.addEvent(alarm); err != nil {
return err
}
} else {
if err := s.add(alarm); err != nil {
return err
}
}
}
// 记录日志
if err := s.saveLog(alarm); err != nil {
return err
}
// 推送
s.wsService.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_ALARM, neInfo.NeType, neInfo.NeId), alarm)
// 通知
go s.notify(neInfo.IP, alarm)
return nil
}
// saveLog 记录日志
func (s *Alarm) saveLog(alarm neDataModel.Alarm) error {
alarmLog := neDataModel.AlarmLog{
NeType: alarm.NeType,
NeId: alarm.NeId,
AlarmSeq: alarm.AlarmSeq,
AlarmId: alarm.AlarmId,
AlarmTitle: alarm.AlarmTitle,
AlarmCode: alarm.AlarmCode,
AlarmStatus: alarm.AlarmStatus,
AlarmType: alarm.AlarmType,
OrigSeverity: alarm.PerceivedSeverity,
EventTime: alarm.EventTime,
}
insertId := s.alarmLogService.Insert(alarmLog)
if insertId <= 0 {
return fmt.Errorf("save alarm log fail")
}
return nil
}
// add 新增告警
func (s *Alarm) add(alarm neDataModel.Alarm) error {
// 检查网元告警ID是否唯一
alarmIdArr := s.alarmService.Find(neDataModel.Alarm{
NeType: alarm.NeType,
NeId: alarm.NeId,
AlarmId: alarm.AlarmId,
})
if len(alarmIdArr) > 0 {
return fmt.Errorf("already exists alarmId:%s", alarm.AlarmId)
}
insertId := s.alarmService.Insert(alarm)
if insertId != "" {
alarm.ID = insertId
return nil
}
return fmt.Errorf("add alarm fail")
}
// clear 清除告警
func (s *Alarm) clear(alarm neDataModel.Alarm) error {
// 检查网元告警ID是否唯一
alarmIdArr := s.alarmService.Find(neDataModel.Alarm{
NeType: alarm.NeType,
NeId: alarm.NeId,
AlarmId: alarm.AlarmId,
})
if len(alarmIdArr) != 1 {
return fmt.Errorf("not exists alarmId:%s", alarm.AlarmId)
}
// 告警清除
rows, _ := s.alarmService.ClearByIds([]string{alarmIdArr[0].ID}, alarm.ObjectUid, constants.ALARM_CLEAR_TYPE_AUTO_CLEAR)
if rows > 0 {
return nil
}
return fmt.Errorf("clear fail alarmId:%s", alarm.AlarmId)
}
// addEvent 新增告警事件
func (s *Alarm) addEvent(alarm neDataModel.Alarm) error {
// 检查网元告警ID是否唯一
alarmIdArr := s.alarmEventService.Find(neDataModel.AlarmEvent{
NeType: alarm.NeType,
NeId: alarm.NeId,
AlarmId: alarm.AlarmId,
})
if len(alarmIdArr) > 0 {
return fmt.Errorf("event already exists alarmId:%s", alarm.AlarmId)
}
// seq 告警序号
lastSeq := s.alarmEventService.FindAlarmEventSeqLast(alarm.NeType, alarm.NeId)
alarmEvent := neDataModel.AlarmEvent{
NeType: alarm.NeType,
NeId: alarm.NeId,
AlarmSeq: fmt.Sprintf("%d", lastSeq+1),
AlarmId: alarm.AlarmId,
AlarmTitle: alarm.AlarmTitle,
AlarmCode: alarm.AlarmCode,
EventTime: alarm.EventTime,
ObjectUid: alarm.ObjectUid,
ObjectName: alarm.ObjectName,
ObjectType: alarm.ObjectType,
LocationInfo: alarm.LocationInfo,
AlarmStatus: alarm.AlarmStatus,
SpecificProblem: alarm.SpecificProblem,
SpecificProblemId: alarm.SpecificProblemId,
AddInfo: alarm.AddInfo,
}
insertId := s.alarmEventService.Insert(alarmEvent)
if insertId > 0 {
alarmEvent.ID = insertId
// 网元重启后,清除活动告警
if alarm.AlarmCode == fmt.Sprintf("%d", constants.ALARM_EVENT_REBOOT) {
rows := s.alarmService.Find(neDataModel.Alarm{
NeType: alarm.NeType,
NeId: alarm.NeId,
AlarmStatus: oam.ALARM_STATUS_ACTIVE,
})
ids := make([]string, 0)
for _, v := range rows {
ids = append(ids, v.ID)
}
s.alarmService.ClearByIds(ids, alarm.ObjectUid, constants.ALARM_CLEAR_TYPE_AUTO_CLEAR)
}
// 网元重启后,有跟踪任务的需要重新补发启动任务
if alarm.AlarmCode == fmt.Sprintf("%d", constants.ALARM_EVENT_REBOOT) {
traceService.NewTraceTask.RunUnstopped(alarm.NeType, alarm.NeId)
}
return nil
}
return fmt.Errorf("event add fail")
}
// clearEvent 清除告警事件
func (s *Alarm) clearEvent(alarm neDataModel.Alarm) error {
alarmEventService := neDataService.NewAlarmEvent
// 检查网元告警ID是否唯一
alarmIdArr := alarmEventService.Find(neDataModel.AlarmEvent{
NeType: alarm.NeType,
NeId: alarm.NeId,
AlarmId: alarm.AlarmId,
})
if len(alarmIdArr) != 1 {
return fmt.Errorf("event not exists alarmId:%s", alarm.AlarmId)
}
// 告警清除
rows, _ := s.alarmEventService.ClearByIds([]int64{alarmIdArr[0].ID}, alarm.ObjectUid, constants.ALARM_CLEAR_TYPE_AUTO_CLEAR)
if rows > 0 {
return nil
}
return fmt.Errorf("event clear fail alarmId:%s", alarm.AlarmId)
}
// notify 通知
func (s *Alarm) notify(neIp string, alarm neDataModel.Alarm) {
// 邮箱
emailEnable := parse.Boolean(config.Get("notification.email.enable"))
if emailEnable {
emailList := fmt.Sprint(config.Get("notification.email.emailList"))
emailResult := "Sent Successfully!"
emailErr := notificationService.EmailAlarm(alarm, neIp)
if emailErr != nil {
emailResult = emailErr.Error()
}
s.notifyLog(alarm, "EMAIL", emailList, emailResult)
}
// 短信
smscEnable := parse.Boolean(config.Get("notification.smsc.enable"))
if smscEnable {
mobileList := fmt.Sprint(config.Get("notification.smsc.mobileList"))
smscResult := "Sent Successfully!"
smscErr := notificationService.SMSCAlarm(alarm, neIp)
if smscErr != nil {
smscResult = smscErr.Error()
}
s.notifyLog(alarm, "SMSC", mobileList, smscResult)
}
}
// notifyLog 通知日志
func (s *Alarm) notifyLog(alarm neDataModel.Alarm, forwardBy, toUser, result string) error {
alarmForwardLog := neDataModel.AlarmForwardLog{
NeType: alarm.NeType,
NeId: alarm.NeId,
AlarmSeq: alarm.AlarmSeq,
AlarmId: alarm.AlarmId,
AlarmTitle: alarm.AlarmTitle,
AlarmCode: alarm.AlarmCode,
AlarmStatus: alarm.AlarmStatus,
AlarmType: alarm.AlarmType,
OrigSeverity: alarm.OrigSeverity,
EventTime: alarm.EventTime,
Type: forwardBy,
Target: toUser,
Result: result,
}
// 记录日志
insertId := s.alarmForwardLogService.Insert(alarmForwardLog)
if insertId <= 0 {
return fmt.Errorf("notify alarm log fail")
}
return nil
}

View File

@@ -0,0 +1,29 @@
package service
import (
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
)
// 实例化服务层 CDR 结构体
var NewCBS = &CBS{
neInfoService: neService.NewNeInfo,
cbcMessageService: neDataService.NewCBCMessage,
}
// CDR 消息处理
type CBS struct {
neInfoService *neService.NeInfo
cbcMessageService *neDataService.CBCMessage // CDR会话事件服务
}
type CBSEventData struct {
EventName string `json:"eventName"` // 事件名称
MessageId int64 `json:"messageId"` // 消息ID
Detail string `json:"detail"` // 详情
}
// Resolve 接收处理
func (s *CBS) Resolve(c CBSEventData) error {
return s.cbcMessageService.UpdateDetail(c.EventName, c.Detail)
}

View File

@@ -0,0 +1,71 @@
package service
import (
"encoding/json"
"fmt"
"github.com/tsmask/go-oam"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
)
// 实例化服务层 CDR 结构体
var NewCDR = &CDR{
neInfoService: neService.NewNeInfo,
wsService: wsService.NewWSSend,
cdrEventService: neDataService.NewCDREvent,
}
// CDR 消息处理
type CDR struct {
neInfoService *neService.NeInfo
wsService *wsService.WSSend
cdrEventService *neDataService.CDREvent // CDR会话事件服务
}
// Resolve 接收处理
func (s *CDR) Resolve(c oam.CDR) error {
if c.Data == nil {
return fmt.Errorf("cdr data is nil")
}
// 是否存在网元
neInfo := s.neInfoService.FindByRmuid(c.NeUid)
if neInfo.NeType == "" || neInfo.RmUID != c.NeUid {
return fmt.Errorf("resolve cdr network element does not exist %s", c.NeUid)
}
cdrByte, _ := json.Marshal(c.Data)
cdrEvent := neDataModel.CDREvent{
NeType: neInfo.NeType,
NeName: neInfo.NeName,
RmUid: neInfo.RmUID,
Timestamp: c.RecordTime,
CdrJson: string(cdrByte),
CreatedAt: c.RecordTime,
}
insertId := s.cdrEventService.Insert(cdrEvent)
if insertId <= 0 {
return fmt.Errorf("add cdr data fail")
}
cdrEvent.ID = insertId
// 推送到ws订阅组
switch neInfo.NeType {
case "IMS":
dataMap := c.Data.(map[string]any)
v, ok := dataMap["recordType"]
if ok && (v == "MOC" || v == "MTSM") {
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_IMS_CDR, neInfo.NeId), cdrEvent)
}
case "SMF":
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_SMF_CDR, neInfo.NeId), cdrEvent)
case "SMSC":
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_SMSC_CDR, neInfo.NeId), cdrEvent)
case "SGWC":
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_SGWC_CDR, neInfo.NeId), cdrEvent)
}
return nil
}

View File

@@ -0,0 +1,241 @@
package service
import (
"encoding/json"
"fmt"
"math"
"time"
"be.ems/src/framework/utils/date"
"be.ems/src/framework/utils/expr"
"be.ems/src/framework/utils/parse"
"github.com/tsmask/go-oam"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
neModel "be.ems/src/modules/network_element/model"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
)
// 实例化服务层 KPI 结构体
var NewKPI = &KPI{
neInfoService: neService.NewNeInfo,
wsService: wsService.NewWSSend,
kpiReportService: neDataService.NewKpiReport,
kpiCReportService: neDataService.NewKpiCReport,
}
// KPI 消息处理
type KPI struct {
neInfoService *neService.NeInfo
wsService *wsService.WSSend
kpiReportService *neDataService.KpiReport
kpiCReportService *neDataService.KpiCReport
}
// Resolve 接收处理
func (s *KPI) Resolve(k oam.KPI) error {
if len(k.Data) == 0 {
return fmt.Errorf("kpi data is nil")
}
// 是否存在网元
neInfo := s.neInfoService.FindByRmuid(k.NeUid)
if neInfo.NeType == "" || neInfo.RmUID != k.NeUid {
return fmt.Errorf("resolve kpi network element does not exist %s", k.NeUid)
}
// 时间片
curTime := time.Now()
curSeconds := curTime.Hour()*3600 + curTime.Minute()*60 + curTime.Second()
index := int64(curSeconds) / k.Granularity
if err := s.saveKPIData(neInfo, k, index); err != nil {
return err
}
if err := s.saveKPIDataC(neInfo, k, index); err != nil {
return err
}
return nil
}
// saveKPIData 存储KPI数据并推送到ws订阅组
func (s KPI) saveKPIData(neInfo neModel.NeInfo, k oam.KPI, index int64) error {
// 时间数据处理
recordTime := time.Now()
if k.RecordTime > 1e12 {
recordTime = time.UnixMilli(k.RecordTime)
} else if k.RecordTime > 1e9 {
recordTime = time.Unix(k.RecordTime, 0)
}
recordDate := date.ParseDateToStr(recordTime, "2006-01-02")
recordEndTime := date.ParseDateToStr(recordTime, "15:04:05")
startTime := recordTime.Add(-time.Duration(k.Granularity) * time.Second)
recordStartTime := date.ParseDateToStr(startTime, "15:04:05")
// kpi data数据json
kpiTitles := s.kpiReportService.FindTitle(neInfo.NeType)
KpiValues := make([]map[string]any, 0)
for _, kt := range kpiTitles {
item := map[string]any{
"kpiId": kt.KpiId,
"value": 0,
"err": "",
}
// 匹配指标记录
for k, v := range k.Data {
if k == kt.KpiId {
item["value"] = v
}
}
KpiValues = append(KpiValues, item)
}
KpiValuesByte, err := json.Marshal(KpiValues)
if err != nil {
return err
}
// KPI 信息
kpiData := neDataModel.KpiReport{
NeType: neInfo.NeType,
NeName: neInfo.NeName,
RmUid: neInfo.RmUID,
Date: recordDate,
StartTime: recordStartTime,
EndTime: recordEndTime,
Index: index,
Granularity: k.Granularity,
KpiValues: string(KpiValuesByte),
CreatedAt: k.RecordTime,
}
insertId := s.kpiReportService.Insert(kpiData)
if insertId <= 0 {
return fmt.Errorf("add kpi data fail")
}
kpiData.ID = insertId
// 指标事件对象
data := map[string]any{
"neType": kpiData.NeType,
"neName": kpiData.NeName,
"rmUID": kpiData.RmUid,
"startIndex": kpiData.Index,
"timeGroup": kpiData.CreatedAt,
// kip_id ...
}
for _, v := range KpiValues {
data[fmt.Sprint(v["kpiId"])] = v["value"]
}
// 推送到ws订阅组
s.wsService.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, neInfo.NeType, neInfo.NeId), data)
// 更新UPF总流量
if neInfo.NeType == "UPF" {
upValue := parse.Number(data["UPF.03"])
downValue := parse.Number(data["UPF.06"])
s.kpiReportService.UPFTodayFlowUpdate(neInfo.RmUID, upValue, downValue)
}
return nil
}
// saveKPIDataC 存储自定义KPI数据并推送到ws订阅组
func (s KPI) saveKPIDataC(neInfo neModel.NeInfo, k oam.KPI, index int64) error {
// 时间数据处理
recordTime := time.Now()
if k.RecordTime > 1e12 {
recordTime = time.UnixMilli(k.RecordTime)
} else if k.RecordTime > 1e9 {
recordTime = time.Unix(k.RecordTime, 0)
}
recordDate := date.ParseDateToStr(recordTime, "2006-01-02")
recordEndTime := date.ParseDateToStr(recordTime, "15:04:05")
startTime := recordTime.Add(-time.Duration(k.Granularity) * time.Second)
recordStartTime := date.ParseDateToStr(startTime, "15:04:05")
// kpi data数据json
kpiCTitles := s.kpiCReportService.FindTitle(neInfo.NeType)
KpiValues := make([]map[string]any, 0)
// 自定义指标的表达式环境变量
KpiExprEnv := make(map[string]any, 0)
for k, v := range k.Data {
KpiExprEnv[k] = v
}
// 自定义指标的计算
for _, v := range kpiCTitles {
item := map[string]any{
"kpiId": v.KpiId,
"value": 0,
"err": "",
}
// 匹配指标记录
if envValue, envOk := KpiExprEnv[v.KpiId]; envOk {
item["value"] = envValue
}
// 计算结果
exprStr, exprEnv := expr.ParseExprEnv(v.Expression, KpiExprEnv)
result, err := expr.Eval(exprStr, exprEnv)
if err != nil {
item["value"] = 0
item["err"] = err.Error()
} else {
if v.Unit == "%" {
resultV, ok := result.(float64)
if !ok || math.IsNaN(resultV) {
resultV = 0
}
if resultV > 100 {
result = 100
}
if resultV <= 0 {
result = 0
}
}
item["value"] = result
}
KpiValues = append(KpiValues, item)
}
KpiValuesByte, err := json.Marshal(KpiValues)
if err != nil {
return err
}
// KPI 信息
kpiCData := neDataModel.KpiCReport{
NeType: neInfo.NeType,
NeName: neInfo.NeName,
RmUid: neInfo.RmUID,
Date: recordDate,
StartTime: recordStartTime,
EndTime: recordEndTime,
Index: index,
Granularity: k.Granularity,
KpiValues: string(KpiValuesByte),
CreatedAt: k.RecordTime,
}
insertId := s.kpiCReportService.Insert(kpiCData)
if insertId <= 0 {
return fmt.Errorf("add kpic data fail")
}
kpiCData.ID = insertId
// 指标事件对象
data := map[string]any{
"neType": kpiCData.NeType,
"neName": kpiCData.NeName,
"rmUID": kpiCData.RmUid,
"startIndex": kpiCData.Index,
"timeGroup": kpiCData.CreatedAt,
// kip_id ...
}
for _, v := range KpiValues {
data[fmt.Sprint(v["kpiId"])] = v["value"]
}
// 推送到ws订阅组
s.wsService.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, neInfo.NeType, neInfo.NeId), data)
return nil
}

View File

@@ -0,0 +1,65 @@
package service
import (
"fmt"
"time"
"be.ems/src/framework/utils/date"
"github.com/tsmask/go-oam"
neDataModel "be.ems/src/modules/network_data/model"
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
)
// 实例化服务层 NBState 结构体
var NewNBState = &NBState{
neInfoService: neService.NewNeInfo,
wsService: wsService.NewWSSend,
nbStateService: neDataService.NewNBState,
}
// NBState 消息处理
type NBState struct {
neInfoService *neService.NeInfo
wsService *wsService.WSSend
nbStateService *neDataService.NBState
}
// Resolve 接收处理
func (s *NBState) Resolve(n oam.NBState) error {
// 是否存在网元
neInfo := s.neInfoService.FindByRmuid(n.NeUid)
if neInfo.NeType == "" || neInfo.RmUID != n.NeUid {
return fmt.Errorf("resolve nb_state network element does not exist %s", n.NeUid)
}
nbState := neDataModel.NBState{
NeType: neInfo.NeType,
NeId: neInfo.NeId,
RmUid: neInfo.RmUID,
Address: n.Address,
Name: n.Name,
Position: n.Position,
NbName: n.DeviceName,
State: n.State,
Time: date.ParseDateToStr(n.StateTime, time.RFC3339),
}
insertId := s.nbStateService.Insert(nbState)
if insertId <= 0 {
return fmt.Errorf("add nb_state data fail")
}
nbState.ID = insertId
// 推送到ws订阅组
switch neInfo.NeType {
case "AMF":
s.wsService.ByGroupID(wsService.GROUP_AMF_NB, nbState)
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_AMF_NB, neInfo.NeId), nbState)
case "MME":
s.wsService.ByGroupID(wsService.GROUP_MME_NB, nbState)
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_MME_NB, neInfo.NeId), nbState)
}
return nil
}

View File

@@ -0,0 +1,69 @@
package service
import (
"encoding/json"
"fmt"
"github.com/tsmask/go-oam"
neDataModel "be.ems/src/modules/network_data/model"
neDataRepository "be.ems/src/modules/network_data/repository"
neDataService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
)
// 实例化服务层 UENB 结构体
var NewUENB = &UENB{
neInfoService: neService.NewNeInfo,
wsService: wsService.NewWSSend,
ueEventService: neDataService.NewUEEvent,
}
// UENB 消息处理
type UENB struct {
neInfoService *neService.NeInfo
wsService *wsService.WSSend
ueEventService *neDataService.UEEvent // UE会话事件服务
}
// Resolve 接收处理
func (s *UENB) Resolve(u oam.UENB) error {
// 是否存在网元
neInfo := s.neInfoService.FindByRmuid(u.NeUid)
if neInfo.NeType == "" || neInfo.RmUID != u.NeUid {
return fmt.Errorf("resolve ue_nb network element does not exist %s", u.NeUid)
}
// 查询租户ID
tenantID, _ := neDataRepository.NewSysTenant.Query(map[string]string{
"imsi": u.IMSI,
})
uenbByte, _ := json.Marshal(u)
uenbEvent := neDataModel.UEEvent{
NeType: neInfo.NeType,
NeName: neInfo.NeName,
RmUID: neInfo.RmUID,
Timestamp: u.RecordTime,
EventType: u.Type,
EventJSONStr: string(uenbByte),
TenantID: fmt.Sprintf("%d", tenantID),
}
insertId := s.ueEventService.Insert(uenbEvent)
if insertId <= 0 {
return fmt.Errorf("add ue_nb data fail")
}
uenbEvent.ID = insertId
// 推送到ws订阅组
switch neInfo.NeType {
case "AMF":
s.wsService.ByGroupID(wsService.GROUP_AMF_UE, uenbEvent)
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_AMF_UE, neInfo.NeId), uenbEvent)
case "MME":
s.wsService.ByGroupID(wsService.GROUP_MME_UE, uenbEvent)
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_MME_UE, neInfo.NeId), uenbEvent)
}
return nil
}