package service

import (
	"encoding/base64"
	"fmt"
	"net"
	"strings"

	"be.ems/src/framework/config"
	"be.ems/src/framework/logger"
	"be.ems/src/framework/socket"
	"be.ems/src/framework/utils/date"
	"be.ems/src/framework/utils/parse"
	neFetchlink "be.ems/src/modules/network_element/fetch_link"
	neModel "be.ems/src/modules/network_element/model"
	neService "be.ems/src/modules/network_element/service"
	"be.ems/src/modules/trace/model"
	"be.ems/src/modules/trace/repository"
	wsService "be.ems/src/modules/ws/service"
)

// NewTraceTask is the package-level TraceTask service instance.
var NewTraceTask = &TraceTask{
	udpService:          socket.SocketUDP{},
	tcpService:          socket.SocketTCP{},
	traceTaskRepository: repository.NewTraceTask,
	traceDataRepository: repository.NewTraceData,
}

// TraceTask is the service layer for trace tasks.
type TraceTask struct {
	udpService          socket.SocketUDP      // UDP service object
	tcpService          socket.SocketTCP      // For testing only; to be adjusted later (TODO)
	traceTaskRepository *repository.TraceTask // Trace task repository
	traceDataRepository *repository.TraceData // Trace data repository
}

// CreateUDP creates the UDP data channel on the address returned by
// traceNotify and starts resolving incoming data in a goroutine.
//
// When reload is true the previously created channels are closed first.
// When config.Env() is "local", a TCP service on port+1 is started as well;
// it accepts the same payloads base64-encoded (test helper, TODO adjust).
//
// It returns an error when tracing is disabled or a socket cannot be created.
func (r *TraceTask) CreateUDP(reload bool) error {
	if reload {
		r.CloseUDP() // Close the previous services.
	}

	// Tracing must be enabled in the configuration.
	host, port, err := r.traceNotify()
	if err != nil {
		return err
	}

	// Initialise the UDP service and handle incoming data.
	r.udpService = socket.SocketUDP{Addr: host, Port: port}
	if _, err := r.udpService.New(); err != nil {
		return err
	}
	go r.udpService.Resolve(r.handleUDP)

	// ============ Local testing of data sent by network elements; TODO adjust later.
	if config.Env() == "local" {
		r.tcpService = socket.SocketTCP{Addr: host, Port: port + 1}
		if _, err := r.tcpService.New(); err != nil {
			return err
		}
		go r.tcpService.Resolve(r.handleTCP)
	}
	return nil
}

// handleUDP reads one datagram from conn, logs it base64-encoded, stores it
// through pasreUDPData and answers the sender with "udp>".
func (r *TraceTask) handleUDP(conn *net.UDPConn, err error) {
	if err != nil {
		logger.Errorf("UDP Resolve %s", err.Error())
		return
	}

	// NOTE(review): 10*1048 looks like a typo for 10*1024; kept as-is so the
	// receive buffer is not shrunk.
	buf := make([]byte, 10*1048)
	n, addr, err := conn.ReadFromUDPAddrPort(buf)
	if err != nil {
		logger.Errorf("UDP Resolve ReadFromUDPAddrPort Error: %s", err.Error())
		return
	}
	logger.Infof("socket UDP Base64 Encode: %s", base64.StdEncoding.EncodeToString(buf[:n]))

	// Parse and store the data.
	if err := r.pasreUDPData(buf[:n]); err != nil {
		logger.Errorf("UDP Resolve UDPData Error: %s", err.Error())
		return
	}

	// Send the response.
	if _, err := conn.WriteToUDPAddrPort([]byte("udp>"), addr); err != nil {
		logger.Errorf("UDP Resolve WriteToUDPAddrPort Error: %s", err.Error())
	}
}

// handleTCP reads one base64-encoded payload from conn, decodes it, stores it
// through pasreUDPData and answers with "tcp>". Used for local testing only.
func (r *TraceTask) handleTCP(conn *net.Conn, err error) {
	if err != nil {
		logger.Errorf("TCP Resolve %s", err.Error())
		return
	}
	c := *conn

	buf := make([]byte, 2048)
	n, err := c.Read(buf)
	if err != nil {
		logger.Errorf("TCP Resolve Read Error: %s", err.Error())
		return
	}

	// A payload that is not valid base64 is rejected instead of being parsed
	// as a truncated message.
	deData, err := base64.StdEncoding.DecodeString(string(buf[:n]))
	if err != nil {
		logger.Errorf("TCP Resolve Base64 Decode Error: %s", err.Error())
		return
	}
	if err := r.pasreUDPData(deData); err != nil {
		logger.Errorf("TCP Resolve UDPData Error: %s", err.Error())
		return
	}

	// Send the response.
	if _, err = c.Write([]byte("tcp>")); err != nil {
		logger.Errorf("TCP Resolve Write Error: %s", err.Error())
	}
}

