// File: be.ems/src/modules/trace/service/trace_task.go
// Listing metadata: 324 lines, 8.9 KiB, Go.

package service
import (
"encoding/base64"
"encoding/json"
"fmt"
"net"
"strings"
"be.ems/src/framework/config"
"be.ems/src/framework/logger"
"be.ems/src/framework/socket"
"be.ems/src/framework/utils/date"
"be.ems/src/framework/utils/parse"
neFetchlink "be.ems/src/modules/network_element/fetch_link"
neService "be.ems/src/modules/network_element/service"
"be.ems/src/modules/trace/model"
"be.ems/src/modules/trace/repository"
wsService "be.ems/src/modules/ws/service"
)
// NewTraceTask is the package-level TraceTask service instance. It starts with
// an empty UDP service value (CreateUDP fills it in) and is wired to
// repository.NewTraceTask and repository.NewTraceData.
var NewTraceTask = &TraceTask{
udpService: socket.SocketUDP{},
traceTaskRepository: repository.NewTraceTask,
traceDataRepository: repository.NewTraceData,
}
// TraceTask is the service layer for trace tasks.
type TraceTask struct {
// udpService is the UDP service object; CreateUDP replaces it and CloseUDP closes it.
udpService socket.SocketUDP
// traceTaskRepository is the repository used for trace task records.
traceTaskRepository *repository.TraceTask
// traceDataRepository is the repository used for received trace data records.
traceDataRepository *repository.TraceData
}
// CreateUDP creates the trace data channels.
//
// It reads trace.enabled, trace.host and trace.port from the configuration
// (defaults: enabled, 127.0.0.1, 33033), creates the UDP service on host:port
// and a TCP service on host:port+1, and hands each service a callback that
// decodes an incoming message with UDPDataHandler and records it.
//
// It returns nil without creating anything when trace.enabled is false, and an
// error when a configured value has the wrong type or a service cannot be
// created.
func (r *TraceTask) CreateUDP() error {
	// Tracing counts as enabled unless the configuration explicitly disables it.
	if v := config.Get("trace.enabled"); v != nil {
		enabled, ok := v.(bool)
		if !ok {
			return fmt.Errorf("config trace.enabled must be a bool, got %T", v)
		}
		if !enabled {
			return nil
		}
	}
	host := "127.0.0.1"
	if v := config.Get("trace.host"); v != nil {
		h, ok := v.(string)
		if !ok {
			return fmt.Errorf("config trace.host must be a string, got %T", v)
		}
		host = h
	}
	var port int64 = 33033
	if v := config.Get("trace.port"); v != nil {
		port = parse.Number(v)
	}

	// Initialize the UDP service.
	r.udpService = socket.SocketUDP{Addr: host, Port: port}
	if _, err := r.udpService.New(); err != nil {
		return err
	}
	// Receive and handle UDP data.
	go r.udpService.Resolve(func(conn *net.UDPConn) {
		buf := make([]byte, 2048)
		n, err := conn.Read(buf)
		if err != nil {
			logger.Errorf("error reading from UDP connection: %s", err.Error())
			return
		}
		logger.Infof("socket UDP: %s", string(buf[:n]))
		mData, err := UDPDataHandler(buf, n)
		if err != nil {
			logger.Errorf("udp resolve data fail: %s", err.Error())
			return
		}
		r.recordTraceMessage(mData)
	})

	// ============ Test channel: receive the NE trace data over TCP.
	// TODO: temporary TCP service, to be adjusted later.
	// NOTE(review): if this fails the UDP service created above stays open
	// while CreateUDP reports an error — confirm whether that is intended.
	tcpService := socket.SocketTCP{Addr: host, Port: port + 1}
	if _, err := tcpService.New(); err != nil {
		return err
	}
	// Receive and handle TCP data.
	go tcpService.Resolve(func(conn *net.Conn) {
		c := (*conn)
		buf := make([]byte, 2048)
		n, err := c.Read(buf)
		if err != nil {
			logger.Errorf("error reading from TCP connection: %s", err.Error())
			return
		}
		logger.Infof("socket TCP: %s", string(buf[:n]))
		// The TCP payload is Base64 text. A decode error is logged, but the
		// bytes decoded before the error are still passed on (best effort);
		// UDPDataHandler rejects them if they are unusable.
		deData, err := base64.StdEncoding.DecodeString(string(buf[:n]))
		if err != nil {
			logger.Errorf("tcp base64 decode fail: %s", err.Error())
		}
		logger.Infof("socket TCP Base64: %s", deData)
		mData, err := UDPDataHandler(deData, len(deData))
		if err != nil {
			logger.Errorf("tcp resolve data fail: %s", err.Error())
			return
		}
		r.recordTraceMessage(mData)
	})
	return nil
}

