feat: 新增oam对外开放无限制接口模块
This commit is contained in:
236
src/modules/oam/service/kpi.go
Normal file
236
src/modules/oam/service/kpi.go
Normal file
@@ -0,0 +1,236 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"be.ems/src/framework/logger"
|
||||
"be.ems/src/framework/utils/date"
|
||||
"be.ems/src/framework/utils/expr"
|
||||
"be.ems/src/framework/utils/parse"
|
||||
"github.com/tsmask/go-oam"
|
||||
|
||||
neDataModel "be.ems/src/modules/network_data/model"
|
||||
neDataService "be.ems/src/modules/network_data/service"
|
||||
neModel "be.ems/src/modules/network_element/model"
|
||||
neService "be.ems/src/modules/network_element/service"
|
||||
wsService "be.ems/src/modules/ws/service"
|
||||
)
|
||||
|
||||
// 实例化服务层 KPI 结构体
|
||||
var NewKPI = &KPI{
|
||||
neInfoService: neService.NewNeInfo,
|
||||
wsService: wsService.NewWSSend,
|
||||
kpiReportService: neDataService.NewKpiReport,
|
||||
kpiCReportService: neDataService.NewKpiCReport,
|
||||
}
|
||||
|
||||
// KPI 消息处理
|
||||
type KPI struct {
|
||||
neInfoService *neService.NeInfo
|
||||
wsService *wsService.WSSend
|
||||
kpiReportService *neDataService.KpiReport
|
||||
kpiCReportService *neDataService.KpiCReport
|
||||
neInfo neModel.NeInfo
|
||||
}
|
||||
|
||||
// Resolve 接收处理
|
||||
func (s *KPI) Resolve(k oam.KPI) error {
|
||||
if len(k.Data) == 0 {
|
||||
return fmt.Errorf("kpi data is nil")
|
||||
}
|
||||
// 是否存在网元
|
||||
s.neInfo = s.neInfoService.FindByRmuid(k.NeUid)
|
||||
if s.neInfo.NeType == "" || s.neInfo.RmUID != k.NeUid {
|
||||
logger.Warnf("resolve kpi network element does not exist %s", k.NeUid)
|
||||
return fmt.Errorf("resolve kpi network element does not exist %s", k.NeUid)
|
||||
}
|
||||
|
||||
// 时间片
|
||||
curTime := time.Now()
|
||||
curSeconds := curTime.Hour()*3600 + curTime.Minute()*60 + curTime.Second()
|
||||
index := int64(curSeconds) / k.Granularity
|
||||
|
||||
if err := s.saveKPIData(k, index); err != nil {
|
||||
logger.Warnf("resolve kpi data fail %s", k.NeUid)
|
||||
return err
|
||||
}
|
||||
if err := s.saveKPIDataC(k, index); err != nil {
|
||||
logger.Warnf("resolve kpic data fail %s", k.NeUid)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// saveKPIData 存储KPI数据并推送到ws订阅组
|
||||
func (s *KPI) saveKPIData(k oam.KPI, index int64) error {
|
||||
// 时间数据处理
|
||||
recordTime := time.Now()
|
||||
if k.RecordTime > 1e12 {
|
||||
recordTime = time.UnixMilli(k.RecordTime)
|
||||
} else if k.RecordTime > 1e9 {
|
||||
recordTime = time.Unix(k.RecordTime, 0)
|
||||
}
|
||||
recordDate := date.ParseDateToStr(recordTime, "2006-01-02")
|
||||
recordEndTime := date.ParseDateToStr(recordTime, "15:04:05")
|
||||
startTime := recordTime.Add(-time.Duration(k.Granularity) * time.Second)
|
||||
recordStartTime := date.ParseDateToStr(startTime, "15:04:05")
|
||||
|
||||
// kpi data数据json
|
||||
kpiTitles := s.kpiReportService.FindTitle(s.neInfo.NeType)
|
||||
KpiValues := make([]map[string]any, 0)
|
||||
for _, kt := range kpiTitles {
|
||||
item := map[string]any{
|
||||
"kpiId": kt.KpiId,
|
||||
"value": 0,
|
||||
"err": "",
|
||||
}
|
||||
// 匹配指标记录
|
||||
for k, v := range k.Data {
|
||||
if k == kt.KpiId {
|
||||
item["value"] = v
|
||||
}
|
||||
}
|
||||
KpiValues = append(KpiValues, item)
|
||||
}
|
||||
|
||||
KpiValuesByte, _ := json.Marshal(KpiValues)
|
||||
|
||||
// KPI 信息
|
||||
kpiData := neDataModel.KpiReport{
|
||||
NeType: s.neInfo.NeType,
|
||||
NeName: s.neInfo.NeName,
|
||||
RmUid: s.neInfo.RmUID,
|
||||
Date: recordDate,
|
||||
StartTime: recordStartTime,
|
||||
EndTime: recordEndTime,
|
||||
Index: index,
|
||||
Granularity: k.Granularity,
|
||||
KpiValues: string(KpiValuesByte),
|
||||
CreatedAt: k.RecordTime,
|
||||
}
|
||||
insertId := s.kpiReportService.Insert(kpiData)
|
||||
if insertId <= 0 {
|
||||
return fmt.Errorf("add kpi data fail")
|
||||
}
|
||||
kpiData.ID = insertId
|
||||
|
||||
// 指标事件对象
|
||||
data := map[string]any{
|
||||
"neType": kpiData.NeType,
|
||||
"neName": kpiData.NeName,
|
||||
"rmUID": kpiData.RmUid,
|
||||
"startIndex": kpiData.Index,
|
||||
"timeGroup": kpiData.CreatedAt,
|
||||
// kip_id ...
|
||||
}
|
||||
for _, v := range KpiValues {
|
||||
data[fmt.Sprint(v["kpiId"])] = v["value"]
|
||||
}
|
||||
|
||||
// 推送到ws订阅组
|
||||
s.wsService.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI, s.neInfo.NeType, s.neInfo.NeId), data)
|
||||
// 更新UPF总流量
|
||||
if s.neInfo.NeType == "UPF" {
|
||||
upValue := parse.Number(data["UPF.03"])
|
||||
downValue := parse.Number(data["UPF.06"])
|
||||
s.kpiReportService.UPFTodayFlowUpdate(s.neInfo.RmUID, upValue, downValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// saveKPIDataC 存储自定义KPI数据并推送到ws订阅组
|
||||
func (s *KPI) saveKPIDataC(k oam.KPI, index int64) error {
|
||||
// 时间数据处理
|
||||
recordTime := time.Now()
|
||||
if k.RecordTime > 1e12 {
|
||||
recordTime = time.UnixMilli(k.RecordTime)
|
||||
} else if k.RecordTime > 1e9 {
|
||||
recordTime = time.Unix(k.RecordTime, 0)
|
||||
}
|
||||
recordDate := date.ParseDateToStr(recordTime, "2006-01-02")
|
||||
recordEndTime := date.ParseDateToStr(recordTime, "15:04:05")
|
||||
startTime := recordTime.Add(-time.Duration(k.Granularity) * time.Second)
|
||||
recordStartTime := date.ParseDateToStr(startTime, "15:04:05")
|
||||
|
||||
// kpi data数据json
|
||||
kpiCTitles := s.kpiCReportService.FindTitle(s.neInfo.NeType)
|
||||
KpiValues := make([]map[string]any, 0)
|
||||
// 自定义指标的表达式环境变量
|
||||
KpiExprEnv := make(map[string]any, 0)
|
||||
for k, v := range k.Data {
|
||||
KpiExprEnv[k] = v
|
||||
}
|
||||
// 自定义指标的计算
|
||||
for _, v := range kpiCTitles {
|
||||
item := map[string]any{
|
||||
"kpiId": v.KpiId,
|
||||
"value": 0,
|
||||
"err": "",
|
||||
}
|
||||
|
||||
// 匹配指标记录
|
||||
if envValue, envOk := KpiExprEnv[v.KpiId]; envOk {
|
||||
item["value"] = envValue
|
||||
}
|
||||
|
||||
// 计算结果
|
||||
exprStr, exprEnv := expr.ParseExprEnv(v.Expression, KpiExprEnv)
|
||||
result, err := expr.Eval(exprStr, exprEnv)
|
||||
if err != nil {
|
||||
item["value"] = 0
|
||||
item["err"] = err.Error()
|
||||
} else {
|
||||
if v.Unit == "%" {
|
||||
resultInt64 := parse.Number(result)
|
||||
if resultInt64 > 100 {
|
||||
result = 100
|
||||
}
|
||||
if resultInt64 < 0 {
|
||||
result = 0
|
||||
}
|
||||
}
|
||||
|
||||
item["value"] = result
|
||||
}
|
||||
KpiValues = append(KpiValues, item)
|
||||
}
|
||||
KpiValuesByte, _ := json.Marshal(KpiValues)
|
||||
|
||||
// KPI 信息
|
||||
kpiCData := neDataModel.KpiCReport{
|
||||
NeType: s.neInfo.NeType,
|
||||
NeName: s.neInfo.NeName,
|
||||
RmUid: s.neInfo.RmUID,
|
||||
Date: recordDate,
|
||||
StartTime: recordStartTime,
|
||||
EndTime: recordEndTime,
|
||||
Index: index,
|
||||
Granularity: k.Granularity,
|
||||
KpiValues: string(KpiValuesByte),
|
||||
CreatedAt: k.RecordTime,
|
||||
}
|
||||
insertId := s.kpiCReportService.Insert(kpiCData)
|
||||
if insertId <= 0 {
|
||||
return fmt.Errorf("add kpic data fail")
|
||||
}
|
||||
kpiCData.ID = insertId
|
||||
|
||||
// 指标事件对象
|
||||
data := map[string]any{
|
||||
"neType": kpiCData.NeType,
|
||||
"neName": kpiCData.NeName,
|
||||
"rmUID": kpiCData.RmUid,
|
||||
"startIndex": kpiCData.Index,
|
||||
"timeGroup": kpiCData.CreatedAt,
|
||||
// kip_id ...
|
||||
}
|
||||
for _, v := range KpiValues {
|
||||
data[fmt.Sprint(v["kpiId"])] = v["value"]
|
||||
}
|
||||
|
||||
// 推送到ws订阅组
|
||||
s.wsService.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_KPI_C, s.neInfo.NeType, s.neInfo.NeId), data)
|
||||
return nil
|
||||
}
|
||||
67
src/modules/oam/service/nb_state.go
Normal file
67
src/modules/oam/service/nb_state.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"be.ems/src/framework/logger"
|
||||
"be.ems/src/framework/utils/date"
|
||||
"github.com/tsmask/go-oam"
|
||||
|
||||
neDataModel "be.ems/src/modules/network_data/model"
|
||||
neDataService "be.ems/src/modules/network_data/service"
|
||||
neService "be.ems/src/modules/network_element/service"
|
||||
wsService "be.ems/src/modules/ws/service"
|
||||
)
|
||||
|
||||
// 实例化服务层 NBState 结构体
|
||||
var NewNBState = &NBState{
|
||||
neInfoService: neService.NewNeInfo,
|
||||
wsService: wsService.NewWSSend,
|
||||
nbStateService: neDataService.NewNBState,
|
||||
}
|
||||
|
||||
// NBState 消息处理
|
||||
type NBState struct {
|
||||
neInfoService *neService.NeInfo
|
||||
wsService *wsService.WSSend
|
||||
nbStateService *neDataService.NBState
|
||||
}
|
||||
|
||||
// Resolve 接收处理
|
||||
func (s *NBState) Resolve(n oam.NBState) error {
|
||||
// 是否存在网元
|
||||
neInfo := s.neInfoService.FindByRmuid(n.NeUid)
|
||||
if neInfo.NeType == "" || neInfo.RmUID != n.NeUid {
|
||||
logger.Warnf("resolve nb_state network element does not exist %s", n.NeUid)
|
||||
return fmt.Errorf("resolve nb_state network element does not exist %s", n.NeUid)
|
||||
}
|
||||
|
||||
nbState := neDataModel.NBState{
|
||||
NeType: neInfo.NeType,
|
||||
NeId: neInfo.NeId,
|
||||
RmUid: neInfo.RmUID,
|
||||
Address: n.Address,
|
||||
Name: n.Name,
|
||||
Position: n.Position,
|
||||
NbName: n.DeviceName,
|
||||
State: n.State,
|
||||
Time: date.ParseDateToStr(n.StateTime, time.RFC3339),
|
||||
}
|
||||
insertId := s.nbStateService.Insert(nbState)
|
||||
if insertId <= 0 {
|
||||
return fmt.Errorf("add nb_state data fail")
|
||||
}
|
||||
nbState.ID = insertId
|
||||
|
||||
// 推送到ws订阅组
|
||||
switch neInfo.NeType {
|
||||
case "AMF":
|
||||
s.wsService.ByGroupID(wsService.GROUP_AMF_NB, nbState)
|
||||
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_AMF_NB, neInfo.NeId), nbState)
|
||||
case "MME":
|
||||
s.wsService.ByGroupID(wsService.GROUP_MME_NB, nbState)
|
||||
s.wsService.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_MME_NB, neInfo.NeId), nbState)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user