package cdr import ( "encoding/json" "fmt" "net/http" "strings" "time" "be.ems/lib/config" "be.ems/lib/core/ctx" "be.ems/lib/log" "be.ems/lib/services" "be.ems/src/framework/database/db" neService "be.ems/src/modules/network_element/service" wsService "be.ems/src/modules/ws/service" ) var ( UriCDREvent = config.DefaultUriPrefix + "/cdrManagement/v1/elementType/{elementTypeValue}/objectType/cdrEvent" UriCDRFile = config.DefaultUriPrefix + "/cdrManagement/v1/elementType/{elementTypeValue}/objectType/cdrFile" CustomUriCDREvent = config.UriPrefix + "/cdrManagement/v1/elementType/{elementTypeValue}/objectType/cdrEvent" CustomUriCDRFile = config.UriPrefix + "/cdrManagement/v1/elementType/{elementTypeValue}/objectType/cdrFile" ) // PostCDREventFrom 接收CDR数据请求 func PostCDREventFrom(w http.ResponseWriter, r *http.Request) { log.Info("PostCDREventFrom processing... ") neType := ctx.GetParam(r, "elementTypeValue") var body struct { NeType string `json:"neType" ` NeName string `json:"neName" ` RmUID string `json:"rmUID" ` Timestamp int `json:"timestamp" ` CDR map[string]any `json:"CDR" ` } if err := ctx.ShouldBindJSON(r, &body); err != nil { services.ResponseInternalServerError500ProcessError(w, err) return } neTypeLower := strings.ToLower(body.NeType) if neType == "" || neType != neTypeLower { services.ResponseInternalServerError500ProcessError(w, fmt.Errorf("inconsistent network element types")) return } // 是否存在网元 neInfo := neService.NewNeInfo.FindByRmuid(body.RmUID) if neInfo.NeType != body.NeType || neInfo.RmUID != body.RmUID { services.ResponseInternalServerError500ProcessError(w, fmt.Errorf("network element does not exist")) return } cdrByte, err := json.Marshal(body.CDR) if err != nil { services.ResponseInternalServerError500ProcessError(w, err) return } // 执行插入表 type CDREvent struct { ID int64 `json:"-" gorm:"column:id;primaryKey;autoIncrement"` NeType string `json:"neType" gorm:"column:ne_type"` NeName string `json:"neName" gorm:"column:ne_name"` RmUid string `json:"rmUid" gorm:"column:rm_uid"` Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` // 接收到的timestamp秒级存储毫秒时间戳 CdrJson string `json:"cdrJSON" gorm:"column:cdr_json"` // data JSON String CreatedAt int64 `json:"-" gorm:"column:created_at"` // 记录创建存储毫秒 } data := CDREvent{ NeType: body.NeType, NeName: body.NeName, RmUid: body.RmUID, Timestamp: int64(body.Timestamp) * 1000, CdrJson: string(cdrByte), CreatedAt: time.Now().UnixMilli(), } tableName := fmt.Sprintf("cdr_event_%s", neTypeLower) if err := db.DB("").Table(tableName).Create(&data).Error; err != nil { log.Error("Failed to insert "+tableName, err) services.ResponseInternalServerError500ProcessError(w, err) return } // 推送到ws订阅组 switch neInfo.NeType { case "IMS": if v, ok := body.CDR["recordType"]; ok && (v == "MOC" || v == "MTSM") { wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_IMS_CDR, neInfo.NeId), data) } case "SMF": wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_SMF_CDR, neInfo.NeId), data) case "SMSC": wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_SMSC_CDR, neInfo.NeId), data) case "SGWC": wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_SGWC_CDR, neInfo.NeId), data) } services.ResponseStatusOK204NoContent(w) }