// pasreUDPData decodes a raw trace message with traceHandler, inserts it
// into the trace data repository and pushes the record (without the raw
// payload) to the websocket group of the trace task.
//
// It returns the decoding error, if any.
func (r *TraceTask) pasreUDPData(buf []byte) error {
	data, err := traceHandler(buf)
	if err != nil {
		logger.Errorf("UDP Resolve UDPDataHandler Error: %s", err.Error())
		return err
	}
	taskId := parse.Number(data.NfTraceMsg.TraceId)

	// Record the message in the database.
	item := model.TraceData{
		TraceId:   taskId,
		IMSI:      data.NfTraceMsg.IMSI,
		SrcAddr:   data.NfTraceMsg.SrcIpStr,
		DstAddr:   data.NfTraceMsg.DstIpStr,
		IfType:    data.NfTraceMsg.IfType,
		MsgType:   data.NfTraceMsg.MsgType,
		MsgDirect: data.NfTraceMsg.MsgDirect,
		MsgNe:     data.NfTraceMsg.NfName,
		MsgEvent:  data.NfTraceMsg.MsgEvent,
		Length:    int64(data.TracePayloadLen),
		RawMsg:    base64.StdEncoding.EncodeToString(data.TracePayload),
		Timestamp: data.NfTraceMsg.Timestamp,
	}
	item.ID = r.traceDataRepository.Insert(item)

	// Push to the websocket subscription group; the raw payload is not pushed.
	item.RawMsg = ""
	wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%d", wsService.GROUP_TRACE_NE, taskId), item)
	return nil
}

// CloseUDP closes the UDP data channel and the TCP test channel.
func (r *TraceTask) CloseUDP() {
	r.udpService.Close()
	r.tcpService.Close()
}

// FindByPage queries trace tasks page by page according to query.
func (r TraceTask) FindByPage(query map[string]string) ([]model.TraceTask, int64) {
	return r.traceTaskRepository.SelectByPage(query)
}

// FindById returns the trace task with the given ID, or a zero-value
// model.TraceTask when none is found.
func (r TraceTask) FindById(id int64) model.TraceTask {
	tasks := r.traceTaskRepository.SelectByIds([]int64{id})
	if len(tasks) > 0 {
		return tasks[0]
	}
	return model.TraceTask{}
}

// Insert pushes the task to its network elements and then stores it in the
// database. It returns an error when either step fails.
func (r TraceTask) Insert(task model.TraceTask) error {
	if err := r.createTaskToNe(&task, false); err != nil {
		return err
	}
	// Store the task in the database.
	if insertId := r.traceTaskRepository.Insert(task); insertId <= 0 {
		return fmt.Errorf("insert task error")
	}
	return nil
}

// createTaskToNe sends the task to every network element listed in
// task.NeList (comma-separated "<neType>_<neId>" tokens). It fills in
// task.NotifyUrl and, when empty, task.TraceId.
//
// When at least one element rejects the task and ignoreErr is false, the task
// is removed again from all listed elements and an error naming the failed
// elements is returned. When ignoreErr is true, per-element failures are only
// logged and nil is returned.
func (r TraceTask) createTaskToNe(task *model.TraceTask, ignoreErr bool) error {
	// Tracing must be enabled in the configuration.
	host, port, err := r.traceNotify()
	if err != nil {
		return err
	}
	task.NotifyUrl = fmt.Sprintf("udp:%s:%d", host, port)

	// strings.Split never returns an empty slice for a non-empty separator,
	// so emptiness has to be checked on the raw string.
	if task.NeList == "" {
		return fmt.Errorf("ne list is empty")
	}
	neList := strings.Split(task.NeList, ",")

	// Generate the task ID (expected to stay below 65535).
	if task.TraceId == "" {
		task.TraceId = fmt.Sprint(r.traceTaskRepository.LastID() + 1)
	}

	// Send the task to the network elements.
	var errNe []string
	for _, neTypeID := range neList {
		neInfo, err := r.findNe(neTypeID)
		if err != nil {
			logger.Warnf("%s", err.Error())
			continue
		}
		if err := r.traceNeTask(neInfo, *task); err != nil {
			logger.Errorf("ne type id is %s to %s error: %s", task.TraceId, neTypeID, err.Error())
			errNe = append(errNe, neTypeID)
		}
	}
	if len(errNe) == 0 || ignoreErr {
		return nil
	}

	// Roll back: remove the task from every listed network element.
	r.removeTaskFromNe(*task)
	return fmt.Errorf("task to ne error: %s", strings.Join(errNe, ","))
}

