Files
nms_cxy/features/trace/trace.go
2024-03-12 10:58:33 +08:00

438 lines
13 KiB
Go

package trace
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/go-resty/resty/v2"
"github.com/gorilla/mux"
"nms_nbi/lib/dborm"
"nms_nbi/lib/global"
"nms_nbi/lib/log"
"nms_nbi/lib/run"
"nms_nbi/lib/services"
"nms_nbi/restagent/config"
)
// URI templates of the trace-management REST API.
//
// The "Uri..." variables are built on config.DefaultUriPrefix and the
// "CustomUri..." variables on config.UriPrefix. The "{apiVersion}" and "{id}"
// segments look like gorilla/mux path variables (ParseRawMsg2Html reads "id"
// through mux.Vars); the route registration itself is not in this file.
var (
// UriTraceTaskV1 is the fixed v1 subscriptions path. The handlers in this
// file append it to an NE's "http://ip:port" when forwarding a trace task.
UriTraceTaskV1 = config.DefaultUriPrefix + "/traceManagement/v1/subscriptions"
// UriTraceTask is the subscriptions path with a variable API version.
UriTraceTask = config.DefaultUriPrefix + "/traceManagement/{apiVersion}/subscriptions"
// UriTraceRawMsg addresses the raw message of one trace record.
UriTraceRawMsg = config.DefaultUriPrefix + "/traceManagement/{apiVersion}/rawMessage/{id}"
UriTraceDecMsg = config.DefaultUriPrefix + "/traceManagement/{apiVersion}/decMessage/{id}" // decode message api
// CustomUriTraceTaskV1 and CustomUriTraceTask mirror the two subscriptions
// paths above under config.UriPrefix.
CustomUriTraceTaskV1 = config.UriPrefix + "/traceManagement/v1/subscriptions"
CustomUriTraceTask = config.UriPrefix + "/traceManagement/{apiVersion}/subscriptions"
)
// TraceTask describes one trace task. The same struct is decoded from the
// front-end request body, stored in the "trace_task" table, and JSON-encoded
// as the body sent to each NE.
//
// Fields tagged `json:"-"` are never part of the JSON exchanged with the
// front end or the NEs; fields tagged `xorm:"-"` are not persisted.
type TraceTask struct {
// Id is the auto-increment primary key of the trace_task row.
Id int `json:"id" xorm:"pk 'id' autoincr"`
TraceType string `json:"traceType"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Imsi string `json:"imsi"`
Msisdn string `json:"msisdn"`
SrcIp string `json:"srcIp"`
DstIp string `json:"dstIp"`
// NOTE(review): int16 tops out at 32767, so a port in 32768-65535 cannot be
// represented and makes json.Unmarshal report a type error. Widening the
// type would change the exported field and possibly the column mapping, so
// confirm with the callers and the schema before changing it.
SignalPort int16 `json:"signalPort"`
// NeType and NeId select the target NE. When NeType is empty the handlers
// derive the NE types from Interfaces or fall back to a built-in list; when
// NeId is empty every NE of the selected type is addressed.
NeType string `json:"neType"`
NeId string `json:"neId"`
UeIp string `json:"ueIp"`
Interfaces []string `json:"interfaces"`
// NotifyUrl is filled by the handlers from the OMC configuration
// (OMC.GtpUri) before the task is sent to the NEs; it is not persisted.
NotifyUrl string `json:"notifyUrl" xorm:"-"`
// Status is set by the handlers to "Inactive", "Active" or "Failed".
Status string `json:"-" xorm:"status"`
// SuccNEs and FailNEs list the NEs that accepted or rejected the task,
// each entry formatted as "<neType>.<neId>".
SuccNEs []string `json:"-" xorm:"succ_nes"`
FailNEs []string `json:"-" xorm:"fail_nes"`
AccountID string `json:"accountId" xorm:"account_id"`
Comment string `json:"comment" xorm:"comment"`
// UpdateTime is neither serialized to JSON nor persisted through this struct.
UpdateTime string `json:"-" xorm:"-"`
}
// client is the resty HTTP client shared by the handlers in this file for
// all requests sent to the NEs.
//
// NOTE(review): no timeout is configured here, because the init function
// below that set a 3-second timeout is commented out. Whether a request to an
// unresponsive NE can block a handler depends on resty's defaults; confirm
// why the timeout was disabled before re-enabling it.
var client = resty.New()
/*
func init() {
client.SetTimeout(3 * time.Second)
}
*/
// PostTraceTaskToNF creates a trace task and distributes it to the target NEs.
//
// Processing steps:
//  1. Validate the front-end request (services.CheckFrontValidRequest). On
//     failure this handler only logs and returns; it writes no response of
//     its own.
//  2. Decode the JSON body into a TraceTask. A body that cannot be decoded is
//     rejected before anything is stored or sent.
//  3. Select the NE types: traceTask.NeType when set, otherwise the types
//     mapped to the requested interfaces in table trace_info, otherwise a
//     built-in list.
//  4. Insert the task into trace_task with status "Inactive".
//  5. POST the task to every selected NE and record each NE as
//     "<neType>.<neId>" in SuccNEs or FailNEs.
//  6. Update the row to "Active" and answer 204 when at least one NE accepted
//     the task; otherwise update it to "Failed" and answer with
//     global.ErrTraceFailedDistributeToNEs.
func PostTraceTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PostTraceTaskToNF processing... ")

	token, err := services.CheckFrontValidRequest(w, r)
	if err != nil {
		log.Error("Request error:", err)
		return
	}
	log.Debug("AccessToken:", token)

	body, err := io.ReadAll(io.LimitReader(r.Body, int64(config.GetYamlConfig().Params.UriMaxLen)))
	if err != nil {
		log.Error("io.ReadAll is failed:", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}
	log.Trace("body:", string(body))

	traceTask := new(TraceTask)
	// Reject undecodable input: continuing with a zero-valued task would store
	// an empty row and push an empty task to every NE of the default types.
	if err = json.Unmarshal(body, traceTask); err != nil {
		log.Error("Failed to json.Unmarshal:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	log.Debug("traceTask:", traceTask)

	var neTypes []string
	switch {
	case traceTask.NeType != "":
		neTypes = append(neTypes, traceTask.NeType)
	case len(traceTask.Interfaces) > 0:
		// No device given: query the NE types by interface.
		err = dborm.XormGetSingleColStringArrayByIn("trace_info", "ne_type", "interface", traceTask.Interfaces, &neTypes)
		if err != nil {
			log.Error("Failed to dborm.XormGetSingleColStringArrayByIn:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
	default:
		neTypes = []string{"AMF", "SMF", "UDM", "AUSF", "UPF"}
	}
	log.Debug("neTypes:", neTypes)

	traceTask.Status = "Inactive"
	if _, err = dborm.XormInsertTableOne("trace_task", traceTask); err != nil {
		log.Error("Failed to dborm.XormInsertTableOne:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}

	traceTask.NotifyUrl = config.GetYamlConfig().OMC.GtpUri
	log.Trace("traceTask:", traceTask)

	// The fields changed while distributing (Status, SuccNEs, FailNEs) are all
	// tagged json:"-", so the request body is identical for every NE and is
	// encoded once.
	payload, err := json.Marshal(traceTask)
	if err != nil {
		log.Error("Failed to json.Marshal:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	log.Debug("body:", string(payload))

	for _, neType := range neTypes {
		var neInfos []dborm.NeInfo
		if traceTask.NeId == "" {
			if err = dborm.XormGetNeInfoByNeType(neType, &neInfos); err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
		} else {
			neInfo, err := dborm.XormGetNeInfo(neType, traceTask.NeId)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfo:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			// Defensive: never dereference a nil result.
			if neInfo == nil {
				log.Warnf("NE not found, neType:%s neId:%s", neType, traceTask.NeId)
				continue
			}
			neInfos = append(neInfos, *neInfo)
		}

		for _, neInfo := range neInfos {
			neKey := fmt.Sprintf("%s.%s", neInfo.NeType, neInfo.NeId)
			hostUri := fmt.Sprintf("http://%s:%v", neInfo.Ip, neInfo.Port)
			requestURI2NF := fmt.Sprintf("%s%s", hostUri, UriTraceTaskV1)
			log.Debug("requestURI2NF:", requestURI2NF)

			resp, err := client.R().
				EnableTrace().
				SetHeaders(map[string]string{
					"User-Agent":   config.GetDefaultUserAgent(),
					"Content-Type": "application/json;charset=UTF-8",
				}).
				SetBody(payload).
				Post(requestURI2NF)
			if err != nil {
				log.Error("Failed to Post:", err)
				traceTask.FailNEs = append(traceTask.FailNEs, neKey)
				continue
			}

			switch resp.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				traceTask.SuccNEs = append(traceTask.SuccNEs, neKey)
			default:
				log.Warnf("Post return code:%d, message:%s", resp.StatusCode(), string(resp.Body()))
				traceTask.FailNEs = append(traceTask.FailNEs, neKey)
			}
		}
	}

	// One accepting NE is enough for the task to count as active.
	distributed := len(traceTask.SuccNEs) > 0
	if distributed {
		traceTask.Status = "Active"
	} else {
		traceTask.Status = "Failed"
	}
	if _, err = dborm.XormUpdateTableById(traceTask.Id, "trace_task", traceTask); err != nil {
		log.Error("Failed to dborm.XormUpdateTableById:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}

	if !distributed {
		err = global.ErrTraceFailedDistributeToNEs
		log.Error(err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	services.ResponseStatusOK204NoContent(w)
}
// PutTraceTaskToNF forwards a modified trace task to the target NEs and
// records the outcome on the existing trace_task row.
//
// The request body is decoded into a TraceTask; traceTask.Id taken from that
// body identifies the row that is updated at the end. NE selection follows
// traceTask.NeType/NeId when set, otherwise the NE types mapped to the
// requested interfaces in table trace_info, otherwise a built-in list. The
// task is sent with HTTP PUT to every selected NE and each NE is recorded as
// "<neType>.<neId>" in SuccNEs or FailNEs.
//
// The row is updated to "Active" and 204 is returned when at least one NE
// accepted the task; otherwise it is updated to "Failed" and
// global.ErrTraceFailedDistributeToNEs is returned. When
// services.CheckFrontValidRequest fails, this handler only logs and returns
// without writing a response of its own.
func PutTraceTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("PutTraceTaskToNF processing... ")

	token, err := services.CheckFrontValidRequest(w, r)
	if err != nil {
		log.Error("Request error:", err)
		return
	}
	log.Debug("AccessToken:", token)

	body, err := io.ReadAll(io.LimitReader(r.Body, int64(config.GetYamlConfig().Params.UriMaxLen)))
	if err != nil {
		log.Error("io.ReadAll is failed:", err)
		services.ResponseNotFound404UriNotExist(w, r)
		return
	}

	traceTask := new(TraceTask)
	// Reject undecodable input: a zero-valued task would be pushed to every NE
	// of the default types and the update below would target id 0.
	if err = json.Unmarshal(body, traceTask); err != nil {
		log.Error("Failed to json.Unmarshal:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	traceTask.NotifyUrl = config.GetYamlConfig().OMC.GtpUri
	log.Debug("traceTask:", traceTask)

	var neTypes []string
	switch {
	case traceTask.NeType != "":
		neTypes = append(neTypes, traceTask.NeType)
	case len(traceTask.Interfaces) > 0:
		// No device given: query the NE types by interface.
		err = dborm.XormGetSingleColStringArrayByIn("trace_info", "ne_type", "interface", traceTask.Interfaces, &neTypes)
		if err != nil {
			log.Error("Failed to dborm.XormGetSingleColStringArrayByIn:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
	default:
		neTypes = []string{"AMF", "SMF", "UDM", "AUSF", "UPF"}
	}
	log.Debug("neTypes:", neTypes)

	// The fields changed while distributing (Status, SuccNEs, FailNEs) are all
	// tagged json:"-", so the request body is identical for every NE and is
	// encoded once.
	payload, err := json.Marshal(traceTask)
	if err != nil {
		log.Error("Failed to json.Marshal:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	log.Debug("body:", string(payload))

	for _, neType := range neTypes {
		var neInfos []dborm.NeInfo
		if traceTask.NeId == "" {
			if err = dborm.XormGetNeInfoByNeType(neType, &neInfos); err != nil {
				log.Error("Failed to dborm.XormGetNeInfoByNeType:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
		} else {
			neInfo, err := dborm.XormGetNeInfo(neType, traceTask.NeId)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfo:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			// Defensive: never dereference a nil result.
			if neInfo == nil {
				log.Warnf("NE not found, neType:%s neId:%s", neType, traceTask.NeId)
				continue
			}
			neInfos = append(neInfos, *neInfo)
		}

		for _, neInfo := range neInfos {
			neKey := fmt.Sprintf("%s.%s", neInfo.NeType, neInfo.NeId)
			hostUri := fmt.Sprintf("http://%s:%v", neInfo.Ip, neInfo.Port)
			requestURI2NF := fmt.Sprintf("%s%s", hostUri, UriTraceTaskV1)
			log.Debug("requestURI2NF:", requestURI2NF)

			resp, err := client.R().
				EnableTrace().
				SetHeaders(map[string]string{
					"User-Agent":   config.GetDefaultUserAgent(),
					"Content-Type": "application/json;charset=UTF-8",
				}).
				SetBody(payload).
				Put(requestURI2NF)
			if err != nil {
				log.Error("Failed to Put:", err)
				traceTask.FailNEs = append(traceTask.FailNEs, neKey)
				continue
			}

			switch resp.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
				traceTask.SuccNEs = append(traceTask.SuccNEs, neKey)
			default:
				// Warnf, not Warn: the message carries printf verbs.
				log.Warnf("Put return code:%d, message:%s", resp.StatusCode(), string(resp.Body()))
				traceTask.FailNEs = append(traceTask.FailNEs, neKey)
			}
		}
	}

	// One accepting NE is enough for the task to count as active.
	distributed := len(traceTask.SuccNEs) > 0
	if distributed {
		traceTask.Status = "Active"
	} else {
		traceTask.Status = "Failed"
	}
	if _, err = dborm.XormUpdateTableById(traceTask.Id, "trace_task", traceTask); err != nil {
		log.Error("Failed to dborm.XormUpdateTableById:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}

	if !distributed {
		err = global.ErrTraceFailedDistributeToNEs
		log.Error(err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	services.ResponseStatusOK204NoContent(w)
}
// DeleteTraceTaskToNF removes one or more trace tasks, both from the NEs that
// accepted them and from the trace_task table.
//
// The task ids are read from the repeated "id" query parameter. Every id must
// be a decimal integer; all ids are validated before any task is touched, so
// a malformed id can neither reach the SQL condition built from it nor leave
// the request half processed.
//
// For each task the stored succ_nes value (a JSON array of "<neType>.<neId>"
// entries) is decoded, a DELETE request is sent to each listed NE, and the
// row is then removed. A succ_nes value that cannot be decoded, or an entry
// without a "." separator, is logged and skipped so the row itself can still
// be deleted. A database or transport error aborts the request with an error
// response; 204 is returned once every task has been processed. When
// services.CheckFrontValidRequest fails, this handler only logs and returns
// without writing a response of its own.
func DeleteTraceTaskToNF(w http.ResponseWriter, r *http.Request) {
	log.Debug("DeleteTraceTaskToNF processing... ")

	token, err := services.CheckFrontValidRequest(w, r)
	if err != nil {
		log.Error("Request error:", err)
		return
	}
	log.Debug("AccessToken:", token)

	ids, ok := r.URL.Query()["id"]
	if !ok || len(ids) == 0 {
		err = global.ErrTraceNotCarriedTaskID
		log.Error(err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	log.Debug("ids:", ids)

	// The id ends up inside a WHERE clause assembled as a string, so accept
	// nothing but integers.
	taskIDs := make([]int, 0, len(ids))
	for _, id := range ids {
		taskID, err := strconv.Atoi(id)
		if err != nil {
			log.Error("Invalid trace task id:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		taskIDs = append(taskIDs, taskID)
	}

	for _, taskID := range taskIDs {
		log.Debug("id:", taskID)
		where := fmt.Sprintf("id=%d", taskID)

		var succNes []string
		err = dborm.XormGetColStringArrayByWhere("trace_task", "succ_nes", where, &succNes)
		if err != nil {
			log.Error("Failed to dborm.XormGetColStringArrayByWhere:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		log.Debug("succNes:", succNes)

		var nes []string
		// An empty column value simply means no NE holds the task.
		if len(succNes) > 0 && succNes[0] != "" {
			if err := json.Unmarshal([]byte(succNes[0]), &nes); err != nil {
				log.Warnf("Failed to json.Unmarshal succ_nes of trace task %d: %v", taskID, err)
			}
		}
		log.Debug("nes:", nes)

		for _, ne := range nes {
			i := strings.Index(ne, ".")
			if i < 0 {
				// Slicing with -1 would panic; skip the malformed entry.
				log.Warnf("Skip malformed NE %q of trace task %d", ne, taskID)
				continue
			}
			neType, neId := ne[:i], ne[i+1:]
			log.Debugf("ne:%s neType:%s neId:%s", ne, neType, neId)

			neInfo, err := dborm.XormGetNeInfo(neType, neId)
			if err != nil {
				log.Error("Failed to dborm.XormGetNeInfo:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}
			// Defensive: never dereference a nil result.
			if neInfo == nil {
				log.Warnf("NE not found, neType:%s neId:%s", neType, neId)
				continue
			}

			hostUri := fmt.Sprintf("http://%s:%v", neInfo.Ip, neInfo.Port)
			requestURI2NF := fmt.Sprintf("%s%s?id=%d", hostUri, UriTraceTaskV1, taskID)
			log.Debug("requestURI2NF:", requestURI2NF)

			resp, err := client.R().
				EnableTrace().
				SetHeaders(map[string]string{
					"User-Agent":   config.GetDefaultUserAgent(),
					"Content-Type": "application/json;charset=UTF-8",
				}).
				Delete(requestURI2NF)
			if err != nil {
				log.Error("Failed to Delete:", err)
				services.ResponseInternalServerError500ProcessError(w, err)
				return
			}

			// A rejection by the NE does not stop the deletion, but it is no
			// longer silent.
			switch resp.StatusCode() {
			case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
			default:
				log.Warnf("Delete return code:%d, message:%s", resp.StatusCode(), string(resp.Body()))
			}
		}

		if _, err = dborm.XormDeleteDataByWhere(where, "trace_task"); err != nil {
			log.Error("Failed to dborm.XormDeleteDataByWhere:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
	}

	services.ResponseStatusOK204NoContent(w)
}
// GetRawMessage is a placeholder handler: it emits a debug log entry and
// writes nothing to the response.
func GetRawMessage(w http.ResponseWriter, r *http.Request) {
	const entryMsg = "GetRawMessage processing... "
	log.Debug(entryMsg)
}
// ParseRawMsg2Html serves the decoded (HTML) form of one trace record.
//
// The record is identified by the "id" route variable, which must be a
// decimal integer. When the record already carries a decoded-message path
// (DecMsg) that file is served directly. Otherwise the external data2html
// tool is run to render the raw message into a file under the configured
// OMC.FrontTraceDir, the resulting path is stored back on the record, and the
// new file is served.
//
// An invalid id, a database error, a failing tool run, or a tool run that
// leaves no output file results in an error response; in the last case the
// tool's output (newlines removed) is used as the error text.
func ParseRawMsg2Html(w http.ResponseWriter, r *http.Request) {
	log.Debug("ParseRawMsg2Html processing... ")

	idStr := mux.Vars(r)["id"]
	// A non-numeric id used to be silently treated as 0.
	id, err := strconv.Atoi(idStr)
	if err != nil {
		log.Error("Invalid trace data id:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}

	traceData, err := dborm.XormGetTraceData(id)
	if err != nil {
		log.Error("Failed to dborm.XormGetTraceData:", err)
		services.ResponseInternalServerError500ProcessError(w, err)
		return
	}
	log.Trace("traceData:", traceData)

	filePath := traceData.DecMsg
	if filePath == "" {
		// Not decoded yet: render the raw message to an HTML file first.
		htmlFile := fmt.Sprintf("traceDecMessage-%d-%d.html", traceData.TaskID, traceData.ID)
		filePath = config.GetYamlConfig().OMC.FrontTraceDir + "/" + htmlFile
		command := fmt.Sprintf("/usr/local/omc/bin/data2html -f %s -t %d -i N%d -d %x", filePath, traceData.Timestamp, traceData.IfType, traceData.RawMsg)
		out, err := run.ExecCmd(command, "/")
		log.Tracef("Exec output: %v", string(out))
		if err != nil {
			log.Error("Failed to exec data2html:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}

		exist, err := global.FilePathExists(filePath)
		if err != nil {
			log.Error("Failed to stat:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
		if !exist {
			// The tool ran but left no file: report what it printed.
			err = errors.New(strings.ReplaceAll(string(out), "\n", ""))
			log.Error("data2html produced no output file:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}

		traceData.DecMsg = filePath
		if _, err = dborm.XormUpdateTraceData(id, traceData); err != nil {
			log.Error("Failed to dborm.XormUpdateTraceData:", err)
			services.ResponseInternalServerError500ProcessError(w, err)
			return
		}
	}

	services.ResponseHtmlContent(w, http.StatusOK, filePath)
}