package trace import ( "encoding/json" "errors" "fmt" "io" "net/http" "strconv" "strings" "github.com/go-resty/resty/v2" "github.com/gorilla/mux" "be.ems/lib/dborm" "be.ems/lib/global" "be.ems/lib/log" "be.ems/lib/run" "be.ems/lib/services" "be.ems/restagent/config" ) var ( UriTraceTaskV1 = config.DefaultUriPrefix + "/traceManagement/v1/subscriptions" UriTraceTask = config.DefaultUriPrefix + "/traceManagement/{apiVersion}/subscriptions" UriTraceRawMsg = config.DefaultUriPrefix + "/traceManagement/{apiVersion}/rawMessage/{id}" UriTraceDecMsg = config.DefaultUriPrefix + "/traceManagement/{apiVersion}/decMessage/{id}" // decode message api CustomUriTraceTaskV1 = config.UriPrefix + "/traceManagement/v1/subscriptions" CustomUriTraceTask = config.UriPrefix + "/traceManagement/{apiVersion}/subscriptions" ) type TraceTask struct { Id int `json:"id" xorm:"pk 'id' autoincr"` TraceType string `json:"traceType"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` Imsi string `json:"imsi"` Msisdn string `json:"msisdn"` SrcIp string `json:"srcIp"` DstIp string `json:"dstIp"` SignalPort int16 `json:"signalPort"` NeType string `json:"neType"` NeId string `json:"neId"` UeIp string `json:"ueIp"` Interfaces []string `json:"interfaces"` NotifyUrl string `json:"notifyUrl" xorm:"-"` Status string `json:"-" xorm:"status"` SuccNEs []string `json:"-" xorm:"succ_nes"` FailNEs []string `json:"-" xorm:"fail_nes"` AccountID string `json:"accountId" xorm:"account_id"` Comment string `json:"comment" xorm:"comment"` UpdateTime string `json:"-" xorm:"-"` } var client = resty.New() /* func init() { client.SetTimeout(3 * time.Second) } */ // Post trace task to NF/NFs func PostTraceTaskToNF(w http.ResponseWriter, r *http.Request) { log.Debug("PostTraceTaskToNF processing... ") //vars := mux.Vars(r) // token, err := services.CheckFrontValidRequest(w, r) // if err != nil { // log.Error("Request error:", err) // return // } // log.Debug("AccessToken:", token) body, err := io.ReadAll(io.LimitReader(r.Body, int64(config.GetYamlConfig().Params.UriMaxLen))) if err != nil { log.Error("io.ReadAll is failed:", err) services.ResponseNotFound404UriNotExist(w, r) return } log.Trace("body:", string(body)) traceTask := new(TraceTask) _ = json.Unmarshal(body, traceTask) log.Debug("traceTask:", traceTask) var neTypes []string // do not set device if traceTask.NeType == "" { // query neType by interface if len(traceTask.Interfaces) > 0 { err := dborm.XormGetSingleColStringArrayByIn("trace_info", "ne_type", "interface", traceTask.Interfaces, &neTypes) if err != nil { log.Error("Failed to dborm.XormGetSingleCol:", err) services.ResponseInternalServerError500ProcessError(w, err) return } } else { neTypes = []string{"AMF", "SMF", "UDM", "AUSF", "UPF"} } } else { neTypes = append(neTypes, traceTask.NeType) } log.Debug("neTypes:", neTypes) traceTask.Status = "Inactive" _, err = dborm.XormInsertTableOne("trace_task", traceTask) if err != nil { log.Error("Failed to dborm.XormInsertTableOne:", err) services.ResponseInternalServerError500ProcessError(w, err) return } traceTask.NotifyUrl = config.GetYamlConfig().OMC.GtpUri log.Trace("traceTask:", traceTask) for _, neType := range neTypes { var neInfos []dborm.NeInfo if traceTask.NeId == "" { err := dborm.XormGetNeInfoByNeType(neType, &neInfos) if err != nil { log.Error("Failed to dborm.XormGetNeInfoByNeType:", err) services.ResponseInternalServerError500ProcessError(w, err) return } } else { neInfo, err := dborm.XormGetNeInfo(neType, traceTask.NeId) if err != nil { log.Error("Failed to dborm.XormGetNeInfoByNeType:", err) services.ResponseInternalServerError500ProcessError(w, err) return } neInfos = append(neInfos, *neInfo) } for _, neInfo := range neInfos { hostUri := fmt.Sprintf("http://%s:%v", neInfo.Ip, neInfo.Port) requestURI2NF := fmt.Sprintf("%s%s", hostUri, UriTraceTaskV1) log.Debug("requestURI2NF:", requestURI2NF) body, _ := json.Marshal(traceTask) log.Debug("body:", string(body)) resp, err := client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). SetBody(body). Post(requestURI2NF) if err != nil { log.Error("Failed to Post:", err) failNE := fmt.Sprintf("%s.%s", neInfo.NeType, neInfo.NeId) traceTask.FailNEs = append(traceTask.FailNEs, failNE) } else { switch resp.StatusCode() { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: succNE := fmt.Sprintf("%s.%s", neInfo.NeType, neInfo.NeId) traceTask.SuccNEs = append(traceTask.SuccNEs, succNE) default: log.Warnf("Post return code:%d, message:%s", resp.StatusCode(), string(resp.Body())) failNE := fmt.Sprintf("%s.%s", neInfo.NeType, neInfo.NeId) traceTask.FailNEs = append(traceTask.FailNEs, failNE) } } } } if len(traceTask.SuccNEs) > 0 { traceTask.Status = "Active" _, err = dborm.XormUpdateTableById(traceTask.Id, "trace_task", traceTask) if err != nil { log.Error("Failed to dborm.XormUpdateTableById:", err) services.ResponseInternalServerError500ProcessError(w, err) return } services.ResponseStatusOK204NoContent(w) } else { traceTask.Status = "Failed" _, err = dborm.XormUpdateTableById(traceTask.Id, "trace_task", traceTask) if err != nil { log.Error("Failed to dborm.XormUpdateTableById:", err) services.ResponseInternalServerError500ProcessError(w, err) return } err = global.ErrTraceFailedDistributeToNEs log.Error(err) services.ResponseInternalServerError500ProcessError(w, err) return } } func PutTraceTaskToNF(w http.ResponseWriter, r *http.Request) { log.Debug("PutTraceTaskToNF processing... ") //vars := mux.Vars(r) // token, err := services.CheckFrontValidRequest(w, r) // if err != nil { // log.Error("Request error:", err) // return // } // log.Debug("AccessToken:", token) body, err := io.ReadAll(io.LimitReader(r.Body, int64(config.GetYamlConfig().Params.UriMaxLen))) if err != nil { log.Error("io.ReadAll is failed:", err) services.ResponseNotFound404UriNotExist(w, r) return } traceTask := new(TraceTask) _ = json.Unmarshal(body, traceTask) traceTask.NotifyUrl = config.GetYamlConfig().OMC.GtpUri log.Debug("traceTask:", traceTask) var neTypes []string // do not set device if traceTask.NeType == "" { // query neType by interface if len(traceTask.Interfaces) > 0 { err := dborm.XormGetSingleColStringArrayByIn("trace_info", "ne_type", "interface", traceTask.Interfaces, &neTypes) if err != nil { log.Error("Failed to dborm.XormGetSingleColStringArrayByIn:", err) services.ResponseInternalServerError500ProcessError(w, err) return } } else { neTypes = []string{"AMF", "SMF", "UDM", "AUSF", "UPF"} } } else { neTypes = append(neTypes, traceTask.NeType) } log.Debug("neTypes:", neTypes) for _, neType := range neTypes { var neInfos []dborm.NeInfo if traceTask.NeId == "" { err := dborm.XormGetNeInfoByNeType(neType, &neInfos) if err != nil { log.Error("Failed to dborm.XormGetNeInfoByNeType:", err) services.ResponseInternalServerError500ProcessError(w, err) return } } else { neInfo, err := dborm.XormGetNeInfo(neType, traceTask.NeId) if err != nil { log.Error("Failed to dborm.XormGetNeInfoByNeType:", err) services.ResponseInternalServerError500ProcessError(w, err) return } neInfos = append(neInfos, *neInfo) } for _, neInfo := range neInfos { hostUri := fmt.Sprintf("http://%s:%v", neInfo.Ip, neInfo.Port) requestURI2NF := fmt.Sprintf("%s%s", hostUri, UriTraceTaskV1) log.Debug("requestURI2NF:", requestURI2NF) body, _ := json.Marshal(traceTask) log.Debug("body:", string(body)) resp, err := client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). SetBody(body). Put(requestURI2NF) if err != nil { log.Error("Failed to Put:", err) failNE := fmt.Sprintf("%s.%s", neInfo.NeType, neInfo.NeId) traceTask.FailNEs = append(traceTask.FailNEs, failNE) } else { switch resp.StatusCode() { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: succNE := fmt.Sprintf("%s.%s", neInfo.NeType, neInfo.NeId) traceTask.SuccNEs = append(traceTask.SuccNEs, succNE) default: log.Warn("Post return code:%d, message:%s", resp.StatusCode(), string(resp.Body())) failNE := fmt.Sprintf("%s.%s", neInfo.NeType, neInfo.NeId) traceTask.FailNEs = append(traceTask.FailNEs, failNE) } } } } if len(traceTask.SuccNEs) > 0 { traceTask.Status = "Active" _, err = dborm.XormUpdateTableById(traceTask.Id, "trace_task", traceTask) if err != nil { log.Error("Failed to dborm.XormUpdateTableById:", err) services.ResponseInternalServerError500ProcessError(w, err) return } services.ResponseStatusOK204NoContent(w) } else { traceTask.Status = "Failed" _, err = dborm.XormUpdateTableById(traceTask.Id, "trace_task", traceTask) if err != nil { log.Error("Failed to dborm.XormUpdateTableById:", err) services.ResponseInternalServerError500ProcessError(w, err) return } err = global.ErrTraceFailedDistributeToNEs log.Error(err) services.ResponseInternalServerError500ProcessError(w, err) return } } func DeleteTraceTaskToNF(w http.ResponseWriter, r *http.Request) { log.Debug("DeleteTraceTaskToNF processing... ") // token, err := services.CheckFrontValidRequest(w, r) // if err != nil { // log.Error("Request error:", err) // return // } // log.Debug("AccessToken:", token) vars := r.URL.Query() ids, ok := vars["id"] if !ok || len(ids) == 0 { err := global.ErrTraceNotCarriedTaskID log.Error(err) services.ResponseInternalServerError500ProcessError(w, err) return } log.Debug("ids:", ids) for _, id := range ids { log.Debug("id:", id) var succNes []string err := dborm.XormGetColStringArrayByWhere("trace_task", "succ_nes", fmt.Sprintf("id=%s", id), &succNes) if err != nil { log.Error("Failed to dborm.XormGetSingleColStringArrayByWhere:", err) services.ResponseInternalServerError500ProcessError(w, err) return } log.Debug("succNes:", succNes) nes := new([]string) if len(succNes) > 0 { _ = json.Unmarshal([]byte(succNes[0]), nes) } log.Debug("nes:", nes) for _, ne := range *nes { i := strings.Index(ne, ".") neType := ne[0:i] neId := ne[i+1:] log.Debugf("ne:%s neType:%s neId:%s", ne, neType, neId) neInfo, err := dborm.XormGetNeInfo(neType, neId) if err != nil { log.Error("Failed to dborm.XormGetNeInfo:", err) services.ResponseInternalServerError500ProcessError(w, err) return } hostUri := fmt.Sprintf("http://%s:%v", neInfo.Ip, neInfo.Port) requestURI2NF := fmt.Sprintf("%s%s?id=%s", hostUri, UriTraceTaskV1, id) log.Debug("requestURI2NF:", requestURI2NF) _, err = client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). Delete(requestURI2NF) if err != nil { log.Error("Failed to Delete:", err) services.ResponseInternalServerError500ProcessError(w, err) return } } _, err = dborm.XormDeleteDataByWhere(fmt.Sprintf("id=%s", id), "trace_task") if err != nil { log.Error("Failed to dborm.XormDeleteDataByWhere:", err) services.ResponseInternalServerError500ProcessError(w, err) return } } services.ResponseStatusOK204NoContent(w) } func GetRawMessage(w http.ResponseWriter, r *http.Request) { log.Debug("GetRawMessage processing... ") } func ParseRawMsg2Html(w http.ResponseWriter, r *http.Request) { log.Debug("ParseRawMsg2Html processing... ") vars := mux.Vars(r) idStr := vars["id"] id, _ := strconv.Atoi(idStr) traceData, err := dborm.XormGetTraceData(id) if err != nil { log.Error("Failed to dborm.XormGetTraceRawMsg:", err) services.ResponseInternalServerError500ProcessError(w, err) return } log.Trace("traceData:", traceData) filePath := traceData.DecMsg if traceData.DecMsg == "" { htmlFile := fmt.Sprintf("traceDecMessage-%d-%d.html", traceData.TaskID, traceData.ID) filePath = config.GetYamlConfig().OMC.FrontTraceDir + "/" + htmlFile command := fmt.Sprintf("/usr/local/omc/bin/data2html -f %s -t %d -i N%d -d %x", filePath, traceData.Timestamp, traceData.IfType, traceData.RawMsg) out, err := run.ExecCmd(command, "/") log.Tracef("Exec output: %v", string(out)) if err != nil { log.Errorf("Faile to ipdate2html:", err) services.ResponseInternalServerError500ProcessError(w, err) return } exist, err := global.FilePathExists(filePath) if err != nil { log.Errorf("Failed to stat:", err) services.ResponseInternalServerError500ProcessError(w, err) return } if !exist { err = errors.New(string(strings.ReplaceAll(string(out), "\n", ""))) services.ResponseInternalServerError500ProcessError(w, err) return } traceData.DecMsg = filePath _, err = dborm.XormUpdateTraceData(id, traceData) if err != nil { log.Errorf("Faile to XormUpdateTraceData:", err) services.ResponseInternalServerError500ProcessError(w, err) return } } services.ResponseHtmlContent(w, http.StatusOK, filePath) }