diff --git a/database/install/sys_job.sql b/database/install/sys_job.sql index 218c2425..c8505380 100644 --- a/database/install/sys_job.sql +++ b/database/install/sys_job.sql @@ -44,5 +44,6 @@ INSERT INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', ' INSERT INTO `sys_job` VALUES (14, 'job.exportSMSCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smsc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceType\')) as service_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.result\')) as result,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.updateTime\')) as update_time\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smsc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); INSERT INTO `sys_job` VALUES (15, 'job.removeExportedFiles', 'SYSTEM', 'removeFile', '[{\"filePath\":\"/usr/local/omc/backup/operate_log\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/ims_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smf_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smsc_cdr\",\"maxDays\":30}]', '0 10 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1728634085631, ''); INSERT INTO `sys_job` VALUES (16, 'job.exportSGWCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_sgwc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as recordType,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.accessPointNameNI\')) as accessPointNameNI,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.servedIMSI\')) as IMSI,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.servedMSISDN\')) as MSISDN,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.servedPDPPDNAddress\')) as PdpAddress,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) as duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordOpeningTime\')) as recordOpeningTime,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) as chargingID,JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfTrafficVolumes[0].dataVolumeGPRSDownlink\')) AS dataVolumeGPRSDownlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfTrafficVolumes[0].dataVolumeGPRsUplink\')) as dataVolumeGPRsUplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.userLocationInformation.tai.tac\')) as tac,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.userLocationInformation.ecgi.eutraCellId\')) as cellID\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/sgwc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); +INSERT INTO `sys_job` VALUES (17, 'job.nbiNRM', 'SYSTEM', 'nbiNRM', '{}', '0 10 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); SET FOREIGN_KEY_CHECKS = 1; diff --git a/src/modules/crontask/processor/nbiNRM/amf/schema.go b/src/modules/crontask/processor/nbiNRM/amf/schema.go index 203d5924..39b95d1f 100644 --- a/src/modules/crontask/processor/nbiNRM/amf/schema.go +++ b/src/modules/crontask/processor/nbiNRM/amf/schema.go @@ -1,30 +1,36 @@ package amf +import "be.ems/src/modules/crontask/processor/nbiNRM/common" + +const ( + AMF string = "AMF" // 网元类型 +) + type ManagedElement struct { - Id string `json:"id"` - UserLabel string `json:"userLabel"` - VendorName string `json:"vendorName"` - ManagedBy string `json:"managedBy"` - ManagementIpAddress string `json:"managementIpAddress"` - SwVersion string `json:"swVersion"` - PatchInfo string `json:"patchInfo"` - AdministrativeState string `json:"administrativeState"` - OperationalState string `json:"operationalState"` + Id string `json:"id"` + UserLabel string `json:"userLabel"` + VendorName string `json:"vendorName"` + ManagedBy string `json:"managedBy"` + ManagementIpAddress string `json:"managementIpAddress"` + SwVersion string `json:"swVersion"` + PatchInfo string `json:"patchInfo"` + AdministrativeState common.AdministrativeState `json:"administrativeState"` + OperationalState common.OperationalState `json:"operationalState"` } type AmfFunction struct { - Id string `json:"id"` - UserLabel string `json:"userLabel"` - AdministrativeState string `json:"administrativeState"` - OperationalState string `json:"operationalState"` - VnfInstanceId string `json:"vnfInstanceId"` - Fqdn string `json:"fqdn"` - SbiServiceList string `json:"sbiServiceList"` - AmfGuamiList string `json:"amfGuamiList"` - SnssaiList string `json:"snssaiList"` - MaxUser int `json:"maxUser"` - RelativeCapacity int `json:"relativeCapacity"` - MaxGnbNum int `json:"maxGnbNum"` + Id string `json:"id"` + UserLabel string `json:"userLabel"` + AdministrativeState common.AdministrativeState `json:"administrativeState"` + OperationalState common.OperationalState `json:"operationalState"` + VnfInstanceId string `json:"vnfInstanceId"` + Fqdn string `json:"fqdn"` + SbiServiceList string `json:"sbiServiceList"` + AmfGuamiList string `json:"amfGuamiList"` + SnssaiList string `json:"snssaiList"` + MaxUser int `json:"maxUser"` + RelativeCapacity int `json:"relativeCapacity"` + MaxGnbNum int `json:"maxGnbNum"` } type EpRpDynN8Amf struct { diff --git a/src/modules/crontask/processor/nbiNRM/common/common.go b/src/modules/crontask/processor/nbiNRM/common/common.go new file mode 100644 index 00000000..9c2cf789 --- /dev/null +++ b/src/modules/crontask/processor/nbiNRM/common/common.go @@ -0,0 +1,138 @@ +package common + +import ( + "fmt" + "strconv" +) + +type AdministrativeState string + +const ( + Locked AdministrativeState = "Locked" + Unlocked AdministrativeState = "Unlocked" + ShuttingDown AdministrativeState = "ShuttingDown" +) + +type OperationalState string + +const ( + Enabled OperationalState = "Enabled" + Disabled OperationalState = "Disabled" +) + +const ( + // NBI CM表名 + NbiCmTableName = "nbi_cm" + CmVersion = "v1" // CM版本 +) + +type EventType int + +const ( + ObjectNullEvent EventType = iota // ObjectNullEvent 空事件 + ObjectCreationEvent // ObjectCreationEvent 创建事件 + ObjectDeletionEvent // ObjectDeletionEvent 删除事件 + ObjectAttributeValueChangeEvent // ObjectAttributeValueChangeEvent 修改事件 + ObjectUnkownEvent // 未知事件 +) + +func (et EventType) EventTypeEnumString() string { + switch et { + case ObjectCreationEvent: + return "ObjectCreationEvent" + case ObjectDeletionEvent: + return "ObjectDeletionEvent" + case ObjectAttributeValueChangeEvent: + return "ObjectAttributeValueChangeEvent" + case ObjectNullEvent: + return "ObjectNullEvent" + default: + return "ObjectUnkownEvent" + } +} + +func (et EventType) EventTypeIntString() string { + return fmt.Sprintf("%d", et) +} + +// EventTypeInt 将字符串转换为 EventType 枚举类型 +func EventTypeInt(s string) EventType { + if i, err := strconv.Atoi(s); err == nil { + return EventType(i) + } + + switch s { + case "ObjectCreationEvent": + return ObjectCreationEvent + case "ObjectDeletionEvent": + return ObjectDeletionEvent + case "ObjectAttributeValueChangeEvent": + return ObjectAttributeValueChangeEvent + case "": + // 如果字符串为空,则返回未知事件 + return ObjectNullEvent + default: + return ObjectUnkownEvent + } +} + +type NeStatus int + +const ( + NeStatusOffline NeStatus = iota + NeStatusActive + NeStatusToSync + NeStatusStandby + NeStatusUnknown +) + +// ParseStateFromStatus 将状态字符串转换为 AdministrativeState 和 OperationalState +func ParseStateFromStatus(status NeStatus) (AdministrativeState, OperationalState) { + var adminState AdministrativeState + var operState OperationalState + + switch status { + case NeStatusOffline: + adminState = Locked + operState = Disabled + case NeStatusActive: + adminState = Unlocked + operState = Enabled + case NeStatusToSync: + adminState = Unlocked + operState = Enabled + case NeStatusStandby: + adminState = Locked + operState = Enabled + default: + adminState = ShuttingDown + operState = Disabled + } + + return adminState, operState +} + +type NeInfo struct { + NeId string `db:"ne_id"` + NeType string `db:"ne_type"` + RmUid string `db:"rm_uid"` + VendorName string `db:"vendor_name"` + NeName string `db:"ne_name"` + Ip string `db:"ip"` + Port string `db:"port"` + Dn string `db:"dn"` + Status NeStatus `db:"status"` +} + +// NbiCm 表结构 +type NbiCm struct { + Id int `json:"id" db:"id PRIMARY KEY"` + NeType string `json:"neType" db:"ne_type"` + NeId string `json:"neId" db:"ne_id"` + CmVersion string `json:"cmVersion" db:"cm_version"` + RmUid string `json:"rmUid" db:"rm_uid"` + EventType EventType `json:"eventType" db:"event_type"` + ObjectType string `json:"objectType" db:"object_type"` + ValueJson string `json:"valueJson" db:"value_json"` + Timestamp string `json:"timestamp" db:"timestamp"` +} diff --git a/src/modules/crontask/processor/nbiNRM/nbiNRM.go b/src/modules/crontask/processor/nbiNRM/nbiNRM.go index e948fab8..e3a64c06 100644 --- a/src/modules/crontask/processor/nbiNRM/nbiNRM.go +++ b/src/modules/crontask/processor/nbiNRM/nbiNRM.go @@ -2,80 +2,11 @@ package nbiNRM import ( "encoding/json" - "fmt" - "strconv" - "strings" "be.ems/lib/log" "be.ems/src/framework/cron" ) -const ( - // NBI CM表名 - NbiCmTableName = "nbi_cm" - CmVersion = "v1" // CM版本 -) - -type EventType int - -const ( - ObjectCreationEvent EventType = iota // ObjectCreationEvent 创建事件 - ObjectDeletionEvent // ObjectDeletionEvent 删除事件 - ObjectAttributeValueChangeEvent // ObjectAttributeValueChangeEvent 修改事件 - ObjectUnkownEvent // 未知事件 -) - -func (et EventType) EventTypeEnumString() string { - switch et { - case ObjectCreationEvent: - return "ObjectCreationEvent" - case ObjectDeletionEvent: - return "ObjectDeletionEvent" - case ObjectAttributeValueChangeEvent: - return "ObjectAttributeValueChangeEvent" - default: - return "ObjectUnkownEvent" - } -} - -func (et EventType) EventTypeIntString() string { - return fmt.Sprintf("%d", et) -} - -// ParseCallTag 将字符串转换为 CallTag 枚举类型 -func EventTypeInt(s string) EventType { - if i, err := strconv.Atoi(s); err == nil { - return EventType(i) - } - // 如果转换失败,则按名称匹配(忽略大小写) - switch strings.ToLower(s) { - case "ObjectCreationEvent": - return ObjectCreationEvent - case "ObjectDeletionEvent": - return ObjectDeletionEvent - case "ObjectAttributeValueChangeEvent": - return ObjectAttributeValueChangeEvent - case "": - // 如果字符串为空,则返回未知事件 - return ObjectUnkownEvent - default: - return ObjectUnkownEvent - } -} - -// NbiCm 表结构 -type NbiCm struct { - Id string `json:"id" db:"id PRIMARY KEY"` - NeType string `json:"neType" db:"ne_type"` - NeId string `json:"neId" db:"ne_id"` - CmVersion string `json:"cmVersion" db:"cm_version"` - RmUid string `json:"rmUid" db:"rm_uid"` - EventType EventType `json:"eventType" db:"event_type"` - ObjectType string `json:"objectType" db:"object_type"` - ValueJson string `json:"valueJson" db:"value_json"` - Timestamp string `json:"timestamp" db:"timestamp"` -} - var NewProcessor = &BarProcessor{ progress: 0, count: 0, @@ -116,10 +47,29 @@ func (s *BarProcessor) Execute(data any) (any, error) { log.Errorf("SyncAmfNbiCM error: %v", err) return nil, err } + err = s.SyncPcfNbiCM() + if err != nil { + log.Errorf("SyncPcfNbiCM error: %v", err) + return nil, err + } + err = s.SyncUdmNbiCM() + if err != nil { + log.Errorf("SyncUdmNbiCM error: %v", err) + return nil, err + } + err = s.SyncSmfNbiCM() + if err != nil { + log.Errorf("SyncSmfNbiCM error: %v", err) + return nil, err + } + err = s.SyncUpfNbiCM() + if err != nil { + log.Errorf("SyncUpfNbiCM error: %v", err) + return nil, err + } // 返回结果,用于记录执行结果 return map[string]any{ - "msg": "sucess", - "affected": 1, + "msg": "sucess", }, nil } diff --git a/src/modules/crontask/processor/nbiNRM/pcf/schema.go b/src/modules/crontask/processor/nbiNRM/pcf/schema.go index a85536aa..3fb5391e 100644 --- a/src/modules/crontask/processor/nbiNRM/pcf/schema.go +++ b/src/modules/crontask/processor/nbiNRM/pcf/schema.go @@ -1,33 +1,39 @@ package pcf +import "be.ems/src/modules/crontask/processor/nbiNRM/common" + +const ( + PCF string = "PCF" // 网元类型 +) + type ManagedElement struct { - Id string `json:"id"` - UserLabel string `json:"userLabel"` - VendorName string `json:"vendorName"` - ManagedBy string `json:"managedBy"` - ManagementIpAddress string `json:"managementIpAddress"` - SwVersion string `json:"swVersion"` - PatchInfo string `json:"patchInfo"` - AdministrativeState string `json:"administrativeState"` - OperationalState string `json:"operationalState"` + Id string `json:"id"` + UserLabel string `json:"userLabel"` + VendorName string `json:"vendorName"` + ManagedBy string `json:"managedBy"` + ManagementIpAddress string `json:"managementIpAddress"` + SwVersion string `json:"swVersion"` + PatchInfo string `json:"patchInfo"` + AdministrativeState common.AdministrativeState `json:"administrativeState"` + OperationalState common.OperationalState `json:"operationalState"` } type PcfFunction struct { - Id string `json:"id"` - UserLabel string `json:"userLabel"` - AdministrativeState string `json:"administrativeState"` - OperationalState string `json:"operationalState"` - VnfInstanceId string `json:"vnfInstanceId"` - Fqdn string `json:"fqdn"` - SbiServiceList string `json:"sbiServiceList"` + Id string `json:"id"` + UserLabel string `json:"userLabel"` + AdministrativeState common.AdministrativeState `json:"administrativeState"` + OperationalState common.OperationalState `json:"operationalState"` + VnfInstanceId string `json:"vnfInstanceId"` + Fqdn string `json:"fqdn"` + SbiServiceList string `json:"sbiServiceList"` } type UdrFunction struct { - Id string `json:"id"` - UserLabel string `json:"userLabel"` - AdministrativeState string `json:"administrativeState"` - OperationalState string `json:"operationalState"` - MaxSubNbr int `json:"maxSubNbr"` + Id string `json:"id"` + UserLabel string `json:"userLabel"` + AdministrativeState common.AdministrativeState `json:"administrativeState"` + OperationalState common.OperationalState `json:"operationalState"` + MaxSubNbr int `json:"maxSubNbr"` } type IPResource struct { diff --git a/src/modules/crontask/processor/nbiNRM/syncAmf.go b/src/modules/crontask/processor/nbiNRM/syncAmf.go index 110ad84c..13622198 100644 --- a/src/modules/crontask/processor/nbiNRM/syncAmf.go +++ b/src/modules/crontask/processor/nbiNRM/syncAmf.go @@ -9,6 +9,7 @@ import ( "be.ems/lib/log" "be.ems/src/modules/crontask/processor/nbiNRM/amf" + "be.ems/src/modules/crontask/processor/nbiNRM/common" ) // SyncAmfNbiCM 从ne_info获取AMF网元并同步数据到nbi_cm表 @@ -16,16 +17,9 @@ func (s *BarProcessor) SyncAmfNbiCM() error { log.Info("Starting AMF NBI CM synchronization") // 从ne_info表获取AMF类型的网元 - var amfNEs []struct { - NeID string `db:"ne_id"` - NeType string `db:"ne_type"` - RmUid string `db:"rm_uid"` - } - - err := dborm.DefaultDB().Table("ne_info"). - Where("ne_type = ? AND status = 1", "AMF"). + var amfNEs []common.NeInfo + err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "AMF"). Find(&amfNEs).Error - if err != nil { log.Errorf("Failed to query AMF network elements: %v", err) return err @@ -39,39 +33,42 @@ func (s *BarProcessor) SyncAmfNbiCM() error { // 遍历每个AMF网元,生成对应的NBI CM记录 for _, ne := range amfNEs { + adminState, operState := common.ParseStateFromStatus(ne.Status) + // 为每个网元生成ManagedElement记录 managedElement := amf.ManagedElement{ - Id: "ManagedElement=" + ne.NeID, - UserLabel: "AMF-" + ne.NeID, - VendorName: "Vendor", - ManagedBy: "OMC", - ManagementIpAddress: "", // 可以从其他表获取IP地址 - SwVersion: "", // 可以从其他表获取软件版本 - AdministrativeState: "UNLOCKED", - OperationalState: "ENABLED", + Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), // 生成唯一ID + UserLabel: ne.NeName, + VendorName: ne.VendorName, + ManagedBy: ne.Dn, + ManagementIpAddress: ne.Ip, // 可以从其他表获取IP地址 + SwVersion: "", // 可以从其他表获取软件版本 + PatchInfo: "-", + AdministrativeState: adminState, + OperationalState: operState, } // 序列化为JSON meJSON, err := json.Marshal(managedElement) if err != nil { - log.Errorf("Failed to marshal ManagedElement for ne_id %s: %v", ne.NeID, err) + log.Errorf("Failed to marshal ManagedElement for ne_id %s: %v", ne.NeId, err) continue } // 生成唯一ID - meID := fmt.Sprintf("nbi-cm-%s-me-%d", ne.NeID, timestamp) + // meID := fmt.Sprintf("nbi-cm-%s-me-%d", ne.NeID, timestamp) // 插入ManagedElement记录 - nbiCM := NbiCm{ - Id: meID, + nbiCM := common.NbiCm{ + // Id: meID, NeType: ne.NeType, - NeId: ne.NeID, - CmVersion: CmVersion, + NeId: ne.NeId, + CmVersion: common.CmVersion, RmUid: ne.RmUid, - EventType: EventTypeInt("ObjectCreationEvent"), + EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "ManagedElement", ValueJson: string(meJSON), - Timestamp: fmt.Sprintf("%d", timestamp), + Timestamp: now.Format("2006-01-02 15:04:05"), } // 插入到数据库 @@ -83,14 +80,14 @@ func (s *BarProcessor) SyncAmfNbiCM() error { // 为每个网元生成AmfFunction记录 amfFunction := amf.AmfFunction{ - Id: "ManagedElement=" + ne.NeID + ",AmfFunction=1", - UserLabel: "AMF-" + ne.NeID + "-Func", - AdministrativeState: "UNLOCKED", - OperationalState: "ENABLED", - VnfInstanceId: "vnf-" + ne.NeID, - Fqdn: "amf-" + ne.NeID + ".example.com", + Id: "AF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), // 生成唯一ID + UserLabel: ne.NeName + "-AmfFunction", + AdministrativeState: adminState, + OperationalState: operState, + VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId, + Fqdn: "amf-" + ne.NeId + ".5gc.3gpp.org", SbiServiceList: "Namf_Communication,Namf_EventExposure,Namf_MT,Namf_Location", - AmfGuamiList: "[{\"mcc\":\"460\",\"mnc\":\"01\",\"amfId\":\"" + ne.NeID + "\"}]", + AmfGuamiList: "[{\"mcc\":\"460\",\"mnc\":\"01\",\"amfId\":\"" + ne.NeId + "\"}]", SnssaiList: "[{\"sst\":1,\"sd\":\"010203\"}]", MaxUser: 1000000, RelativeCapacity: 30, @@ -100,24 +97,24 @@ func (s *BarProcessor) SyncAmfNbiCM() error { // 序列化为JSON amfJSON, err := json.Marshal(amfFunction) if err != nil { - log.Errorf("Failed to marshal AmfFunction for ne_id %s: %v", ne.NeID, err) + log.Errorf("Failed to marshal AmfFunction for ne_id %s: %v", ne.NeId, err) continue } // 生成唯一ID - amfID := fmt.Sprintf("nbi-cm-%s-amf-%d", ne.NeID, timestamp) + // amfID := fmt.Sprintf("nbi-cm-%s-amf-%d", ne.NeID, timestamp) // 插入AmfFunction记录 - nbiCM = NbiCm{ - Id: amfID, + nbiCM = common.NbiCm{ + // Id: amfID, NeType: ne.NeType, - NeId: ne.NeID, - CmVersion: "1.0", - RmUid: "OMC-SYSTEM", - EventType: EventTypeInt("ObjectCreationEvent"), + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "AmfFunction", ValueJson: string(amfJSON), - Timestamp: fmt.Sprintf("%d", timestamp), + Timestamp: now.Format("2006-01-02 15:04:05"), } // 插入到数据库 @@ -127,8 +124,136 @@ func (s *BarProcessor) SyncAmfNbiCM() error { continue } - // 可以继续添加其他AMF相关对象,如EpRpDynN8Amf、IPResource等 - // 这里仅展示了基本结构,可根据实际需求扩展 + // 在 AmfFunction 记录创建完成后添加以下代码 + + // 创建 EpRpDynN8Amf 记录 + epRpDynN8Amf := amf.EpRpDynN8Amf{ + Id: "N8" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-N8", + LocIpAddrList: "[\"" + ne.Ip + "\"]", + FarIpSubnetworkList: "[\"10.10.0.0/16\"]", + } + + // 序列化为JSON + n8JSON, err := json.Marshal(epRpDynN8Amf) + if err != nil { + log.Errorf("Failed to marshal EpRpDynN8Amf for ne_id %s: %v", ne.NeId, err) + } else { + // 插入EpRpDynN8Amf记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "EpRpDynN8Amf", + ValueJson: string(n8JSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert EpRpDynN8Amf record: %v", err) + } + } + + // 创建 EpRpDynN11Amf 记录 + epRpDynN11Amf := amf.EpRpDynN11Amf{ + Id: "N11" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-N11", + LocIpAddrList: "[\"" + ne.Ip + "\"]", + FarIpSubnetworkList: "[\"10.11.0.0/16\"]", + } + + // 序列化为JSON + n11JSON, err := json.Marshal(epRpDynN11Amf) + if err != nil { + log.Errorf("Failed to marshal EpRpDynN11Amf for ne_id %s: %v", ne.NeId, err) + } else { + // 插入EpRpDynN11Amf记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "EpRpDynN11Amf", + ValueJson: string(n11JSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert EpRpDynN11Amf record: %v", err) + } + } + + // 创建 EpRpDynN12Amf 记录 + epRpDynN12Amf := amf.EpRpDynN12Amf{ + Id: "N12" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-N12", + LocIpAddrList: "[\"" + ne.Ip + "\"]", + FarIpSubnetworkList: "[\"10.12.0.0/16\"]", + } + + // 序列化为JSON + n12JSON, err := json.Marshal(epRpDynN12Amf) + if err != nil { + log.Errorf("Failed to marshal EpRpDynN12Amf for ne_id %s: %v", ne.NeId, err) + } else { + // 插入EpRpDynN12Amf记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "EpRpDynN12Amf", + ValueJson: string(n12JSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert EpRpDynN12Amf record: %v", err) + } + } + + // 创建 IPResource 记录 + ipResource := amf.IPResource{ + Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-IPResource", + InterfaceType: "{Mgt,N8,N11,N12}", + LocIpV4AddrList: fmt.Sprintf("{%s,Default,Default,Default}", ne.Ip), + LocIpV6AddrList: "{Default,Default,Default,Default}", + } + + // 序列化为JSON + ipJSON, err := json.Marshal(ipResource) + if err != nil { + log.Errorf("Failed to marshal IPResource for ne_id %s: %v", ne.NeId, err) + } else { + // 插入IPResource记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "IPResource", + ValueJson: string(ipJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert IPResource record: %v", err) + } + } } log.Info("AMF NBI CM synchronization completed") diff --git a/src/modules/crontask/processor/nbiNRM/syncPcf.go b/src/modules/crontask/processor/nbiNRM/syncPcf.go new file mode 100644 index 00000000..cdfd6b1a --- /dev/null +++ b/src/modules/crontask/processor/nbiNRM/syncPcf.go @@ -0,0 +1,182 @@ +package nbiNRM + +import ( + "encoding/json" + "fmt" + "time" + + "be.ems/lib/dborm" + "be.ems/lib/log" + "be.ems/src/modules/crontask/processor/nbiNRM/common" + "be.ems/src/modules/crontask/processor/nbiNRM/pcf" +) + +// SyncPcfNbiCM 从ne_info获取PCF网元并同步数据到nbi_cm表 +func (s *BarProcessor) SyncPcfNbiCM() error { + log.Info("Starting PCF NBI CM synchronization") + + // 从ne_info表获取PCF类型的网元 + var pcfNEs []common.NeInfo + err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "PCF"). + Find(&pcfNEs).Error + if err != nil { + log.Errorf("Failed to query PCF network elements: %v", err) + return err + } + + log.Infof("Found %d PCF network elements", len(pcfNEs)) + + // 当前时间戳 + now := time.Now() + timestamp := now.Unix() + + // 遍历每个PCF网元,生成对应的NBI CM记录 + for _, ne := range pcfNEs { + adminState, operState := common.ParseStateFromStatus(ne.Status) + + // 为每个网元生成ManagedElement记录 + managedElement := pcf.ManagedElement{ + Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName, + VendorName: ne.VendorName, + ManagedBy: ne.Dn, + ManagementIpAddress: ne.Ip, + SwVersion: "", + PatchInfo: "-", + AdministrativeState: adminState, + OperationalState: operState, + } + + // 序列化为JSON + meJSON, err := json.Marshal(managedElement) + if err != nil { + log.Errorf("Failed to marshal ManagedElement for PCF ne_id %s: %v", ne.NeId, err) + continue + } + + // 插入ManagedElement记录 + nbiCM := common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "ManagedElement", + ValueJson: string(meJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert PCF ManagedElement record: %v", err) + continue + } + + // 为每个网元生成PcfFunction记录 + pcfFunction := pcf.PcfFunction{ + Id: "PF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-PcfFunction", + AdministrativeState: adminState, + OperationalState: operState, + VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId, + Fqdn: "pcf-" + ne.NeId + ".5gc.3gpp.org", + SbiServiceList: "Npcf_AMPolicyControl,Npcf_PolicyAuthorization,Npcf_SMPolicyControl,Npcf_BDTPolicyControl", + } + + // 序列化为JSON + pcfJSON, err := json.Marshal(pcfFunction) + if err != nil { + log.Errorf("Failed to marshal PcfFunction for ne_id %s: %v", ne.NeId, err) + continue + } + + // 插入PcfFunction记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "PcfFunction", + ValueJson: string(pcfJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert PcfFunction record: %v", err) + continue + } + + // 为每个网元生成UdrFunction记录 + udrFunction := pcf.UdrFunction{ + Id: "UF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-UdrFunction", + AdministrativeState: adminState, + OperationalState: operState, + MaxSubNbr: 500000, + } + + // 序列化为JSON + udrJSON, err := json.Marshal(udrFunction) + if err != nil { + log.Errorf("Failed to marshal UdrFunction for ne_id %s: %v", ne.NeId, err) + } else { + // 插入UdrFunction记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "UdrFunction", + ValueJson: string(udrJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert UdrFunction record: %v", err) + } + } + + // 创建 IPResource 记录 + ipResource := pcf.IPResource{ + Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-IPResource", + InterfaceType: "{Mgt,N5,N7,N15}", + LocIpV4AddrList: fmt.Sprintf("{%s,Default,Default,Default}", ne.Ip), + LocIpV6AddrList: "{Default,Default,Default,Default}", + } + + // 序列化为JSON + ipJSON, err := json.Marshal(ipResource) + if err != nil { + log.Errorf("Failed to marshal IPResource for PCF ne_id %s: %v", ne.NeId, err) + } else { + // 插入IPResource记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "IPResource", + ValueJson: string(ipJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert PCF IPResource record: %v", err) + } + } + } + + log.Info("PCF NBI CM synchronization completed") + return nil +} diff --git a/src/modules/crontask/processor/nbiNRM/syncSmf.go b/src/modules/crontask/processor/nbiNRM/syncSmf.go new file mode 100644 index 00000000..06415118 --- /dev/null +++ b/src/modules/crontask/processor/nbiNRM/syncSmf.go @@ -0,0 +1,249 @@ +package nbiNRM + +import ( + "encoding/json" + "fmt" + "time" + + "be.ems/lib/dborm" + "be.ems/lib/log" + "be.ems/src/modules/crontask/processor/nbiNRM/common" + "be.ems/src/modules/crontask/processor/nbiNRM/smf" +) + +// SyncSmfNbiCM 从ne_info获取SMF网元并同步数据到nbi_cm表 +func (s *BarProcessor) SyncSmfNbiCM() error { + log.Info("Starting SMF NBI CM synchronization") + + // 从ne_info表获取SMF类型的网元 + var smfNEs []common.NeInfo + err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "SMF"). + Find(&smfNEs).Error + if err != nil { + log.Errorf("Failed to query SMF network elements: %v", err) + return err + } + + log.Infof("Found %d SMF network elements", len(smfNEs)) + + // 当前时间戳 + now := time.Now() + timestamp := now.Unix() + + // 遍历每个SMF网元,生成对应的NBI CM记录 + for _, ne := range smfNEs { + adminState, operState := common.ParseStateFromStatus(ne.Status) + + // 为每个网元生成ManagedElement记录 + managedElement := smf.ManagedElement{ + Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName, + VendorName: ne.VendorName, + ManagedBy: ne.Dn, + ManagementIpAddress: ne.Ip, + SwVersion: "", + PatchInfo: "-", + AdministrativeState: string(adminState), + OperationalState: string(operState), + } + + // 序列化为JSON + meJSON, err := json.Marshal(managedElement) + if err != nil { + log.Errorf("Failed to marshal ManagedElement for SMF ne_id %s: %v", ne.NeId, err) + continue + } + + // 插入ManagedElement记录 + nbiCM := common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "ManagedElement", + ValueJson: string(meJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert SMF ManagedElement record: %v", err) + continue + } + + // 为每个网元生成SmfFunction记录 + smfFunction := smf.SmfFunction{ + Id: "SF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-SmfFunction", + AdministrativeState: string(adminState), + OperationalState: string(operState), + VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId, + Fqdn: "smf-" + ne.NeId + ".5gc.3gpp.org", + SbiServiceList: "Nsmf_PDUSession,Nsmf_EventExposure", + MaxPduSessions: 1000000, + MaxQfi: 64, + UpfList: "[\"UPF-1\",\"UPF-2\"]", + } + + // 序列化为JSON + smfJSON, err := json.Marshal(smfFunction) + if err != nil { + log.Errorf("Failed to marshal SmfFunction for ne_id %s: %v", ne.NeId, err) + continue + } + + // 插入SmfFunction记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "SmfFunction", + ValueJson: string(smfJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert SmfFunction record: %v", err) + continue + } + + // 创建 AddrPool 记录 + addrPool := smf.AddrPool{ + Id: "AP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-AddrPool", + AddrType: "UE", + IpVersion: "IPv4", + AddrSegList: "[\"10.60.0.0/16\",\"10.61.0.0/16\"]", + } + + // 序列化为JSON + addrPoolJSON, err := json.Marshal(addrPool) + if err != nil { + log.Errorf("Failed to marshal AddrPool for ne_id %s: %v", ne.NeId, err) + } else { + // 插入AddrPool记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "AddrPool", + ValueJson: string(addrPoolJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert AddrPool record: %v", err) + } + } + + // 创建 EpRpDynN7Smf 记录 + epRpDynN7Smf := smf.EpRpDynN7Smf{ + Id: "N7" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-N7", + LocIpAddrList: "[\"" + ne.Ip + "\"]", + FarIpSubnetworkList: "[\"10.17.0.0/16\"]", + } + + // 序列化为JSON + n7JSON, err := json.Marshal(epRpDynN7Smf) + if err != nil { + log.Errorf("Failed to marshal EpRpDynN7Smf for ne_id %s: %v", ne.NeId, err) + } else { + // 插入EpRpDynN7Smf记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "EpRpDynN7Smf", + ValueJson: string(n7JSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert EpRpDynN7Smf record: %v", err) + } + } + + // 创建 EpRpDynN10Smf 记录 + epRpDynN10Smf := smf.EpRpDynN10Smf{ + Id: "N10" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-N10", + LocIpAddrList: "[\"" + ne.Ip + "\"]", + FarIpSubnetworkList: "[\"10.20.0.0/16\"]", + } + + // 序列化为JSON + n10JSON, err := json.Marshal(epRpDynN10Smf) + if err != nil { + log.Errorf("Failed to marshal EpRpDynN10Smf for ne_id %s: %v", ne.NeId, err) + } else { + // 插入EpRpDynN10Smf记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "EpRpDynN10Smf", + ValueJson: string(n10JSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert EpRpDynN10Smf record: %v", err) + } + } + + // 创建 IPResource 记录 + ipResource := smf.IPResource{ + Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-IPResource", + InterfaceType: "{Mgt,N4,N7,N10,N11}", + LocIpV4AddrList: fmt.Sprintf("{%s,%s,%s,%s,%s}", ne.Ip, ne.Ip, ne.Ip, ne.Ip, ne.Ip), + LocIpV6AddrList: "{Default,Default,Default,Default,Default}", + } + + // 序列化为JSON + ipJSON, err := json.Marshal(ipResource) + if err != nil { + log.Errorf("Failed to marshal IPResource for SMF ne_id %s: %v", ne.NeId, err) + } else { + // 插入IPResource记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "IPResource", + ValueJson: string(ipJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert SMF IPResource record: %v", err) + } + } + } + + log.Info("SMF NBI CM synchronization completed") + return nil +} diff --git a/src/modules/crontask/processor/nbiNRM/syncUdm.go b/src/modules/crontask/processor/nbiNRM/syncUdm.go new file mode 100644 index 00000000..5ea8706b --- /dev/null +++ b/src/modules/crontask/processor/nbiNRM/syncUdm.go @@ -0,0 +1,221 @@ +package nbiNRM + +import ( + "encoding/json" + "fmt" + "time" + + "be.ems/lib/dborm" + "be.ems/lib/log" + "be.ems/src/modules/crontask/processor/nbiNRM/common" + "be.ems/src/modules/crontask/processor/nbiNRM/udm" +) + +// SyncUdmNbiCM 从ne_info获取UDM网元并同步数据到nbi_cm表 +func (s *BarProcessor) SyncUdmNbiCM() error { + log.Info("Starting UDM NBI CM synchronization") + + // 从ne_info表获取UDM类型的网元 + var udmNEs []common.NeInfo + err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "UDM"). + Find(&udmNEs).Error + if err != nil { + log.Errorf("Failed to query UDM network elements: %v", err) + return err + } + + log.Infof("Found %d UDM network elements", len(udmNEs)) + + // 当前时间戳 + now := time.Now() + timestamp := now.Unix() + + // 遍历每个UDM网元,生成对应的NBI CM记录 + for _, ne := range udmNEs { + adminState, operState := common.ParseStateFromStatus(ne.Status) + + // 为每个网元生成ManagedElement记录 + managedElement := udm.ManagedElement{ + Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName, + VendorName: ne.VendorName, + ManagedBy: ne.Dn, + ManagementIpAddress: ne.Ip, + SwVersion: "", + PatchInfo: "-", + AdministrativeState: string(adminState), + OperationalState: string(operState), + } + + // 序列化为JSON + meJSON, err := json.Marshal(managedElement) + if err != nil { + log.Errorf("Failed to marshal ManagedElement for UDM ne_id %s: %v", ne.NeId, err) + continue + } + + // 插入ManagedElement记录 + nbiCM := common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "ManagedElement", + ValueJson: string(meJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert UDM ManagedElement record: %v", err) + continue + } + + // 为每个网元生成UdmFunction记录 + udmFunction := udm.UdmFunction{ + Id: "UF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-UdmFunction", + AdministrativeState: string(adminState), + OperationalState: string(operState), + VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId, + Fqdn: "udm-" + ne.NeId + ".5gc.3gpp.org", + SbiServiceList: "Nudm_UEAuthentication,Nudm_SubscriberDataManagement,Nudm_UEContextManagement", + } + + // 序列化为JSON + udmJSON, err := json.Marshal(udmFunction) + if err != nil { + log.Errorf("Failed to marshal UdmFunction for ne_id %s: %v", ne.NeId, err) + continue + } + + // 插入UdmFunction记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "UdmFunction", + ValueJson: string(udmJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert UdmFunction record: %v", err) + continue + } + + // 为每个网元生成UdrFunction记录 + udrFunction := udm.UdrFunction{ + Id: "UDR" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-UdrFunction", + AdministrativeState: string(adminState), + OperationalState: string(operState), + VnfInstanceId: "vnf-UDR-" + ne.NeId, + Fqdn: "udr-" + ne.NeId + ".5gc.3gpp.org", + SbiServiceList: "Nudr_DataRepository", + MaxNumSupi: 800000, + MaxNumMsisdn: 800000, + } + + // 序列化为JSON + udrJSON, err := json.Marshal(udrFunction) + if err != nil { + log.Errorf("Failed to marshal UdrFunction for ne_id %s: %v", ne.NeId, err) + } else { + // 插入UdrFunction记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "UdrFunction", + ValueJson: string(udrJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert UdrFunction record: %v", err) + } + } + + // 为每个网元生成AusfFunction记录 + ausfFunction := udm.AusfFunction{ + Id: "AUSF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-AusfFunction", + AdministrativeState: string(adminState), + OperationalState: string(operState), + VnfInstanceId: "vnf-AUSF-" + ne.NeId, + Fqdn: "ausf-" + ne.NeId + ".5gc.3gpp.org", + SbiServiceList: "Nausf_UEAuthentication,Nausf_SoRProtection", + } + + // 序列化为JSON + ausfJSON, err := json.Marshal(ausfFunction) + if err != nil { + log.Errorf("Failed to marshal AusfFunction for ne_id %s: %v", ne.NeId, err) + } else { + // 插入AusfFunction记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "AusfFunction", + ValueJson: string(ausfJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert AusfFunction record: %v", err) + } + } + + // 创建 IPResource 记录 + ipResource := udm.IPResource{ + Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-IPResource", + InterfaceType: "{Mgt,N8,N10,N12,N21}", + LocIpV4AddrList: fmt.Sprintf("{%s,%s,%s,%s,%s}", ne.Ip, ne.Ip, ne.Ip, ne.Ip, ne.Ip), + LocIpV6AddrList: "{Default,Default,Default,Default,Default}", + } + + // 序列化为JSON + ipJSON, err := json.Marshal(ipResource) + if err != nil { + log.Errorf("Failed to marshal IPResource for UDM ne_id %s: %v", ne.NeId, err) + } else { + // 插入IPResource记录 + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "IPResource", + ValueJson: string(ipJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert UDM IPResource record: %v", err) + } + } + } + + log.Info("UDM NBI CM synchronization completed") + return nil +} diff --git a/src/modules/crontask/processor/nbiNRM/syncUpf.go b/src/modules/crontask/processor/nbiNRM/syncUpf.go new file mode 100644 index 00000000..bbfe85eb --- /dev/null +++ b/src/modules/crontask/processor/nbiNRM/syncUpf.go @@ -0,0 +1,393 @@ +package nbiNRM + +import ( + "encoding/json" + "fmt" + "time" + + "be.ems/lib/dborm" + "be.ems/lib/log" + "be.ems/src/modules/crontask/processor/nbiNRM/common" + "be.ems/src/modules/crontask/processor/nbiNRM/upf" +) + +// SyncUpfNbiCM 从ne_info获取UPF网元并同步数据到nbi_cm表 +func (s *BarProcessor) SyncUpfNbiCM() error { + log.Info("Starting UPF NBI CM synchronization") + + // 从ne_info表获取UPF类型的网元 + var upfNEs []common.NeInfo + err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "UPF"). + Find(&upfNEs).Error + if err != nil { + log.Errorf("Failed to query UPF network elements: %v", err) + return err + } + + log.Infof("Found %d UPF network elements", len(upfNEs)) + + // 当前时间戳 + now := time.Now() + timestamp := now.Unix() + + // 遍历每个UPF网元,生成对应的NBI CM记录 + for _, ne := range upfNEs { + adminState, operState := common.ParseStateFromStatus(ne.Status) + + // 为每个网元生成ManagedElement记录 + managedElement := upf.ManagedElement{ + Id: "ME" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName, + VendorName: ne.VendorName, + ManagedBy: ne.Dn, + ManagementIpAddress: ne.Ip, + SwVersion: "", + PatchInfo: "-", + AdministrativeState: string(adminState), + OperationalState: string(operState), + } + + // 序列化为JSON + meJSON, err := json.Marshal(managedElement) + if err != nil { + log.Errorf("Failed to marshal ManagedElement for UPF ne_id %s: %v", ne.NeId, err) + continue + } + + // 插入ManagedElement记录 + nbiCM := common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "ManagedElement", + ValueJson: string(meJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + // 插入到数据库 + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert UPF ManagedElement record: %v", err) + continue + } + + // 生成 InventoryUnitRack 记录 + inventoryUnitRack := upf.InventoryUnitRack{ + Id: "RACK" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-Rack", + VendorUnitFamilyType: "5G-UPF", + VendorUnitTypeNumber: "UPF-" + ne.NeId, + VendorName: ne.VendorName, + SerialNumber: "SN-UPF-" + ne.NeId, + VersionNumber: "1.0.0", + DateOfManufacture: "2023-01-01", + DateOfLastService: now.Format("2006-01-02"), + ManufacturerData: "{}", + RackPosition: "1", + } + + // 序列化为JSON + rackJSON, err := json.Marshal(inventoryUnitRack) + if err != nil { + log.Errorf("Failed to marshal InventoryUnitRack for ne_id %s: %v", ne.NeId, err) + } else { + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "InventoryUnitRack", + ValueJson: string(rackJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert InventoryUnitRack record: %v", err) + } + } + + // 生成 InventoryUnitShelf 记录 + inventoryUnitShelf := upf.InventoryUnitShelf{ + Id: "SHELF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-Shelf", + VendorUnitFamilyType: "5G-UPF-SHELF", + VendorUnitTypeNumber: "UPF-SHELF-" + ne.NeId, + VendorName: ne.VendorName, + SerialNumber: "SN-UPF-SHELF-" + ne.NeId, + VersionNumber: "1.0.0", + DateOfManufacture: "2023-01-01", + DateOfLastService: now.Format("2006-01-02"), + ManufacturerData: "{}", + ShelfPosition: "1", + } + + // 序列化为JSON + shelfJSON, err := json.Marshal(inventoryUnitShelf) + if err != nil { + log.Errorf("Failed to marshal InventoryUnitShelf for ne_id %s: %v", ne.NeId, err) + } else { + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "InventoryUnitShelf", + ValueJson: string(shelfJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert InventoryUnitShelf record: %v", err) + } + } + + // 生成 InventoryUnitPack 记录 + inventoryUnitPack := upf.InventoryUnitPack{ + Id: "PACK" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-Pack", + VendorUnitFamilyType: "5G-UPF-PACK", + VendorUnitTypeNumber: "UPF-PACK-" + ne.NeId, + VendorName: ne.VendorName, + SerialNumber: "SN-UPF-PACK-" + ne.NeId, + VersionNumber: "1.0.0", + DateOfManufacture: "2023-01-01", + DateOfLastService: now.Format("2006-01-02"), + ManufacturerData: "{}", + SlotsOccupied: "1,2", + } + + // 序列化为JSON + packJSON, err := json.Marshal(inventoryUnitPack) + if err != nil { + log.Errorf("Failed to marshal InventoryUnitPack for ne_id %s: %v", ne.NeId, err) + } else { + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "InventoryUnitPack", + ValueJson: string(packJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert InventoryUnitPack record: %v", err) + } + } + + // 生成 InventoryUnitHost 记录 + inventoryUnitHost := upf.InventoryUnitHost{ + Id: "HOST" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-Host", + VendorUnitFamilyType: "5G-UPF-HOST", + VendorUnitTypeNumber: "UPF-HOST-" + ne.NeId, + VendorName: ne.VendorName, + SerialNumber: "SN-UPF-HOST-" + ne.NeId, + VersionNumber: "1.0.0", + DateOfManufacture: "2023-01-01", + DateOfLastService: now.Format("2006-01-02"), + ManufacturerData: "{}", + HostPosition: "1", + NumberOfCpu: "16", + MemSize: "64GB", + HardDiskSize: "1TB", + } + + // 序列化为JSON + hostJSON, err := json.Marshal(inventoryUnitHost) + if err != nil { + log.Errorf("Failed to marshal InventoryUnitHost for ne_id %s: %v", ne.NeId, err) + } else { + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "InventoryUnitHost", + ValueJson: string(hostJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert InventoryUnitHost record: %v", err) + } + } + + // 生成 InventoryUnitAccessory 记录 + inventoryUnitAccessory := upf.InventoryUnitAccessory{ + Id: "ACC" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-Accessory", + VendorUnitFamilyType: "5G-UPF-ACC", + VendorUnitTypeNumber: "UPF-ACC-" + ne.NeId, + VendorName: ne.VendorName, + SerialNumber: "SN-UPF-ACC-" + ne.NeId, + VersionNumber: "1.0.0", + DateOfManufacture: "2023-01-01", + DateOfLastService: now.Format("2006-01-02"), + ManufacturerData: "{}", + AccessoryPosition: "1", + AccessoryType: "FAN", + AddtionalInformation: "Cooling system", + } + + // 序列化为JSON + accJSON, err := json.Marshal(inventoryUnitAccessory) + if err != nil { + log.Errorf("Failed to marshal InventoryUnitAccessory for ne_id %s: %v", ne.NeId, err) + } else { + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "InventoryUnitAccessory", + ValueJson: string(accJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert InventoryUnitAccessory record: %v", err) + } + } + + // 生成 UpfFunction 记录 + upfFunction := upf.UpfFunction{ + Id: "UF" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-UpfFunction", + AdministrativeState: string(adminState), + OperationalState: string(operState), + VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId, + MaxQosFlows: "1000000", + MaxThroughput: "10Gbps", + } + + // 序列化为JSON + upfJSON, err := json.Marshal(upfFunction) + if err != nil { + log.Errorf("Failed to marshal UpfFunction for ne_id %s: %v", ne.NeId, err) + } else { + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "UpfFunction", + ValueJson: string(upfJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert UpfFunction record: %v", err) + } + } + + // 创建 EpRpDynN3Upf 记录 + epRpDynN3Upf := upf.EpRpDynN3Upf{ + Id: "N3" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-N3", + LocIpAddrList: "[\"" + ne.Ip + "\"]", + FarIpSubnetworkList: "[\"10.13.0.0/16\"]", + } + + // 序列化为JSON + n3JSON, err := json.Marshal(epRpDynN3Upf) + if err != nil { + log.Errorf("Failed to marshal EpRpDynN3Upf for ne_id %s: %v", ne.NeId, err) + } else { + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "EpRpDynN3Upf", + ValueJson: string(n3JSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert EpRpDynN3Upf record: %v", err) + } + } + + // 创建 EpRpDynN9Upf 记录 + epRpDynN9Upf := upf.EpRpDynN9Upf{ + Id: "N9" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-N9", + LocIpAddrList: "[\"" + ne.Ip + "\"]", + FarIpSubnetworkList: "[\"10.19.0.0/16\"]", + } + + // 序列化为JSON + n9JSON, err := json.Marshal(epRpDynN9Upf) + if err != nil { + log.Errorf("Failed to marshal EpRpDynN9Upf for ne_id %s: %v", ne.NeId, err) + } else { + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "EpRpDynN9Upf", + ValueJson: string(n9JSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert EpRpDynN9Upf record: %v", err) + } + } + + // 创建 IPResource 记录 + ipResource := upf.IPResource{ + Id: "IP" + fmt.Sprintf("-%s-%d", ne.NeId, timestamp), + UserLabel: ne.NeName + "-IPResource", + InterfaceType: "{Mgt,N3,N4,N9}", + LocIpV4AddrList: fmt.Sprintf("{%s,%s,%s,%s}", ne.Ip, ne.Ip, ne.Ip, ne.Ip), + LocIpV6AddrList: "{Default,Default,Default,Default}", + } + + // 序列化为JSON + ipJSON, err := json.Marshal(ipResource) + if err != nil { + log.Errorf("Failed to marshal IPResource for UPF ne_id %s: %v", ne.NeId, err) + } else { + nbiCM = common.NbiCm{ + NeType: ne.NeType, + NeId: ne.NeId, + CmVersion: common.CmVersion, + RmUid: ne.RmUid, + EventType: common.EventTypeInt("ObjectCreationEvent"), + ObjectType: "IPResource", + ValueJson: string(ipJSON), + Timestamp: now.Format("2006-01-02 15:04:05"), + } + + err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error + if err != nil { + log.Errorf("Failed to insert UPF IPResource record: %v", err) + } + } + } + + log.Info("UPF NBI CM synchronization completed") + return nil +}