feat: sync NE NRM data

This commit is contained in:
simon
2025-04-27 20:02:36 +08:00
parent f7cb6f84b0
commit ccd2f0218c
10 changed files with 1427 additions and 156 deletions

View File

@@ -44,5 +44,6 @@ INSERT INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '
INSERT INTO `sys_job` VALUES (14, 'job.exportSMSCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smsc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceType\')) as service_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.result\')) as result,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.updateTime\')) as update_time\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smsc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
INSERT INTO `sys_job` VALUES (15, 'job.removeExportedFiles', 'SYSTEM', 'removeFile', '[{\"filePath\":\"/usr/local/omc/backup/operate_log\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/ims_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smf_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smsc_cdr\",\"maxDays\":30}]', '0 10 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1728634085631, '');
INSERT INTO `sys_job` VALUES (16, 'job.exportSGWCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_sgwc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as recordType,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.accessPointNameNI\')) as accessPointNameNI,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.servedIMSI\')) as IMSI,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.servedMSISDN\')) as MSISDN,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.servedPDPPDNAddress\')) as PdpAddress,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) as duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordOpeningTime\')) as recordOpeningTime,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) as chargingID,JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfTrafficVolumes[0].dataVolumeGPRSDownlink\')) AS dataVolumeGPRSDownlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfTrafficVolumes[0].dataVolumeGPRsUplink\')) as dataVolumeGPRsUplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.userLocationInformation.tai.tac\')) as tac,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.userLocationInformation.ecgi.eutraCellId\')) as cellID\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/sgwc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
INSERT INTO `sys_job` VALUES (17, 'job.nbiNRM', 'SYSTEM', 'nbiNRM', '{}', '0 10 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, '');
SET FOREIGN_KEY_CHECKS = 1;

View File

@@ -1,30 +1,36 @@
package amf
import "be.ems/src/modules/crontask/processor/nbiNRM/common"
const (
AMF string = "AMF" // 网元类型
)
type ManagedElement struct {
Id string `json:"id"`
UserLabel string `json:"userLabel"`
VendorName string `json:"vendorName"`
ManagedBy string `json:"managedBy"`
ManagementIpAddress string `json:"managementIpAddress"`
SwVersion string `json:"swVersion"`
PatchInfo string `json:"patchInfo"`
AdministrativeState string `json:"administrativeState"`
OperationalState string `json:"operationalState"`
Id string `json:"id"`
UserLabel string `json:"userLabel"`
VendorName string `json:"vendorName"`
ManagedBy string `json:"managedBy"`
ManagementIpAddress string `json:"managementIpAddress"`
SwVersion string `json:"swVersion"`
PatchInfo string `json:"patchInfo"`
AdministrativeState common.AdministrativeState `json:"administrativeState"`
OperationalState common.OperationalState `json:"operationalState"`
}
type AmfFunction struct {
Id string `json:"id"`
UserLabel string `json:"userLabel"`
AdministrativeState string `json:"administrativeState"`
OperationalState string `json:"operationalState"`
VnfInstanceId string `json:"vnfInstanceId"`
Fqdn string `json:"fqdn"`
SbiServiceList string `json:"sbiServiceList"`
AmfGuamiList string `json:"amfGuamiList"`
SnssaiList string `json:"snssaiList"`
MaxUser int `json:"maxUser"`
RelativeCapacity int `json:"relativeCapacity"`
MaxGnbNum int `json:"maxGnbNum"`
Id string `json:"id"`
UserLabel string `json:"userLabel"`
AdministrativeState common.AdministrativeState `json:"administrativeState"`
OperationalState common.OperationalState `json:"operationalState"`
VnfInstanceId string `json:"vnfInstanceId"`
Fqdn string `json:"fqdn"`
SbiServiceList string `json:"sbiServiceList"`
AmfGuamiList string `json:"amfGuamiList"`
SnssaiList string `json:"snssaiList"`
MaxUser int `json:"maxUser"`
RelativeCapacity int `json:"relativeCapacity"`
MaxGnbNum int `json:"maxGnbNum"`
}
type EpRpDynN8Amf struct {

View File

@@ -0,0 +1,138 @@
package common
import (
"fmt"
"strconv"
)
type AdministrativeState string
const (
Locked AdministrativeState = "Locked"
Unlocked AdministrativeState = "Unlocked"
ShuttingDown AdministrativeState = "ShuttingDown"
)
type OperationalState string
const (
Enabled OperationalState = "Enabled"
Disabled OperationalState = "Disabled"
)
const (
// NBI CM表名
NbiCmTableName = "nbi_cm"
CmVersion = "v1" // CM版本
)
type EventType int
const (
ObjectNullEvent EventType = iota // ObjectNullEvent 空事件
ObjectCreationEvent // ObjectCreationEvent 创建事件
ObjectDeletionEvent // ObjectDeletionEvent 删除事件
ObjectAttributeValueChangeEvent // ObjectAttributeValueChangeEvent 修改事件
ObjectUnkownEvent // 未知事件
)
func (et EventType) EventTypeEnumString() string {
switch et {
case ObjectCreationEvent:
return "ObjectCreationEvent"
case ObjectDeletionEvent:
return "ObjectDeletionEvent"
case ObjectAttributeValueChangeEvent:
return "ObjectAttributeValueChangeEvent"
case ObjectNullEvent:
return "ObjectNullEvent"
default:
return "ObjectUnkownEvent"
}
}
func (et EventType) EventTypeIntString() string {
return fmt.Sprintf("%d", et)
}
// EventTypeInt 将字符串转换为 EventType 枚举类型
func EventTypeInt(s string) EventType {
if i, err := strconv.Atoi(s); err == nil {
return EventType(i)
}
switch s {
case "ObjectCreationEvent":
return ObjectCreationEvent
case "ObjectDeletionEvent":
return ObjectDeletionEvent
case "ObjectAttributeValueChangeEvent":
return ObjectAttributeValueChangeEvent
case "":
// 如果字符串为空,则返回未知事件
return ObjectNullEvent
default:
return ObjectUnkownEvent
}
}
type NeStatus int
const (
NeStatusOffline NeStatus = iota
NeStatusActive
NeStatusToSync
NeStatusStandby
NeStatusUnknown
)
// ParseStateFromStatus 将状态字符串转换为 AdministrativeState 和 OperationalState
func ParseStateFromStatus(status NeStatus) (AdministrativeState, OperationalState) {
var adminState AdministrativeState
var operState OperationalState
switch status {
case NeStatusOffline:
adminState = Locked
operState = Disabled
case NeStatusActive:
adminState = Unlocked
operState = Enabled
case NeStatusToSync:
adminState = Unlocked
operState = Enabled
case NeStatusStandby:
adminState = Locked
operState = Enabled
default:
adminState = ShuttingDown
operState = Disabled
}
return adminState, operState
}
type NeInfo struct {
NeId string `db:"ne_id"`
NeType string `db:"ne_type"`
RmUid string `db:"rm_uid"`
VendorName string `db:"vendor_name"`
NeName string `db:"ne_name"`
Ip string `db:"ip"`
Port string `db:"port"`
Dn string `db:"dn"`
Status NeStatus `db:"status"`
}
// NbiCm 表结构
type NbiCm struct {
Id int `json:"id" db:"id PRIMARY KEY"`
NeType string `json:"neType" db:"ne_type"`
NeId string `json:"neId" db:"ne_id"`
CmVersion string `json:"cmVersion" db:"cm_version"`
RmUid string `json:"rmUid" db:"rm_uid"`
EventType EventType `json:"eventType" db:"event_type"`
ObjectType string `json:"objectType" db:"object_type"`
ValueJson string `json:"valueJson" db:"value_json"`
Timestamp string `json:"timestamp" db:"timestamp"`
}

View File

@@ -2,80 +2,11 @@ package nbiNRM
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"be.ems/lib/log"
"be.ems/src/framework/cron"
)
const (
// NBI CM表名
NbiCmTableName = "nbi_cm"
CmVersion = "v1" // CM版本
)
type EventType int
const (
ObjectCreationEvent EventType = iota // ObjectCreationEvent 创建事件
ObjectDeletionEvent // ObjectDeletionEvent 删除事件
ObjectAttributeValueChangeEvent // ObjectAttributeValueChangeEvent 修改事件
ObjectUnkownEvent // 未知事件
)
func (et EventType) EventTypeEnumString() string {
switch et {
case ObjectCreationEvent:
return "ObjectCreationEvent"
case ObjectDeletionEvent:
return "ObjectDeletionEvent"
case ObjectAttributeValueChangeEvent:
return "ObjectAttributeValueChangeEvent"
default:
return "ObjectUnkownEvent"
}
}
func (et EventType) EventTypeIntString() string {
return fmt.Sprintf("%d", et)
}
// ParseCallTag 将字符串转换为 CallTag 枚举类型
func EventTypeInt(s string) EventType {
if i, err := strconv.Atoi(s); err == nil {
return EventType(i)
}
// 如果转换失败,则按名称匹配(忽略大小写)
switch strings.ToLower(s) {
case "ObjectCreationEvent":
return ObjectCreationEvent
case "ObjectDeletionEvent":
return ObjectDeletionEvent
case "ObjectAttributeValueChangeEvent":
return ObjectAttributeValueChangeEvent
case "":
// 如果字符串为空,则返回未知事件
return ObjectUnkownEvent
default:
return ObjectUnkownEvent
}
}
// NbiCm 表结构
type NbiCm struct {
Id string `json:"id" db:"id PRIMARY KEY"`
NeType string `json:"neType" db:"ne_type"`
NeId string `json:"neId" db:"ne_id"`
CmVersion string `json:"cmVersion" db:"cm_version"`
RmUid string `json:"rmUid" db:"rm_uid"`
EventType EventType `json:"eventType" db:"event_type"`
ObjectType string `json:"objectType" db:"object_type"`
ValueJson string `json:"valueJson" db:"value_json"`
Timestamp string `json:"timestamp" db:"timestamp"`
}
var NewProcessor = &BarProcessor{
progress: 0,
count: 0,
@@ -116,10 +47,29 @@ func (s *BarProcessor) Execute(data any) (any, error) {
log.Errorf("SyncAmfNbiCM error: %v", err)
return nil, err
}
err = s.SyncPcfNbiCM()
if err != nil {
log.Errorf("SyncPcfNbiCM error: %v", err)
return nil, err
}
err = s.SyncUdmNbiCM()
if err != nil {
log.Errorf("SyncUdmNbiCM error: %v", err)
return nil, err
}
err = s.SyncSmfNbiCM()
if err != nil {
log.Errorf("SyncSmfNbiCM error: %v", err)
return nil, err
}
err = s.SyncUpfNbiCM()
if err != nil {
log.Errorf("SyncUpfNbiCM error: %v", err)
return nil, err
}
// 返回结果,用于记录执行结果
return map[string]any{
"msg": "sucess",
"affected": 1,
"msg": "sucess",
}, nil
}

View File

@@ -1,33 +1,39 @@
package pcf
import "be.ems/src/modules/crontask/processor/nbiNRM/common"
const (
PCF string = "PCF" // 网元类型
)
type ManagedElement struct {
Id string `json:"id"`
UserLabel string `json:"userLabel"`
VendorName string `json:"vendorName"`
ManagedBy string `json:"managedBy"`
ManagementIpAddress string `json:"managementIpAddress"`
SwVersion string `json:"swVersion"`
PatchInfo string `json:"patchInfo"`
AdministrativeState string `json:"administrativeState"`
OperationalState string `json:"operationalState"`
Id string `json:"id"`
UserLabel string `json:"userLabel"`
VendorName string `json:"vendorName"`
ManagedBy string `json:"managedBy"`
ManagementIpAddress string `json:"managementIpAddress"`
SwVersion string `json:"swVersion"`
PatchInfo string `json:"patchInfo"`
AdministrativeState common.AdministrativeState `json:"administrativeState"`
OperationalState common.OperationalState `json:"operationalState"`
}
type PcfFunction struct {
Id string `json:"id"`
UserLabel string `json:"userLabel"`
AdministrativeState string `json:"administrativeState"`
OperationalState string `json:"operationalState"`
VnfInstanceId string `json:"vnfInstanceId"`
Fqdn string `json:"fqdn"`
SbiServiceList string `json:"sbiServiceList"`
Id string `json:"id"`
UserLabel string `json:"userLabel"`
AdministrativeState common.AdministrativeState `json:"administrativeState"`
OperationalState common.OperationalState `json:"operationalState"`
VnfInstanceId string `json:"vnfInstanceId"`
Fqdn string `json:"fqdn"`
SbiServiceList string `json:"sbiServiceList"`
}
type UdrFunction struct {
Id string `json:"id"`
UserLabel string `json:"userLabel"`
AdministrativeState string `json:"administrativeState"`
OperationalState string `json:"operationalState"`
MaxSubNbr int `json:"maxSubNbr"`
Id string `json:"id"`
UserLabel string `json:"userLabel"`
AdministrativeState common.AdministrativeState `json:"administrativeState"`
OperationalState common.OperationalState `json:"operationalState"`
MaxSubNbr int `json:"maxSubNbr"`
}
type IPResource struct {

View File

@@ -9,6 +9,7 @@ import (
"be.ems/lib/log"
"be.ems/src/modules/crontask/processor/nbiNRM/amf"
"be.ems/src/modules/crontask/processor/nbiNRM/common"
)
// SyncAmfNbiCM 从ne_info获取AMF网元并同步数据到nbi_cm表
@@ -16,16 +17,9 @@ func (s *BarProcessor) SyncAmfNbiCM() error {
log.Info("Starting AMF NBI CM synchronization")
// 从ne_info表获取AMF类型的网元
var amfNEs []struct {
NeID string `db:"ne_id"`
NeType string `db:"ne_type"`
RmUid string `db:"rm_uid"`
}
err := dborm.DefaultDB().Table("ne_info").
Where("ne_type = ? AND status = 1", "AMF").
var amfNEs []common.NeInfo
err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "AMF").
Find(&amfNEs).Error
if err != nil {
log.Errorf("Failed to query AMF network elements: %v", err)
return err
@@ -39,39 +33,42 @@ func (s *BarProcessor) SyncAmfNbiCM() error {
// 遍历每个AMF网元生成对应的NBI CM记录
for _, ne := range amfNEs {
adminState, operState := common.ParseStateFromStatus(ne.Status)
// 为每个网元生成ManagedElement记录
managedElement := amf.ManagedElement{
Id: "ManagedElement=" + ne.NeID,
UserLabel: "AMF-" + ne.NeID,
VendorName: "Vendor",
ManagedBy: "OMC",
ManagementIpAddress: "", // 可以从其他表获取IP地址
SwVersion: "", // 可以从其他表获取软件版本
AdministrativeState: "UNLOCKED",
OperationalState: "ENABLED",
Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), // 生成唯一ID
UserLabel: ne.NeName,
VendorName: ne.VendorName,
ManagedBy: ne.Dn,
ManagementIpAddress: ne.Ip, // 可以从其他表获取IP地址
SwVersion: "", // 可以从其他表获取软件版本
PatchInfo: "-",
AdministrativeState: adminState,
OperationalState: operState,
}
// 序列化为JSON
meJSON, err := json.Marshal(managedElement)
if err != nil {
log.Errorf("Failed to marshal ManagedElement for ne_id %s: %v", ne.NeID, err)
log.Errorf("Failed to marshal ManagedElement for ne_id %s: %v", ne.NeId, err)
continue
}
// 生成唯一ID
meID := fmt.Sprintf("nbi-cm-%s-me-%d", ne.NeID, timestamp)
// meID := fmt.Sprintf("nbi-cm-%s-me-%d", ne.NeID, timestamp)
// 插入ManagedElement记录
nbiCM := NbiCm{
Id: meID,
nbiCM := common.NbiCm{
// Id: meID,
NeType: ne.NeType,
NeId: ne.NeID,
CmVersion: CmVersion,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: EventTypeInt("ObjectCreationEvent"),
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "ManagedElement",
ValueJson: string(meJSON),
Timestamp: fmt.Sprintf("%d", timestamp),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
@@ -83,14 +80,14 @@ func (s *BarProcessor) SyncAmfNbiCM() error {
// 为每个网元生成AmfFunction记录
amfFunction := amf.AmfFunction{
Id: "ManagedElement=" + ne.NeID + ",AmfFunction=1",
UserLabel: "AMF-" + ne.NeID + "-Func",
AdministrativeState: "UNLOCKED",
OperationalState: "ENABLED",
VnfInstanceId: "vnf-" + ne.NeID,
Fqdn: "amf-" + ne.NeID + ".example.com",
Id: "AF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), // 生成唯一ID
UserLabel: ne.NeName + "-AmfFunction",
AdministrativeState: adminState,
OperationalState: operState,
VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId,
Fqdn: "amf-" + ne.NeId + ".5gc.3gpp.org",
SbiServiceList: "Namf_Communication,Namf_EventExposure,Namf_MT,Namf_Location",
AmfGuamiList: "[{\"mcc\":\"460\",\"mnc\":\"01\",\"amfId\":\"" + ne.NeID + "\"}]",
AmfGuamiList: "[{\"mcc\":\"460\",\"mnc\":\"01\",\"amfId\":\"" + ne.NeId + "\"}]",
SnssaiList: "[{\"sst\":1,\"sd\":\"010203\"}]",
MaxUser: 1000000,
RelativeCapacity: 30,
@@ -100,24 +97,24 @@ func (s *BarProcessor) SyncAmfNbiCM() error {
// 序列化为JSON
amfJSON, err := json.Marshal(amfFunction)
if err != nil {
log.Errorf("Failed to marshal AmfFunction for ne_id %s: %v", ne.NeID, err)
log.Errorf("Failed to marshal AmfFunction for ne_id %s: %v", ne.NeId, err)
continue
}
// 生成唯一ID
amfID := fmt.Sprintf("nbi-cm-%s-amf-%d", ne.NeID, timestamp)
// amfID := fmt.Sprintf("nbi-cm-%s-amf-%d", ne.NeID, timestamp)
// 插入AmfFunction记录
nbiCM = NbiCm{
Id: amfID,
nbiCM = common.NbiCm{
// Id: amfID,
NeType: ne.NeType,
NeId: ne.NeID,
CmVersion: "1.0",
RmUid: "OMC-SYSTEM",
EventType: EventTypeInt("ObjectCreationEvent"),
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "AmfFunction",
ValueJson: string(amfJSON),
Timestamp: fmt.Sprintf("%d", timestamp),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
@@ -127,8 +124,136 @@ func (s *BarProcessor) SyncAmfNbiCM() error {
continue
}
// 可以继续添加其他AMF相关对象如EpRpDynN8Amf、IPResource等
// 这里仅展示了基本结构,可根据实际需求扩展
// 在 AmfFunction 记录创建完成后添加以下代码
// 创建 EpRpDynN8Amf 记录
epRpDynN8Amf := amf.EpRpDynN8Amf{
Id: "N8" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-N8",
LocIpAddrList: "[\"" + ne.Ip + "\"]",
FarIpSubnetworkList: "[\"10.10.0.0/16\"]",
}
// 序列化为JSON
n8JSON, err := json.Marshal(epRpDynN8Amf)
if err != nil {
log.Errorf("Failed to marshal EpRpDynN8Amf for ne_id %s: %v", ne.NeId, err)
} else {
// 插入EpRpDynN8Amf记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "EpRpDynN8Amf",
ValueJson: string(n8JSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert EpRpDynN8Amf record: %v", err)
}
}
// 创建 EpRpDynN11Amf 记录
epRpDynN11Amf := amf.EpRpDynN11Amf{
Id: "N11" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-N11",
LocIpAddrList: "[\"" + ne.Ip + "\"]",
FarIpSubnetworkList: "[\"10.11.0.0/16\"]",
}
// 序列化为JSON
n11JSON, err := json.Marshal(epRpDynN11Amf)
if err != nil {
log.Errorf("Failed to marshal EpRpDynN11Amf for ne_id %s: %v", ne.NeId, err)
} else {
// 插入EpRpDynN11Amf记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "EpRpDynN11Amf",
ValueJson: string(n11JSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert EpRpDynN11Amf record: %v", err)
}
}
// 创建 EpRpDynN12Amf 记录
epRpDynN12Amf := amf.EpRpDynN12Amf{
Id: "N12" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-N12",
LocIpAddrList: "[\"" + ne.Ip + "\"]",
FarIpSubnetworkList: "[\"10.12.0.0/16\"]",
}
// 序列化为JSON
n12JSON, err := json.Marshal(epRpDynN12Amf)
if err != nil {
log.Errorf("Failed to marshal EpRpDynN12Amf for ne_id %s: %v", ne.NeId, err)
} else {
// 插入EpRpDynN12Amf记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "EpRpDynN12Amf",
ValueJson: string(n12JSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert EpRpDynN12Amf record: %v", err)
}
}
// 创建 IPResource 记录
ipResource := amf.IPResource{
Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-IPResource",
InterfaceType: "{Mgt,N8,N11,N12}",
LocIpV4AddrList: fmt.Sprintf("{%s,Default,Default,Default}", ne.Ip),
LocIpV6AddrList: "{Default,Default,Default,Default}",
}
// 序列化为JSON
ipJSON, err := json.Marshal(ipResource)
if err != nil {
log.Errorf("Failed to marshal IPResource for ne_id %s: %v", ne.NeId, err)
} else {
// 插入IPResource记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "IPResource",
ValueJson: string(ipJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert IPResource record: %v", err)
}
}
}
log.Info("AMF NBI CM synchronization completed")

View File

@@ -0,0 +1,182 @@
package nbiNRM
import (
"encoding/json"
"fmt"
"time"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/src/modules/crontask/processor/nbiNRM/common"
"be.ems/src/modules/crontask/processor/nbiNRM/pcf"
)
// SyncPcfNbiCM 从ne_info获取PCF网元并同步数据到nbi_cm表
func (s *BarProcessor) SyncPcfNbiCM() error {
log.Info("Starting PCF NBI CM synchronization")
// 从ne_info表获取PCF类型的网元
var pcfNEs []common.NeInfo
err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "PCF").
Find(&pcfNEs).Error
if err != nil {
log.Errorf("Failed to query PCF network elements: %v", err)
return err
}
log.Infof("Found %d PCF network elements", len(pcfNEs))
// 当前时间戳
now := time.Now()
timestamp := now.Unix()
// 遍历每个PCF网元生成对应的NBI CM记录
for _, ne := range pcfNEs {
adminState, operState := common.ParseStateFromStatus(ne.Status)
// 为每个网元生成ManagedElement记录
managedElement := pcf.ManagedElement{
Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName,
VendorName: ne.VendorName,
ManagedBy: ne.Dn,
ManagementIpAddress: ne.Ip,
SwVersion: "",
PatchInfo: "-",
AdministrativeState: adminState,
OperationalState: operState,
}
// 序列化为JSON
meJSON, err := json.Marshal(managedElement)
if err != nil {
log.Errorf("Failed to marshal ManagedElement for PCF ne_id %s: %v", ne.NeId, err)
continue
}
// 插入ManagedElement记录
nbiCM := common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "ManagedElement",
ValueJson: string(meJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert PCF ManagedElement record: %v", err)
continue
}
// 为每个网元生成PcfFunction记录
pcfFunction := pcf.PcfFunction{
Id: "PF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-PcfFunction",
AdministrativeState: adminState,
OperationalState: operState,
VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId,
Fqdn: "pcf-" + ne.NeId + ".5gc.3gpp.org",
SbiServiceList: "Npcf_AMPolicyControl,Npcf_PolicyAuthorization,Npcf_SMPolicyControl,Npcf_BDTPolicyControl",
}
// 序列化为JSON
pcfJSON, err := json.Marshal(pcfFunction)
if err != nil {
log.Errorf("Failed to marshal PcfFunction for ne_id %s: %v", ne.NeId, err)
continue
}
// 插入PcfFunction记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "PcfFunction",
ValueJson: string(pcfJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert PcfFunction record: %v", err)
continue
}
// 为每个网元生成UdrFunction记录
udrFunction := pcf.UdrFunction{
Id: "UF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-UdrFunction",
AdministrativeState: adminState,
OperationalState: operState,
MaxSubNbr: 500000,
}
// 序列化为JSON
udrJSON, err := json.Marshal(udrFunction)
if err != nil {
log.Errorf("Failed to marshal UdrFunction for ne_id %s: %v", ne.NeId, err)
} else {
// 插入UdrFunction记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "UdrFunction",
ValueJson: string(udrJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert UdrFunction record: %v", err)
}
}
// 创建 IPResource 记录
ipResource := pcf.IPResource{
Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-IPResource",
InterfaceType: "{Mgt,N5,N7,N15}",
LocIpV4AddrList: fmt.Sprintf("{%s,Default,Default,Default}", ne.Ip),
LocIpV6AddrList: "{Default,Default,Default,Default}",
}
// 序列化为JSON
ipJSON, err := json.Marshal(ipResource)
if err != nil {
log.Errorf("Failed to marshal IPResource for PCF ne_id %s: %v", ne.NeId, err)
} else {
// 插入IPResource记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "IPResource",
ValueJson: string(ipJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert PCF IPResource record: %v", err)
}
}
}
log.Info("PCF NBI CM synchronization completed")
return nil
}

View File

@@ -0,0 +1,249 @@
package nbiNRM
import (
"encoding/json"
"fmt"
"time"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/src/modules/crontask/processor/nbiNRM/common"
"be.ems/src/modules/crontask/processor/nbiNRM/smf"
)
// SyncSmfNbiCM 从ne_info获取SMF网元并同步数据到nbi_cm表
func (s *BarProcessor) SyncSmfNbiCM() error {
log.Info("Starting SMF NBI CM synchronization")
// 从ne_info表获取SMF类型的网元
var smfNEs []common.NeInfo
err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "SMF").
Find(&smfNEs).Error
if err != nil {
log.Errorf("Failed to query SMF network elements: %v", err)
return err
}
log.Infof("Found %d SMF network elements", len(smfNEs))
// 当前时间戳
now := time.Now()
timestamp := now.Unix()
// 遍历每个SMF网元生成对应的NBI CM记录
for _, ne := range smfNEs {
adminState, operState := common.ParseStateFromStatus(ne.Status)
// 为每个网元生成ManagedElement记录
managedElement := smf.ManagedElement{
Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName,
VendorName: ne.VendorName,
ManagedBy: ne.Dn,
ManagementIpAddress: ne.Ip,
SwVersion: "",
PatchInfo: "-",
AdministrativeState: string(adminState),
OperationalState: string(operState),
}
// 序列化为JSON
meJSON, err := json.Marshal(managedElement)
if err != nil {
log.Errorf("Failed to marshal ManagedElement for SMF ne_id %s: %v", ne.NeId, err)
continue
}
// 插入ManagedElement记录
nbiCM := common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "ManagedElement",
ValueJson: string(meJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert SMF ManagedElement record: %v", err)
continue
}
// 为每个网元生成SmfFunction记录
smfFunction := smf.SmfFunction{
Id: "SF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-SmfFunction",
AdministrativeState: string(adminState),
OperationalState: string(operState),
VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId,
Fqdn: "smf-" + ne.NeId + ".5gc.3gpp.org",
SbiServiceList: "Nsmf_PDUSession,Nsmf_EventExposure",
MaxPduSessions: 1000000,
MaxQfi: 64,
UpfList: "[\"UPF-1\",\"UPF-2\"]",
}
// 序列化为JSON
smfJSON, err := json.Marshal(smfFunction)
if err != nil {
log.Errorf("Failed to marshal SmfFunction for ne_id %s: %v", ne.NeId, err)
continue
}
// 插入SmfFunction记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "SmfFunction",
ValueJson: string(smfJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert SmfFunction record: %v", err)
continue
}
// 创建 AddrPool 记录
addrPool := smf.AddrPool{
Id: "AP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-AddrPool",
AddrType: "UE",
IpVersion: "IPv4",
AddrSegList: "[\"10.60.0.0/16\",\"10.61.0.0/16\"]",
}
// 序列化为JSON
addrPoolJSON, err := json.Marshal(addrPool)
if err != nil {
log.Errorf("Failed to marshal AddrPool for ne_id %s: %v", ne.NeId, err)
} else {
// 插入AddrPool记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "AddrPool",
ValueJson: string(addrPoolJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert AddrPool record: %v", err)
}
}
// 创建 EpRpDynN7Smf 记录
epRpDynN7Smf := smf.EpRpDynN7Smf{
Id: "N7" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-N7",
LocIpAddrList: "[\"" + ne.Ip + "\"]",
FarIpSubnetworkList: "[\"10.17.0.0/16\"]",
}
// 序列化为JSON
n7JSON, err := json.Marshal(epRpDynN7Smf)
if err != nil {
log.Errorf("Failed to marshal EpRpDynN7Smf for ne_id %s: %v", ne.NeId, err)
} else {
// 插入EpRpDynN7Smf记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "EpRpDynN7Smf",
ValueJson: string(n7JSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert EpRpDynN7Smf record: %v", err)
}
}
// 创建 EpRpDynN10Smf 记录
epRpDynN10Smf := smf.EpRpDynN10Smf{
Id: "N10" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-N10",
LocIpAddrList: "[\"" + ne.Ip + "\"]",
FarIpSubnetworkList: "[\"10.20.0.0/16\"]",
}
// 序列化为JSON
n10JSON, err := json.Marshal(epRpDynN10Smf)
if err != nil {
log.Errorf("Failed to marshal EpRpDynN10Smf for ne_id %s: %v", ne.NeId, err)
} else {
// 插入EpRpDynN10Smf记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "EpRpDynN10Smf",
ValueJson: string(n10JSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert EpRpDynN10Smf record: %v", err)
}
}
// 创建 IPResource 记录
ipResource := smf.IPResource{
Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-IPResource",
InterfaceType: "{Mgt,N4,N7,N10,N11}",
LocIpV4AddrList: fmt.Sprintf("{%s,%s,%s,%s,%s}", ne.Ip, ne.Ip, ne.Ip, ne.Ip, ne.Ip),
LocIpV6AddrList: "{Default,Default,Default,Default,Default}",
}
// 序列化为JSON
ipJSON, err := json.Marshal(ipResource)
if err != nil {
log.Errorf("Failed to marshal IPResource for SMF ne_id %s: %v", ne.NeId, err)
} else {
// 插入IPResource记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "IPResource",
ValueJson: string(ipJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert SMF IPResource record: %v", err)
}
}
}
log.Info("SMF NBI CM synchronization completed")
return nil
}

View File

@@ -0,0 +1,221 @@
package nbiNRM
import (
"encoding/json"
"fmt"
"time"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/src/modules/crontask/processor/nbiNRM/common"
"be.ems/src/modules/crontask/processor/nbiNRM/udm"
)
// SyncUdmNbiCM 从ne_info获取UDM网元并同步数据到nbi_cm表
func (s *BarProcessor) SyncUdmNbiCM() error {
log.Info("Starting UDM NBI CM synchronization")
// 从ne_info表获取UDM类型的网元
var udmNEs []common.NeInfo
err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "UDM").
Find(&udmNEs).Error
if err != nil {
log.Errorf("Failed to query UDM network elements: %v", err)
return err
}
log.Infof("Found %d UDM network elements", len(udmNEs))
// 当前时间戳
now := time.Now()
timestamp := now.Unix()
// 遍历每个UDM网元生成对应的NBI CM记录
for _, ne := range udmNEs {
adminState, operState := common.ParseStateFromStatus(ne.Status)
// 为每个网元生成ManagedElement记录
managedElement := udm.ManagedElement{
Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName,
VendorName: ne.VendorName,
ManagedBy: ne.Dn,
ManagementIpAddress: ne.Ip,
SwVersion: "",
PatchInfo: "-",
AdministrativeState: string(adminState),
OperationalState: string(operState),
}
// 序列化为JSON
meJSON, err := json.Marshal(managedElement)
if err != nil {
log.Errorf("Failed to marshal ManagedElement for UDM ne_id %s: %v", ne.NeId, err)
continue
}
// 插入ManagedElement记录
nbiCM := common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "ManagedElement",
ValueJson: string(meJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert UDM ManagedElement record: %v", err)
continue
}
// 为每个网元生成UdmFunction记录
udmFunction := udm.UdmFunction{
Id: "UF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-UdmFunction",
AdministrativeState: string(adminState),
OperationalState: string(operState),
VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId,
Fqdn: "udm-" + ne.NeId + ".5gc.3gpp.org",
SbiServiceList: "Nudm_UEAuthentication,Nudm_SubscriberDataManagement,Nudm_UEContextManagement",
}
// 序列化为JSON
udmJSON, err := json.Marshal(udmFunction)
if err != nil {
log.Errorf("Failed to marshal UdmFunction for ne_id %s: %v", ne.NeId, err)
continue
}
// 插入UdmFunction记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "UdmFunction",
ValueJson: string(udmJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert UdmFunction record: %v", err)
continue
}
// 为每个网元生成UdrFunction记录
udrFunction := udm.UdrFunction{
Id: "UDR" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-UdrFunction",
AdministrativeState: string(adminState),
OperationalState: string(operState),
VnfInstanceId: "vnf-UDR-" + ne.NeId,
Fqdn: "udr-" + ne.NeId + ".5gc.3gpp.org",
SbiServiceList: "Nudr_DataRepository",
MaxNumSupi: 800000,
MaxNumMsisdn: 800000,
}
// 序列化为JSON
udrJSON, err := json.Marshal(udrFunction)
if err != nil {
log.Errorf("Failed to marshal UdrFunction for ne_id %s: %v", ne.NeId, err)
} else {
// 插入UdrFunction记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "UdrFunction",
ValueJson: string(udrJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert UdrFunction record: %v", err)
}
}
// 为每个网元生成AusfFunction记录
ausfFunction := udm.AusfFunction{
Id: "AUSF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-AusfFunction",
AdministrativeState: string(adminState),
OperationalState: string(operState),
VnfInstanceId: "vnf-AUSF-" + ne.NeId,
Fqdn: "ausf-" + ne.NeId + ".5gc.3gpp.org",
SbiServiceList: "Nausf_UEAuthentication,Nausf_SoRProtection",
}
// 序列化为JSON
ausfJSON, err := json.Marshal(ausfFunction)
if err != nil {
log.Errorf("Failed to marshal AusfFunction for ne_id %s: %v", ne.NeId, err)
} else {
// 插入AusfFunction记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "AusfFunction",
ValueJson: string(ausfJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert AusfFunction record: %v", err)
}
}
// 创建 IPResource 记录
ipResource := udm.IPResource{
Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-IPResource",
InterfaceType: "{Mgt,N8,N10,N12,N21}",
LocIpV4AddrList: fmt.Sprintf("{%s,%s,%s,%s,%s}", ne.Ip, ne.Ip, ne.Ip, ne.Ip, ne.Ip),
LocIpV6AddrList: "{Default,Default,Default,DefaultDefault}",
}
// 序列化为JSON
ipJSON, err := json.Marshal(ipResource)
if err != nil {
log.Errorf("Failed to marshal IPResource for UDM ne_id %s: %v", ne.NeId, err)
} else {
// 插入IPResource记录
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "IPResource",
ValueJson: string(ipJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert UDM IPResource record: %v", err)
}
}
}
log.Info("UDM NBI CM synchronization completed")
return nil
}

View File

@@ -0,0 +1,393 @@
package nbiNRM
import (
"encoding/json"
"fmt"
"time"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/src/modules/crontask/processor/nbiNRM/common"
"be.ems/src/modules/crontask/processor/nbiNRM/upf"
)
// SyncUpfNbiCM 从ne_info获取UPF网元并同步数据到nbi_cm表
func (s *BarProcessor) SyncUpfNbiCM() error {
log.Info("Starting UPF NBI CM synchronization")
// 从ne_info表获取UPF类型的网元
var upfNEs []common.NeInfo
err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "UPF").
Find(&upfNEs).Error
if err != nil {
log.Errorf("Failed to query UPF network elements: %v", err)
return err
}
log.Infof("Found %d UPF network elements", len(upfNEs))
// 当前时间戳
now := time.Now()
timestamp := now.Unix()
// 遍历每个UPF网元生成对应的NBI CM记录
for _, ne := range upfNEs {
adminState, operState := common.ParseStateFromStatus(ne.Status)
// 为每个网元生成ManagedElement记录
managedElement := upf.ManagedElement{
Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName,
VendorName: ne.VendorName,
ManagedBy: ne.Dn,
ManagementIpAddress: ne.Ip,
SwVersion: "",
PatchInfo: "-",
AdministrativeState: string(adminState),
OperationalState: string(operState),
}
// 序列化为JSON
meJSON, err := json.Marshal(managedElement)
if err != nil {
log.Errorf("Failed to marshal ManagedElement for UPF ne_id %s: %v", ne.NeId, err)
continue
}
// 插入ManagedElement记录
nbiCM := common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "ManagedElement",
ValueJson: string(meJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
// 插入到数据库
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert UPF ManagedElement record: %v", err)
continue
}
// 生成 InventoryUnitRack 记录
inventoryUnitRack := upf.InventoryUnitRack{
Id: "RACK" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-Rack",
VendorUnitFamilyType: "5G-UPF",
VendorUnitTypeNumber: "UPF-" + ne.NeId,
VendorName: ne.VendorName,
SerialNumber: "SN-UPF-" + ne.NeId,
VersionNumber: "1.0.0",
DateOfManufacture: "2023-01-01",
DateOfLastService: now.Format("2006-01-02"),
ManufacturerData: "{}",
RackPosition: "1",
}
// 序列化为JSON
rackJSON, err := json.Marshal(inventoryUnitRack)
if err != nil {
log.Errorf("Failed to marshal InventoryUnitRack for ne_id %s: %v", ne.NeId, err)
} else {
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "InventoryUnitRack",
ValueJson: string(rackJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert InventoryUnitRack record: %v", err)
}
}
// 生成 InventoryUnitShelf 记录
inventoryUnitShelf := upf.InventoryUnitShelf{
Id: "SHELF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-Shelf",
VendorUnitFamilyType: "5G-UPF-SHELF",
VendorUnitTypeNumber: "UPF-SHELF-" + ne.NeId,
VendorName: ne.VendorName,
SerialNumber: "SN-UPF-SHELF-" + ne.NeId,
VersionNumber: "1.0.0",
DateOfManufacture: "2023-01-01",
DateOfLastService: now.Format("2006-01-02"),
ManufacturerData: "{}",
ShelfPosition: "1",
}
// 序列化为JSON
shelfJSON, err := json.Marshal(inventoryUnitShelf)
if err != nil {
log.Errorf("Failed to marshal InventoryUnitShelf for ne_id %s: %v", ne.NeId, err)
} else {
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "InventoryUnitShelf",
ValueJson: string(shelfJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert InventoryUnitShelf record: %v", err)
}
}
// 生成 InventoryUnitPack 记录
inventoryUnitPack := upf.InventoryUnitPack{
Id: "PACK" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-Pack",
VendorUnitFamilyType: "5G-UPF-PACK",
VendorUnitTypeNumber: "UPF-PACK-" + ne.NeId,
VendorName: ne.VendorName,
SerialNumber: "SN-UPF-PACK-" + ne.NeId,
VersionNumber: "1.0.0",
DateOfManufacture: "2023-01-01",
DateOfLastService: now.Format("2006-01-02"),
ManufacturerData: "{}",
SlotsOccupied: "1,2",
}
// 序列化为JSON
packJSON, err := json.Marshal(inventoryUnitPack)
if err != nil {
log.Errorf("Failed to marshal InventoryUnitPack for ne_id %s: %v", ne.NeId, err)
} else {
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "InventoryUnitPack",
ValueJson: string(packJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert InventoryUnitPack record: %v", err)
}
}
// 生成 InventoryUnitHost 记录
inventoryUnitHost := upf.InventoryUnitHost{
Id: "HOST" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-Host",
VendorUnitFamilyType: "5G-UPF-HOST",
VendorUnitTypeNumber: "UPF-HOST-" + ne.NeId,
VendorName: ne.VendorName,
SerialNumber: "SN-UPF-HOST-" + ne.NeId,
VersionNumber: "1.0.0",
DateOfManufacture: "2023-01-01",
DateOfLastService: now.Format("2006-01-02"),
ManufacturerData: "{}",
HostPosition: "1",
NumberOfCpu: "16",
MemSize: "64GB",
HardDiskSize: "1TB",
}
// 序列化为JSON
hostJSON, err := json.Marshal(inventoryUnitHost)
if err != nil {
log.Errorf("Failed to marshal InventoryUnitHost for ne_id %s: %v", ne.NeId, err)
} else {
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "InventoryUnitHost",
ValueJson: string(hostJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert InventoryUnitHost record: %v", err)
}
}
// 生成 InventoryUnitAccessory 记录
inventoryUnitAccessory := upf.InventoryUnitAccessory{
Id: "ACC" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-Accessory",
VendorUnitFamilyType: "5G-UPF-ACC",
VendorUnitTypeNumber: "UPF-ACC-" + ne.NeId,
VendorName: ne.VendorName,
SerialNumber: "SN-UPF-ACC-" + ne.NeId,
VersionNumber: "1.0.0",
DateOfManufacture: "2023-01-01",
DateOfLastService: now.Format("2006-01-02"),
ManufacturerData: "{}",
AccessoryPosition: "1",
AccessoryType: "FAN",
AddtionalInformation: "Cooling system",
}
// 序列化为JSON
accJSON, err := json.Marshal(inventoryUnitAccessory)
if err != nil {
log.Errorf("Failed to marshal InventoryUnitAccessory for ne_id %s: %v", ne.NeId, err)
} else {
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "InventoryUnitAccessory",
ValueJson: string(accJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert InventoryUnitAccessory record: %v", err)
}
}
// 生成 UpfFunction 记录
upfFunction := upf.UpfFunction{
Id: "UF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-UpfFunction",
AdministrativeState: string(adminState),
OperationalState: string(operState),
VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId,
MaxQosFlows: "1000000",
MaxThroughput: "10Gbps",
}
// 序列化为JSON
upfJSON, err := json.Marshal(upfFunction)
if err != nil {
log.Errorf("Failed to marshal UpfFunction for ne_id %s: %v", ne.NeId, err)
} else {
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "UpfFunction",
ValueJson: string(upfJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert UpfFunction record: %v", err)
}
}
// 创建 EpRpDynN3Upf 记录
epRpDynN3Upf := upf.EpRpDynN3Upf{
Id: "N3" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-N3",
LocIpAddrList: "[\"" + ne.Ip + "\"]",
FarIpSubnetworkList: "[\"10.13.0.0/16\"]",
}
// 序列化为JSON
n3JSON, err := json.Marshal(epRpDynN3Upf)
if err != nil {
log.Errorf("Failed to marshal EpRpDynN3Upf for ne_id %s: %v", ne.NeId, err)
} else {
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "EpRpDynN3Upf",
ValueJson: string(n3JSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert EpRpDynN3Upf record: %v", err)
}
}
// 创建 EpRpDynN9Upf 记录
epRpDynN9Upf := upf.EpRpDynN9Upf{
Id: "N9" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-N9",
LocIpAddrList: "[\"" + ne.Ip + "\"]",
FarIpSubnetworkList: "[\"10.19.0.0/16\"]",
}
// 序列化为JSON
n9JSON, err := json.Marshal(epRpDynN9Upf)
if err != nil {
log.Errorf("Failed to marshal EpRpDynN9Upf for ne_id %s: %v", ne.NeId, err)
} else {
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "EpRpDynN9Upf",
ValueJson: string(n9JSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert EpRpDynN9Upf record: %v", err)
}
}
// 创建 IPResource 记录
ipResource := upf.IPResource{
Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp),
UserLabel: ne.NeName + "-IPResource",
InterfaceType: "{Mgt,N3,N4,N9}",
LocIpV4AddrList: fmt.Sprintf("{%s,%s,%s,%s}", ne.Ip, ne.Ip, ne.Ip, ne.Ip),
LocIpV6AddrList: "{Default,Default,Default,Default}",
}
// 序列化为JSON
ipJSON, err := json.Marshal(ipResource)
if err != nil {
log.Errorf("Failed to marshal IPResource for UPF ne_id %s: %v", ne.NeId, err)
} else {
nbiCM = common.NbiCm{
NeType: ne.NeType,
NeId: ne.NeId,
CmVersion: common.CmVersion,
RmUid: ne.RmUid,
EventType: common.EventTypeInt("ObjectCreationEvent"),
ObjectType: "IPResource",
ValueJson: string(ipJSON),
Timestamp: now.Format("2006-01-02 15:04:05"),
}
err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error
if err != nil {
log.Errorf("Failed to insert UPF IPResource record: %v", err)
}
}
}
log.Info("UPF NBI CM synchronization completed")
return nil
}