package upf import ( "encoding/json" "fmt" "strconv" "time" "be.ems/features/nbi/redisqueue" "be.ems/lib/dborm" "be.ems/lib/log" "be.ems/src/modules/crontask/processor/syncNbiNRM/common" ) // SyncUpfNbiCM 从ne_info获取UPF网元并同步数据到nbi_cm表 func SyncNbiCM() error { log.Info("Starting UPF NBI CM synchronization") // 从ne_info表获取UPF类型的网元 var upfNEs []common.NeInfo err := dborm.DefaultDB().Table("ne_info").Where("ne_type = ?", "UPF"). Find(&upfNEs).Error if err != nil { log.Errorf("Failed to query UPF network elements: %v", err) return err } log.Infof("Found %d UPF network elements", len(upfNEs)) now := time.Now() for _, ne := range upfNEs { adminState, operState := common.ParseStateFromStatus(ne.Status) var version string = "-" err := dborm.DefaultDB().Table("ne_version"). Where("ne_type = ? and ne_id = ?", ne.NeType, ne.NeId). Pluck("version", &version).Error if err != nil { log.Errorf("Failed to query %s version: %v", ne.NeName, err) return err } // ========== ManagedElement ========== managedElement := ManagedElement{ Id: fmt.Sprintf("%s-%s-ManagedElement", ne.NeType, ne.NeId), UserLabel: ne.NeName, VendorName: ne.VendorName, ManagedBy: ne.Dn, ManagementIpAddress: ne.Ip, SwVersion: version, PatchInfo: "-", AdministrativeState: adminState, OperationalState: operState, } meJSON, err := json.Marshal(managedElement) if err != nil { log.Errorf("Failed to marshal ManagedElement for UPF ne_id %s: %v", ne.NeId, err) continue } var lastJson string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "ManagedElement", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastJson).Error newValueJson := string(meJSON) if err != nil || lastJson == "" { common.InsertNbiCm(ne, version, "ManagedElement", newValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "ManagedElement", newValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastJson, newValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "ManagedElement", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "ManagedElement", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "ManagedElement", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "ManagedElement", newValueJson, common.ObjectOriginalEvent) } } // ========== InventoryUnitRack ========== inventoryUnitRack := InventoryUnitRack{ Id: fmt.Sprintf("%s-%s-InventoryUnitRack", ne.NeType, ne.NeId), UserLabel: ne.NeName + "-InventoryUnitRack", VendorUnitFamilyType: "5G-UPF", VendorUnitTypeNumber: "UPF-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", RackPosition: "1", } rackJSON, err := json.Marshal(inventoryUnitRack) if err != nil { log.Errorf("Failed to marshal InventoryUnitRack for ne_id %s: %v", ne.NeId, err) } else { var lastRackJson string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "InventoryUnitRack", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastRackJson).Error newRackValueJson := string(rackJSON) if err != nil || lastRackJson == "" { common.InsertNbiCm(ne, version, "InventoryUnitRack", newRackValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitRack", newRackValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastRackJson, newRackValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitRack", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitRack", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitRack", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "InventoryUnitRack", newRackValueJson, common.ObjectOriginalEvent) } } } // ========== InventoryUnitShelf ========== inventoryUnitShelf := InventoryUnitShelf{ Id: fmt.Sprintf("%s-%s-InventoryUnitShelf", ne.NeType, ne.NeId), UserLabel: ne.NeName + "-InventoryUnitShelf", VendorUnitFamilyType: "5G-UPF-SHELF", VendorUnitTypeNumber: "UPF-SHELF-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-SHELF-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", ShelfPosition: "1", } shelfJSON, err := json.Marshal(inventoryUnitShelf) if err != nil { log.Errorf("Failed to marshal InventoryUnitShelf for ne_id %s: %v", ne.NeId, err) } else { var lastShelfJson string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "InventoryUnitShelf", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastShelfJson).Error newShelfValueJson := string(shelfJSON) if err != nil || lastShelfJson == "" { common.InsertNbiCm(ne, version, "InventoryUnitShelf", newShelfValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitShelf", newShelfValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastShelfJson, newShelfValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitShelf", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitShelf", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitShelf", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "InventoryUnitShelf", newShelfValueJson, common.ObjectOriginalEvent) } } } // ========== InventoryUnitPack ========== inventoryUnitPack := InventoryUnitPack{ Id: fmt.Sprintf("%s-%s-InventoryUnitPack", ne.NeType, ne.NeId), UserLabel: ne.NeName + "-InventoryUnitPack", VendorUnitFamilyType: "5G-UPF-PACK", VendorUnitTypeNumber: "UPF-PACK-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-PACK-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", SlotsOccupied: "1,2", } packJSON, err := json.Marshal(inventoryUnitPack) if err != nil { log.Errorf("Failed to marshal InventoryUnitPack for ne_id %s: %v", ne.NeId, err) } else { var lastPackJson string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "InventoryUnitPack", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastPackJson).Error newPackValueJson := string(packJSON) if err != nil || lastPackJson == "" { common.InsertNbiCm(ne, version, "InventoryUnitPack", newPackValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitPack", newPackValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastPackJson, newPackValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitPack", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitPack", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitPack", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "InventoryUnitPack", newPackValueJson, common.ObjectOriginalEvent) } } } // ========== InventoryUnitHost ========== inventoryUnitHost := InventoryUnitHost{ Id: fmt.Sprintf("%s-%s-InventoryUnitHost", ne.NeType, ne.NeId), UserLabel: ne.NeName + "-InventoryUnitHost", VendorUnitFamilyType: "5G-UPF-HOST", VendorUnitTypeNumber: "UPF-HOST-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-HOST-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", HostPosition: "1", NumberOfCpu: "16", MemSize: "64GB", HardDiskSize: "1TB", } hostJSON, err := json.Marshal(inventoryUnitHost) if err != nil { log.Errorf("Failed to marshal InventoryUnitHost for ne_id %s: %v", ne.NeId, err) } else { var lastHostJson string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "InventoryUnitHost", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastHostJson).Error newHostValueJson := string(hostJSON) if err != nil || lastHostJson == "" { common.InsertNbiCm(ne, version, "InventoryUnitHost", newHostValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitHost", newHostValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastHostJson, newHostValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitHost", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitHost", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitHost", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "InventoryUnitHost", newHostValueJson, common.ObjectOriginalEvent) } } } // ========== InventoryUnitAccessory ========== inventoryUnitAccessory := InventoryUnitAccessory{ Id: fmt.Sprintf("%s-%s-InventoryUnitAccessory", ne.NeType, ne.NeId), UserLabel: ne.NeName + "-InventoryUnitAccessory", VendorUnitFamilyType: "5G-UPF-ACC", VendorUnitTypeNumber: "UPF-ACC-" + ne.NeId, VendorName: ne.VendorName, SerialNumber: "SN-UPF-ACC-" + ne.NeId, VersionNumber: "1.0.0", DateOfManufacture: "2023-01-01", DateOfLastService: now.Format("2006-01-02"), ManufacturerData: "{}", AccessoryPosition: "1", AccessoryType: "FAN", AddtionalInformation: "Cooling system", } accJSON, err := json.Marshal(inventoryUnitAccessory) if err != nil { log.Errorf("Failed to marshal InventoryUnitAccessory for ne_id %s: %v", ne.NeId, err) } else { var lastAccJson string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "InventoryUnitAccessory", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastAccJson).Error newAccValueJson := string(accJSON) if err != nil || lastAccJson == "" { common.InsertNbiCm(ne, version, "InventoryUnitAccessory", newAccValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitAccessory", newAccValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastAccJson, newAccValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitAccessory", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitAccessory", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "InventoryUnitAccessory", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "InventoryUnitAccessory", newAccValueJson, common.ObjectOriginalEvent) } } } var capability int err = dborm.DefaultDB().Table("ne_license").Where("ne_type = ? and ne_id = ?", ne.NeType, ne.NeId). Pluck("capability", &capability).Error if err != nil { log.Errorf("Failed to query capability for ne_id %s: %v", ne.NeId, err) capability = 0 } // ========== UpfFunction ========== upfFunction := UpfFunction{ Id: fmt.Sprintf("%s-%s-UpfFunction", ne.NeType, ne.NeId), UserLabel: ne.NeName + "-UpfFunction", AdministrativeState: adminState, OperationalState: operState, VnfInstanceId: "vnf-" + ne.NeType + "-" + ne.NeId, MaxQosFlows: fmt.Sprintf("%d", capability), MaxThroughput: "10Gbps", } upfJSON, err := json.Marshal(upfFunction) if err != nil { log.Errorf("Failed to marshal UpfFunction for ne_id %s: %v", ne.NeId, err) } else { var lastUpfJson string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "UpfFunction", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastUpfJson).Error newUpfValueJson := string(upfJSON) if err != nil || lastUpfJson == "" { common.InsertNbiCm(ne, version, "UpfFunction", newUpfValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "UpfFunction", newUpfValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastUpfJson, newUpfValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "UpfFunction", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "UpfFunction", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "UpfFunction", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "UpfFunction", newUpfValueJson, common.ObjectOriginalEvent) } } } // ========== EpRpDynN3Upf ========== epRpDynN3Upf := EpRpDynN3Upf{ Id: fmt.Sprintf("%s-%s-EpRpDynN3Upf", ne.NeType, ne.NeId), UserLabel: ne.NeName + "-EpRpDynN3Upf", LocIpAddrList: `{"` + ne.Ip + `"}`, FarIpSubnetworkList: `{{"Subnetwork":"10.13.1.0","Mask":"255.255.255.255"}}`, } n3JSON, err := json.Marshal(epRpDynN3Upf) if err != nil { log.Errorf("Failed to marshal EpRpDynN3Upf for ne_id %s: %v", ne.NeId, err) } else { var lastN3Json string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "EpRpDynN3Upf", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastN3Json).Error newN3ValueJson := string(n3JSON) if err != nil || lastN3Json == "" { common.InsertNbiCm(ne, version, "EpRpDynN3Upf", newN3ValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "EpRpDynN3Upf", newN3ValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastN3Json, newN3ValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "EpRpDynN3Upf", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "EpRpDynN3Upf", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "EpRpDynN3Upf", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "EpRpDynN3Upf", newN3ValueJson, common.ObjectOriginalEvent) } } } // ========== EpRpDynN9Upf ========== epRpDynN9Upf := EpRpDynN9Upf{ Id: fmt.Sprintf("%s-%s-EpRpDynN9Upf", ne.NeType, ne.NeId), UserLabel: ne.NeName + "-EpRpDynN9Upf", LocIpAddrList: `{` + ne.Ip + `}`, FarIpSubnetworkList: `{{Subnetwork:10.19.1.0,Mask:255.255.255.255}}`, } n9JSON, err := json.Marshal(epRpDynN9Upf) if err != nil { log.Errorf("Failed to marshal EpRpDynN9Upf for ne_id %s: %v", ne.NeId, err) } else { var lastN9Json string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "EpRpDynN9Upf", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastN9Json).Error newN9ValueJson := string(n9JSON) if err != nil || lastN9Json == "" { common.InsertNbiCm(ne, version, "EpRpDynN9Upf", newN9ValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "EpRpDynN9Upf", newN9ValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastN9Json, newN9ValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "EpRpDynN9Upf", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "EpRpDynN9Upf", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "EpRpDynN9Upf", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "EpRpDynN9Upf", newN9ValueJson, common.ObjectOriginalEvent) } } } // ========== IPResource ========== ipResource := IPResource{ Id: fmt.Sprintf("%s-%s-IPResource", ne.NeType, ne.NeId), UserLabel: ne.NeName + "-IPResource", InterfaceType: "{Mgt,N3,N4,N9}", LocIpV4AddrList: fmt.Sprintf("{%s,%s,%s,%s}", ne.Ip, ne.Ip, ne.Ip, ne.Ip), LocIpV6AddrList: "{Default,Default,Default,Default}", } ipJSON, err := json.Marshal(ipResource) if err != nil { log.Errorf("Failed to marshal IPResource for UPF ne_id %s: %v", ne.NeId, err) } else { var lastIpJson string err = dborm.DefaultDB().Table("nbi_cm"). Where("ne_type = ? AND ne_id = ? AND object_type = ? AND event_type = ?", ne.NeType, ne.NeId, "IPResource", common.ObjectOriginalEvent). Order("timestamp ASC").Pluck("value_json", &lastIpJson).Error newIpValueJson := string(ipJSON) if err != nil || lastIpJson == "" { common.InsertNbiCm(ne, version, "IPResource", newIpValueJson, common.ObjectOriginalEvent) nbiCm := common.InsertNbiCm(ne, version, "IPResource", newIpValueJson, common.ObjectCreationEvent) redisqueue.AddNbiCMQueue([]string{strconv.Itoa(nbiCm.Id)}) } else { var ids []string added, modified, deleted, _ := common.CompareJSON(lastIpJson, newIpValueJson) if len(added) > 0 { nbiCm := common.InsertNbiCm(ne, version, "IPResource", common.ToJson(added), common.ObjectCreationEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(modified) > 0 { nbiCm := common.InsertNbiCm(ne, version, "IPResource", common.ToJson(modified), common.ObjectAttributeValueChangeEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(deleted) > 0 { nbiCm := common.InsertNbiCm(ne, version, "IPResource", common.ToJson(deleted), common.ObjectDeletionEvent) ids = append(ids, strconv.Itoa(nbiCm.Id)) } if len(ids) > 0 { redisqueue.AddNbiCMQueue(ids) common.InsertNbiCm(ne, version, "IPResource", newIpValueJson, common.ObjectOriginalEvent) } } } } log.Info("UPF NBI CM synchronization completed") return nil }