// recordTraceMessage inserts one decoded trace message through
// traceDataRepository and, when the message carries a non-empty pcapFile
// entry, sends the task ID to the websocket group of that task.
func (r *TraceTask) recordTraceMessage(mData map[string]any) {
	taskId := parse.Number(mData["taskId"])
	// Insert a database record for the message.
	r.traceDataRepository.Insert(model.TraceData{
		TaskId:    taskId,
		IMSI:      traceFieldString(mData, "imsi"),
		SrcAddr:   traceFieldString(mData, "srcAddr"),
		DstAddr:   traceFieldString(mData, "dstAddr"),
		IfType:    parse.Number(mData["ifType"]),
		MsgType:   parse.Number(mData["msgType"]),
		MsgDirect: parse.Number(mData["msgDirect"]),
		Length:    parse.Number(mData["dataLen"]),
		RawMsg:    traceFieldString(mData, "dataInfo"),
		Timestamp: parse.Number(mData["timestamp"]),
		DecMsg:    traceFieldString(mData, "decMsg"),
	})
	// Push a notification when a capture file is referenced.
	if v, ok := mData["pcapFile"]; ok && v != "" {
		logger.Infof("pcapFile: %s", v)
		wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE_NE, taskId), taskId)
	}
}

// traceFieldString returns m[key] as a string, or "" when the key is missing
// or holds a non-string value. Messages come from the network, so a malformed
// one must not panic the listener goroutine.
func traceFieldString(m map[string]any, key string) string {
	s, _ := m[key].(string)
	return s
}
// CloseUDP closes the UDP service held by this TraceTask.
func (r *TraceTask) CloseUDP() {
	udp := &r.udpService
	udp.Close()
}
// SelectPage runs a paged query with the given conditions by delegating to
// the trace task repository and returns its result unchanged.
func (r *TraceTask) SelectPage(query map[string]any) map[string]any {
	page := r.traceTaskRepository.SelectPage(query)
	return page
}
// SelectById looks up a single trace task by ID. It returns the first row the
// repository yields for that ID, or a zero-value model.TraceTask when there
// is none.
func (r *TraceTask) SelectById(id string) model.TraceTask {
	found := r.traceTaskRepository.SelectByIds([]string{id})
	if len(found) == 0 {
		return model.TraceTask{}
	}
	return found[0]
}
// Insert creates a trace task: it registers the task on the network element
// and then stores it through the trace task repository.
//
// It returns an error when tracing is disabled or misconfigured, when the
// network element cannot be found (or has no IP), when task.TraceType is not
// "1", "2" or "3", or when the network element request fails.
func (r *TraceTask) Insert(task model.TraceTask) error {
	// Tracing counts as enabled unless the configuration explicitly disables it.
	if v := config.Get("trace.enabled"); v != nil {
		enabled, ok := v.(bool)
		if !ok {
			return fmt.Errorf("config trace.enabled must be a bool, got %T", v)
		}
		if !enabled {
			return fmt.Errorf("tracking is not enabled")
		}
	}
	host := "127.0.0.1"
	if v := config.Get("trace.host"); v != nil {
		h, ok := v.(string)
		if !ok {
			return fmt.Errorf("config trace.host must be a string, got %T", v)
		}
		host = h
	}
	var port int64 = 33033
	if v := config.Get("trace.port"); v != nil {
		port = parse.Number(v)
	}
	task.NotifyUrl = fmt.Sprintf("udp:%s:%d", host, port)

	// Look up the network element to obtain its IP.
	neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(task.NeType, task.NeId)
	if neInfo.NeId != task.NeId || neInfo.IP == "" {
		return fmt.Errorf("app.common.noNEInfo")
	}

	// Generate the task ID from the last stored ID.
	// NOTE(review): the original comment says the ID must stay below 65535;
	// nothing here enforces that bound.
	traceId := r.traceTaskRepository.LastID() + 1
	task.TraceId = fmt.Sprint(traceId)

	// Build the task payload sent to the network element.
	data := map[string]any{
		"neType":    neInfo.NeType,
		"neId":      neInfo.NeId,
		"notifyUrl": task.NotifyUrl,
		"id":        traceId,
		"startTime": date.ParseDateToStr(task.StartTime, date.YYYY_MM_DD_HH_MM_SS),
		"endTime":   date.ParseDateToStr(task.EndTime, date.YYYY_MM_DD_HH_MM_SS),
	}
	switch task.TraceType {
	case "1": // Interface
		data["traceType"] = "Interface"
		data["interfaces"] = strings.Split(task.Interfaces, ",")
	case "2": // Device
		data["traceType"] = "Device"
		// NOTE(review): the NE receives the caller-supplied UeIp while the
		// stored task gets the NE's IP; Update sends the NE's IP instead —
		// confirm which is intended.
		data["ueIp"] = task.UeIp
		data["srcIp"] = task.SrcIp
		data["dstIp"] = task.DstIp
		data["signalPort"] = task.SignalPort
		task.UeIp = neInfo.IP
	case "3": // UE
		data["traceType"] = "UE"
		data["imsi"] = task.IMSI
		data["msisdn"] = task.MSISDN
	default:
		// NOTE(review): wording looks off ("not supported"?); kept as-is
		// because callers may surface this text.
		return fmt.Errorf("trace type is not disabled")
	}

	msg, err := neFetchlink.NeTraceAdd(neInfo, data)
	if err != nil {
		return err
	}
	// The NE already accepted the task, so a marshal failure is logged rather
	// than returned; the task is then stored with an empty FetchMsg.
	s, err := json.Marshal(msg)
	if err != nil {
		logger.Errorf("trace task %s marshal fetch msg fail: %s", task.TraceId, err.Error())
	}
	task.FetchMsg = string(s)

	// Insert into the database.
	r.traceTaskRepository.Insert(task)
	return nil
}
// Update modifies a trace task: when the task can be queried on the network
// element it is edited there, and the task is then updated through the trace
// task repository.
//
// It returns an error when tracing is disabled or misconfigured, when the
// network element cannot be found (or has no IP), or — only if the network
// element query succeeded — when task.TraceType is not "1", "2" or "3".
func (r *TraceTask) Update(task model.TraceTask) error {
	// Tracing counts as enabled unless the configuration explicitly disables it.
	if v := config.Get("trace.enabled"); v != nil {
		enabled, ok := v.(bool)
		if !ok {
			return fmt.Errorf("config trace.enabled must be a bool, got %T", v)
		}
		if !enabled {
			return fmt.Errorf("tracking is not enabled")
		}
	}
	host := "127.0.0.1"
	if v := config.Get("trace.host"); v != nil {
		h, ok := v.(string)
		if !ok {
			return fmt.Errorf("config trace.host must be a string, got %T", v)
		}
		host = h
	}
	var port int64 = 33033
	if v := config.Get("trace.port"); v != nil {
		port = parse.Number(v)
	}
	task.NotifyUrl = fmt.Sprintf("udp:%s:%d", host, port)

	// Look up the network element to obtain its IP.
	neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(task.NeType, task.NeId)
	if neInfo.NeId != task.NeId || neInfo.IP == "" {
		return fmt.Errorf("app.common.noNEInfo")
	}

	// Query the task on the network element. A failed query is not an error:
	// the NE edit is skipped and only the database record is updated.
	// NOTE(review): this also means the trace type is validated only when the
	// query succeeds — confirm that is intended.
	if msg, err := neFetchlink.NeTraceInfo(neInfo, task.TraceId); err == nil {
		// A marshal failure is logged; FetchMsg is then stored empty.
		s, mErr := json.Marshal(msg)
		if mErr != nil {
			logger.Errorf("trace task %s marshal fetch msg fail: %s", task.TraceId, mErr.Error())
		}
		task.FetchMsg = string(s)

		// Build the edited task payload.
		data := map[string]any{
			"neType":    neInfo.NeType,
			"neId":      neInfo.NeId,
			"notifyUrl": task.NotifyUrl,
			"id":        parse.Number(task.TraceId),
			"startTime": date.ParseDateToStr(task.StartTime, date.YYYY_MM_DD_HH_MM_SS),
			"endTime":   date.ParseDateToStr(task.EndTime, date.YYYY_MM_DD_HH_MM_SS),
		}
		switch task.TraceType {
		case "1": // Interface
			data["traceType"] = "Interface"
			data["interfaces"] = strings.Split(task.Interfaces, ",")
		case "2": // Device
			// The NE's IP overrides UeIp before it is sent and stored.
			task.UeIp = neInfo.IP
			data["traceType"] = "Device"
			data["ueIp"] = task.UeIp
			data["srcIp"] = task.SrcIp
			data["dstIp"] = task.DstIp
			data["signalPort"] = task.SignalPort
		case "3": // UE
			data["traceType"] = "UE"
			data["imsi"] = task.IMSI
			data["msisdn"] = task.MSISDN
		default:
			// NOTE(review): wording looks off ("not supported"?); kept as-is
			// because callers may surface this text.
			return fmt.Errorf("trace type is not disabled")
		}
		// NOTE(review): the result of the NE edit is discarded (best effort).
		neFetchlink.NeTraceEdit(neInfo, data)
	}

	// Update the database.
	r.traceTaskRepository.Update(task)
	return nil
}
// DeleteByIds deletes trace tasks in bulk.
//
// Every requested ID must exist: it returns "not data" when none are found and
// "delete fail" when only some are. Otherwise each task is first removed from
// its network element (tasks whose network element is unknown or has no IP are
// skipped), then the rows are deleted and the repository's count is returned.
func (r *TraceTask) DeleteByIds(ids []string) (int64, error) {
	// Check that the tasks exist.
	tasks := r.traceTaskRepository.SelectByIds(ids)
	if len(tasks) == 0 {
		return 0, fmt.Errorf("not data")
	}
	if len(tasks) != len(ids) {
		// Some of the requested IDs are missing: refuse the whole batch.
		return 0, fmt.Errorf("delete fail")
	}

	// Stop the tasks on their network elements.
	for _, task := range tasks {
		ne := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID(task.NeType, task.NeId)
		if ne.NeId == task.NeId && ne.IP != "" {
			neFetchlink.NeTraceDelete(ne, task.TraceId)
		}
	}

	deleted := r.traceTaskRepository.DeleteByIds(ids)
	return deleted, nil
}