// findNe resolves a "<neType>_<neId>" token to its network element record.
// It returns an error when the token does not consist of exactly two parts or
// when no element of that type with an IP address is found.
func (r TraceTask) findNe(neTypeID string) (neModel.NeInfo, error) {
	parts := strings.Split(neTypeID, "_")
	if len(parts) != 2 {
		return neModel.NeInfo{}, fmt.Errorf("ne type id is error")
	}
	neInfo := neService.NewNeInfo.FindByNeTypeAndNeID(parts[0], parts[1])
	if neInfo.NeType != parts[0] || neInfo.IP == "" {
		return neModel.NeInfo{}, fmt.Errorf("ne type id is not exist")
	}
	return neInfo, nil
}

// removeTaskFromNe asks every resolvable network element in task.NeList to
// delete the trace task. Unresolvable tokens are skipped.
func (r TraceTask) removeTaskFromNe(task model.TraceTask) {
	for _, neTypeID := range strings.Split(task.NeList, ",") {
		neInfo, err := r.findNe(neTypeID)
		if err != nil {
			continue
		}
		// Best effort: the result of the delete request is not inspected.
		neFetchlink.NeTraceDelete(neInfo, task.TraceId)
	}
}

// traceNotify returns the host and port network elements should send trace
// data to. The defaults are 127.0.0.1 and 33033; they are overridden by the
// "trace.host" and "trace.port" configuration values.
//
// It returns an error when "trace.enabled" is not truthy.
func (r TraceTask) traceNotify() (string, int64, error) {
	if v := config.Get("trace.enabled"); !parse.Boolean(v) {
		return "", 0, fmt.Errorf("trace is not enabled")
	}
	host := "127.0.0.1"
	// Checked assertion: a non-string value keeps the default instead of panicking.
	if v, ok := config.Get("trace.host").(string); ok {
		host = v
	}
	var port int64 = 33033
	if v := config.Get("trace.port"); v != nil {
		port = parse.Number(v)
	}
	return host, port, nil
}

// traceNeTask creates the trace task on a single network element.
//
// task.TraceType selects the request shape: "1" Interface, "2" Device,
// "3" UE. Any other value yields an error, as does a failed request or a
// response containing a "cause" entry.
func (r TraceTask) traceNeTask(neInfo neModel.NeInfo, task model.TraceTask) error {
	data := model.TraceReq{
		TraceId:   parse.Number(task.TraceId),
		NotifyUrl: task.NotifyUrl,
		StartTime: date.ParseDateToStr(task.StartTime, date.YYYY_MM_DD_HH_MM_SS),
		EndTime:   date.ParseDateToStr(task.EndTime, date.YYYY_MM_DD_HH_MM_SS),
	}
	switch task.TraceType {
	case "1": // Interface
		data.TraceType = "Interface"
		data.Interfaces = strings.Split(task.Interfaces, ",")
	case "2": // Device
		data.TraceType = "Device"
		data.SrcIp = task.SrcIp
		data.DstIp = task.DstIp
	case "3": // UE
		data.TraceType = "UE"
		data.IMSI = fmt.Sprintf("imsi-%s", task.IMSI)
	default:
		return fmt.Errorf("trace type is not support")
	}

	msg, err := neFetchlink.NeTraceAdd(neInfo, data)
	if err != nil {
		return err
	}
	if v, ok := msg["cause"]; ok {
		return fmt.Errorf("trace task add failed, %v", v)
	}
	return nil
}

// DeleteByIds deletes the trace tasks with the given IDs together with their
// trace data, and asks the affected network elements to stop the tasks.
//
// It returns the number of deleted tasks, or an error when none of the IDs
// exist or when the number of rows found differs from len(ids).
func (r TraceTask) DeleteByIds(ids []int64) (int64, error) {
	// Check that the tasks exist.
	rows := r.traceTaskRepository.SelectByIds(ids)
	if len(rows) <= 0 {
		return 0, fmt.Errorf("not data")
	}
	if len(rows) != len(ids) {
		// Deletion failed.
		return 0, fmt.Errorf("delete fail")
	}

	// Delete the data and tell the network elements to stop the tasks.
	for _, v := range rows {
		r.traceDataRepository.DeleteByTraceId(v.TraceId)
		r.removeTaskFromNe(v)
	}
	num := r.traceTaskRepository.DeleteByIds(ids)
	return num, nil
}

// RunUnstopped re-sends every trace task that has not been stopped and that
// involves the network element identified by neType and neId. Failures are
// logged and do not stop the remaining tasks from being sent.
func (r TraceTask) RunUnstopped(neType string, neId string) {
	neStr := fmt.Sprintf("%s_%s", neType, neId)
	tasks := r.traceTaskRepository.SelectByUnstopped(neStr)
	for i := range tasks {
		if err := r.createTaskToNe(&tasks[i], true); err != nil {
			logger.Warnf("run unstopped trace task %v to %s error: %s", tasks[i].TraceId, neStr, err.Error())
		}
	}
}