package syncNbiNRM import ( "encoding/json" "fmt" "time" "be.ems/lib/dborm" "be.ems/lib/log" "be.ems/src/modules/crontask/processor/syncNbiNRM/common" "be.ems/src/modules/crontask/processor/syncNbiNRM/upf" ) // SyncUpfNbiCM 从ne_info获取UPF网元并同步数据到nbi_cm表 func (s *BarProcessor) SyncUpfNbiCM() error { log.Info("Starting UPF NBI CM synchronization") // 从ne_info表获取UPF类型的网元 var upfNEs []common.NeInfo err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "UPF"). Find(&upfNEs).Error if err != nil { log.Errorf("Failed to query UPF network elements: %v", err) return err } log.Infof("Found %d UPF network elements", len(upfNEs)) // 当前时间戳 now := time.Now() timestamp := now.Unix() // 遍历每个UPF网元,生成对应的NBI CM记录 for _, ne := range upfNEs { adminState, operState := common.ParseStateFromStatus(ne.Status) var version string = "-" err := dborm.DefaultDB().Table("ne_version"). Where("ne_type = ? and ne_id = ?", ne.NeType, ne.NeId). Pluck("version", &version).Error if err != nil { log.Errorf("Failed to query %s version: %v", ne.NeName, err) return err } // 为每个网元生成ManagedElement记录 managedElement := upf.ManagedElement{ Id: fmt.Sprintf("%s-%s-ManagedElement-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName, VendorName: ne.VendorName, ManagedBy: ne.Dn, ManagementIpAddress: ne.Ip, SwVersion: version, PatchInfo: "-", AdministrativeState: string(adminState), OperationalState: string(operState), } // 序列化为JSON meJSON, err := json.Marshal(managedElement) if err != nil { log.Errorf("Failed to marshal ManagedElement for UPF ne_id %s: %v", ne.NeId, err) continue } // 插入ManagedElement记录 nbiCM := common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "ManagedElement", ValueJson: string(meJSON), Timestamp: now.Format("2006-01-02 15:04:05"), } // 插入到数据库 err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert UPF ManagedElement record: %v", err) continue } // 生成 InventoryUnitRack 记录 inventoryUnitRack := upf.InventoryUnitRack{ Id: fmt.Sprintf("%s-%s-InventoryUnitRack-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName + "-Rack", VendorUnitFamilyType: "5G-UPF", VendorUnitTypeNumber: "UPF-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", RackPosition: "1", } // 序列化为JSON rackJSON, err := json.Marshal(inventoryUnitRack) if err != nil { log.Errorf("Failed to marshal InventoryUnitRack for ne_id %s: %v", ne.NeId, err) } else { nbiCM = common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "InventoryUnitRack", ValueJson: string(rackJSON), Timestamp: now.Format("2006-01-02 15:04:05"), } err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert InventoryUnitRack record: %v", err) } } // 生成 InventoryUnitShelf 记录 inventoryUnitShelf := upf.InventoryUnitShelf{ Id: fmt.Sprintf("%s-%s-InventoryUnitShelf-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName + "-Shelf", VendorUnitFamilyType: "5G-UPF-SHELF", VendorUnitTypeNumber: "UPF-SHELF-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-SHELF-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", ShelfPosition: "1", } // 序列化为JSON shelfJSON, err := json.Marshal(inventoryUnitShelf) if err != nil { log.Errorf("Failed to marshal InventoryUnitShelf for ne_id %s: %v", ne.NeId, err) } else { nbiCM = common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "InventoryUnitShelf", ValueJson: string(shelfJSON), Timestamp: now.Format("2006-01-02 15:04:05"), } err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert InventoryUnitShelf record: %v", err) } } // 生成 InventoryUnitPack 记录 inventoryUnitPack := upf.InventoryUnitPack{ Id: fmt.Sprintf("%s-%s-InventoryUnitPack-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName + "-Pack", VendorUnitFamilyType: "5G-UPF-PACK", VendorUnitTypeNumber: "UPF-PACK-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-PACK-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", SlotsOccupied: "1,2", } // 序列化为JSON packJSON, err := json.Marshal(inventoryUnitPack) if err != nil { log.Errorf("Failed to marshal InventoryUnitPack for ne_id %s: %v", ne.NeId, err) } else { nbiCM = common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "InventoryUnitPack", ValueJson: string(packJSON), Timestamp: now.Format("2006-01-02 15:04:05"), } err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert InventoryUnitPack record: %v", err) } } // 生成 InventoryUnitHost 记录 inventoryUnitHost := upf.InventoryUnitHost{ Id: fmt.Sprintf("%s-%s-InventoryUnitHost-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName + "-Host", VendorUnitFamilyType: "5G-UPF-HOST", VendorUnitTypeNumber: "UPF-HOST-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-HOST-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", HostPosition: "1", NumberOfCpu: "16", MemSize: "64GB", HardDiskSize: "1TB", } // 序列化为JSON hostJSON, err := json.Marshal(inventoryUnitHost) if err != nil { log.Errorf("Failed to marshal InventoryUnitHost for ne_id %s: %v", ne.NeId, err) } else { nbiCM = common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "InventoryUnitHost", ValueJson: string(hostJSON), Timestamp: now.Format("2006-01-02 15:04:05"), } err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert InventoryUnitHost record: %v", err) } } // 生成 InventoryUnitAccessory 记录 inventoryUnitAccessory := upf.InventoryUnitAccessory{ Id: fmt.Sprintf("%s-%s-InventoryUnitAccessory-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName + "-Accessory", VendorUnitFamilyType: "5G-UPF-ACC", VendorUnitTypeNumber: "UPF-ACC-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-ACC-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", AccessoryPosition: "1", AccessoryType: "FAN", AddtionalInformation: "Cooling system", } // 序列化为JSON accJSON, err := json.Marshal(inventoryUnitAccessory) if err != nil { log.Errorf("Failed to marshal InventoryUnitAccessory for ne_id %s: %v", ne.NeId, err) } else { nbiCM = common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "InventoryUnitAccessory", ValueJson: string(accJSON), Timestamp: now.Format("2006-01-02 15:04:05"), } err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert InventoryUnitAccessory record: %v", err) } } // 生成 UpfFunction 记录 upfFunction := upf.UpfFunction{ Id: fmt.Sprintf("%s-%s-UpfFunction-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName + "-UpfFunction", AdministrativeState: string(adminState), OperationalState: string(operState), VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId, MaxQosFlows: "1000000", MaxThroughput: "10Gbps", } // 序列化为JSON upfJSON, err := json.Marshal(upfFunction) if err != nil { log.Errorf("Failed to marshal UpfFunction for ne_id %s: %v", ne.NeId, err) } else { nbiCM = common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "UpfFunction", ValueJson: string(upfJSON), Timestamp: now.Format("2006-01-02 15:04:05"), } err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert UpfFunction record: %v", err) } } // 创建 EpRpDynN3Upf 记录 epRpDynN3Upf := upf.EpRpDynN3Upf{ Id: fmt.Sprintf("%s-%s-EpRpDynN3Upf-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName + "-N3", LocIpAddrList: "[\"" + ne.Ip + "\"]", FarIpSubnetworkList: "[\"10.13.0.0/16\"]", } // 序列化为JSON n3JSON, err := json.Marshal(epRpDynN3Upf) if err != nil { log.Errorf("Failed to marshal EpRpDynN3Upf for ne_id %s: %v", ne.NeId, err) } else { nbiCM = common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "EpRpDynN3Upf", ValueJson: string(n3JSON), Timestamp: now.Format("2006-01-02 15:04:05"), } err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert EpRpDynN3Upf record: %v", err) } } // 创建 EpRpDynN9Upf 记录 epRpDynN9Upf := upf.EpRpDynN9Upf{ Id: fmt.Sprintf("%s-%s-EpRpDynN9Upf-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName + "-N9", LocIpAddrList: "[\"" + ne.Ip + "\"]", FarIpSubnetworkList: "[\"10.19.0.0/16\"]", } // 序列化为JSON n9JSON, err := json.Marshal(epRpDynN9Upf) if err != nil { log.Errorf("Failed to marshal EpRpDynN9Upf for ne_id %s: %v", ne.NeId, err) } else { nbiCM = common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "EpRpDynN9Upf", ValueJson: string(n9JSON), Timestamp: now.Format("2006-01-02 15:04:05"), } err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert EpRpDynN9Upf record: %v", err) } } // 创建 IPResource 记录 ipResource := upf.IPResource{ Id: fmt.Sprintf("%s-%s-IPResource-%d", ne.NeType, ne.NeId, timestamp), // 生成唯一ID UserLabel: ne.NeName + "-IPResource", InterfaceType: "{Mgt,N3,N4,N9}", LocIpV4AddrList: fmt.Sprintf("{%s,%s,%s,%s}", ne.Ip, ne.Ip, ne.Ip, ne.Ip), LocIpV6AddrList: "{Default,Default,Default,Default}", } // 序列化为JSON ipJSON, err := json.Marshal(ipResource) if err != nil { log.Errorf("Failed to marshal IPResource for UPF ne_id %s: %v", ne.NeId, err) } else { nbiCM = common.NbiCm{ NeType: ne.NeType, NeId: ne.NeId, CmVersion: common.CmVersion, RmUid: ne.RmUid, EventType: common.EventTypeInt("ObjectCreationEvent"), ObjectType: "IPResource", ValueJson: string(ipJSON), Timestamp: now.Format("2006-01-02 15:04:05"), } err = dborm.DefaultDB().Table("nbi_cm").Create(&nbiCM).Error if err != nil { log.Errorf("Failed to insert UPF IPResource record: %v", err) } } } log.Info("UPF NBI CM synchronization completed") return nil }