Files
be.ems/features/cdr/cdrevent.go
2025-06-10 17:50:54 +08:00

103 lines
3.6 KiB
Go

package cdr
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"be.ems/lib/config"
"be.ems/lib/core/ctx"
"be.ems/lib/log"
"be.ems/lib/services"
"be.ems/src/framework/database/db"
neService "be.ems/src/modules/network_element/service"
wsService "be.ems/src/modules/ws/service"
)
var (
UriCDREvent = config.DefaultUriPrefix + "/cdrManagement/v1/elementType/{elementTypeValue}/objectType/cdrEvent"
UriCDRFile = config.DefaultUriPrefix + "/cdrManagement/v1/elementType/{elementTypeValue}/objectType/cdrFile"
CustomUriCDREvent = config.UriPrefix + "/cdrManagement/v1/elementType/{elementTypeValue}/objectType/cdrEvent"
CustomUriCDRFile = config.UriPrefix + "/cdrManagement/v1/elementType/{elementTypeValue}/objectType/cdrFile"
)
// PostCDREventFrom 接收CDR数据请求
func PostCDREventFrom(w http.ResponseWriter, r *http.Request) {
log.Info("PostCDREventFrom processing... ")
neType := ctx.GetParam(r, "elementTypeValue")
var body struct {
NeType string `json:"neType" `
NeName string `json:"neName" `
RmUID string `json:"rmUID" `
Timestamp int `json:"timestamp" `
CDR map[string]any `json:"CDR" `
}
if err := ctx.ShouldBindJSON(r, &body); err != nil {
services.ResponseInternalServerError500ProcessError(w, err)
return
}
neTypeLower := strings.ToLower(body.NeType)
if neType == "" || neType != neTypeLower {
services.ResponseInternalServerError500ProcessError(w, fmt.Errorf("inconsistent network element types"))
return
}
// 是否存在网元
neInfo := neService.NewNeInfo.FindByCoreUidAndNeUid("*", body.RmUID)
if neInfo.NeType != body.NeType || neInfo.NeUID != body.RmUID {
services.ResponseInternalServerError500ProcessError(w, fmt.Errorf("network element does not exist"))
return
}
cdrByte, err := json.Marshal(body.CDR)
if err != nil {
services.ResponseInternalServerError500ProcessError(w, err)
return
}
// 执行插入表
type CDREvent struct {
ID int64 `json:"-" gorm:"column:id;primaryKey;autoIncrement"`
NeType string `json:"neType" gorm:"column:ne_type"`
NeName string `json:"neName" gorm:"column:ne_name"`
RmUid string `json:"rmUid" gorm:"column:rm_uid"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` // 接收到的timestamp秒级存储毫秒时间戳
CdrJson string `json:"cdrJSON" gorm:"column:cdr_json"` // data JSON String
CreatedAt int64 `json:"-" gorm:"column:created_at"` // 记录创建存储毫秒
}
data := CDREvent{
NeType: body.NeType,
NeName: body.NeName,
RmUid: body.RmUID,
Timestamp: int64(body.Timestamp) * 1000,
CdrJson: string(cdrByte),
CreatedAt: time.Now().UnixMilli(),
}
tableName := fmt.Sprintf("cdr_event_%s", neTypeLower)
if err := db.DB("").Table(tableName).Create(&data).Error; err != nil {
log.Error("Failed to insert "+tableName, err)
services.ResponseInternalServerError500ProcessError(w, err)
return
}
// 推送到ws订阅组
switch neInfo.NeType {
case "IMS":
if v, ok := body.CDR["recordType"]; ok && (v == "MOC" || v == "MTSM") {
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_IMS_CDR, neInfo.CoreUID, neInfo.NeUID), data)
}
case "SMF":
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_SMF_CDR, neInfo.CoreUID, neInfo.NeUID), data)
case "SMSC":
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_SMSC_CDR, neInfo.CoreUID, neInfo.NeUID), data)
case "SGWC":
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s_%s", wsService.GROUP_SGWC_CDR, neInfo.CoreUID, neInfo.NeUID), data)
}
services.ResponseStatusOK204NoContent(w)
}