Merge branch 'main' into multi-tenant

This commit is contained in:
2024-09-30 19:25:19 +08:00
44 changed files with 9391 additions and 805 deletions

View File

@@ -184,7 +184,7 @@ func GetKeys(source string, pattern string) ([]string, error) {
// 循环遍历获取匹配的键
for {
// 使用 SCAN 命令获取匹配的键
batchKeys, nextCursor, err := rdb.Scan(ctx, cursor, pattern, 100).Result()
batchKeys, nextCursor, err := rdb.Scan(ctx, cursor, pattern, 1000).Result()
if err != nil {
logger.Errorf("Failed to scan keys: %v", err)
return keys, err

View File

@@ -10,10 +10,10 @@ import (
// SocketTCP TCP服务端
type SocketTCP struct {
Addr string `json:"addr"` // 主机地址
Port int64 `json:"port"` // 端口
Listen *net.Listener `json:"listen"`
StopChan chan struct{} `json:"stop"` // 停止信号
Addr string `json:"addr"` // 主机地址
Port int64 `json:"port"` // 端口
Listener *net.TCPListener `json:"listener"`
StopChan chan struct{} `json:"stop"` // 停止信号
}
// New 创建TCP服务端
@@ -26,53 +26,52 @@ func (s *SocketTCP) New() (*SocketTCP, error) {
}
address := fmt.Sprintf("%s:%d", s.Addr, s.Port)
ln, err := net.Listen(proto, address)
// 解析 TCP 地址
tcpAddr, err := net.ResolveTCPAddr(proto, address)
if err != nil {
return nil, err
}
s.Listen = &ln
// 监听 TCP 地址
listener, err := net.ListenTCP(proto, tcpAddr)
if err != nil {
return nil, err
}
s.Listener = listener
s.StopChan = make(chan struct{}, 1)
return s, nil
}
// Close 关闭当前TCP服务端
func (s *SocketTCP) Close() {
if s.Listen != nil {
if s.Listener != nil {
s.StopChan <- struct{}{}
(*s.Listen).Close()
(*s.Listener).Close()
}
}
// Resolve 处理消息
func (s *SocketTCP) Resolve(bufferSize int, callback func([]byte, int)) error {
if s.Listen == nil {
func (s *SocketTCP) Resolve(callback func(conn *net.Conn)) error {
if s.Listener == nil {
return fmt.Errorf("tcp service not created")
}
ln := *s.Listen
buffer := make([]byte, bufferSize)
listener := *s.Listener
for {
select {
case <-s.StopChan:
return fmt.Errorf("udp service stop")
default:
conn, err := ln.Accept()
conn, err := listener.Accept()
if err != nil {
logger.Errorf("Error accepting connection: %v ", err)
continue
}
defer conn.Close()
// 读取数据
n, err := conn.Read(buffer)
if err != nil {
fmt.Println("Error reading from TCP connection:", err)
continue
}
callback(buffer, n)
// 处理连接
callback(&conn)
// 发送响应
if _, err = conn.Write([]byte("tcp>")); err != nil {

View File

@@ -50,29 +50,20 @@ func (s *SocketUDP) Close() {
}
// Resolve 处理消息
func (s *SocketUDP) Resolve(bufferSize int, callback func([]byte, int)) error {
func (s *SocketUDP) Resolve(callback func(*net.UDPConn)) error {
if s.Conn == nil {
return fmt.Errorf("udp service not created")
}
buffer := make([]byte, bufferSize)
for {
select {
case <-s.StopChan:
return fmt.Errorf("udp service stop")
default:
// 读取数据
n, addr, err := s.Conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading from UDP connection:", err)
continue
}
callback(buffer, n)
callback(s.Conn)
// 发送响应
if _, err = s.Conn.WriteToUDP([]byte("udp>"), addr); err != nil {
if _, err := s.Conn.WriteTo([]byte("udp>"), s.Conn.RemoteAddr()); err != nil {
fmt.Println("Error sending response:", err)
}
}

View File

@@ -293,7 +293,7 @@ func (s *SMFController) SubUserList(c *gin.Context) {
}
// 网元直连
data, err := neFetchlink.SMFSubInfo(neInfo, map[string]string{
data, err := neFetchlink.SMFSubInfoList(neInfo, map[string]string{
"imsi": query.IMSI,
"msisdn": query.MSISDN,
"upstate": query.Upstate,
@@ -306,24 +306,23 @@ func (s *SMFController) SubUserList(c *gin.Context) {
// 对数据进行处理去掉前缀并加入imsi拓展信息
rows := data["rows"].([]any)
arr := &rows
for i := range *arr {
item := (*arr)[i].(map[string]any)
if v, ok := item["imsi"]; ok && v != nil {
imsiStr := v.(string)
imsiStr = strings.TrimPrefix(imsiStr, "imsi-")
item["imsi"] = imsiStr
// 查UDM拓展信息
info := s.udmUserInfoService.SelectByIMSIAndNeID(imsiStr, "")
item["remark"] = info.Remark
}
if v, ok := item["msisdn"]; ok && v != nil {
item["msisdn"] = strings.TrimPrefix(v.(string), "msisdn-")
if len(rows) > 0 {
arr := &rows
for i := range *arr {
item := (*arr)[i].(map[string]any)
if v, ok := item["imsi"]; ok && v != nil {
imsiStr := v.(string)
imsiStr = strings.TrimPrefix(imsiStr, "imsi-")
item["imsi"] = imsiStr
// 查UDM拓展信息
info := s.udmUserInfoService.SelectByIMSIAndNeID(imsiStr, "")
item["remark"] = info.Remark
}
if v, ok := item["msisdn"]; ok && v != nil {
item["msisdn"] = strings.TrimPrefix(v.(string), "msisdn-")
}
}
}
c.JSON(200, result.Ok(map[string]any{
"total": data["total"],
"rows": data["rows"],
}))
c.JSON(200, result.Ok(data))
}

View File

@@ -10,15 +10,13 @@ import (
"be.ems/src/modules/network_element/model"
)
// SMFSubInfo SMF在线订阅用户列表信息
// SMFSubInfoList SMF在线订阅用户列表信息
//
// 查询参数 {"imsi":"360000100000130","msisdn":"8612300000130","upstate":"Inactive","pageNum":"1"}
//
// 返回结果 {"rows":[],"total":0}
func SMFSubInfo(neInfo model.NeInfo, data map[string]string) (map[string]any, error) {
// neUrl := "http://127.0.0.1:4523/m1/3157310-1528434-82b449ee/api/rest/ueManagement/v1/elementType/smf/objectType/ueInfo?apifoxApiId=150640017"
neUrl := fmt.Sprintf("http://%s:%s/api/rest/ueManagement/v1/elementType/smf/objectType/ueInfo", "172.16.20.150", "33030")
// neUrl := fmt.Sprintf("http://%s:%d/api/rest/ueManagement/v1/elementType/smf/objectType/ueInfo", neInfo.IP, neInfo.Port)
func SMFSubInfoList(neInfo model.NeInfo, data map[string]string) (map[string]any, error) {
neUrl := fmt.Sprintf("http://%s:%d/api/rest/ueManagement/v1/elementType/smf/objectType/ueInfo", neInfo.IP, neInfo.Port)
// 查询参数拼接
query := []string{}
if v, ok := data["imsi"]; ok && v != "" {

View File

@@ -0,0 +1,111 @@
package controller
import (
"be.ems/src/framework/i18n"
"be.ems/src/framework/utils/ctx"
"be.ems/src/framework/vo/result"
traceService "be.ems/src/modules/trace/service"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
)
// 实例化控制层 PacketController 结构体
var NewPacket = &PacketController{
packetService: traceService.NewPacket,
}
// 信令跟踪
//
// PATH /trace/packet
type PacketController struct {
packetService *traceService.Packet // 信令跟踪服务
}
// 网元跟踪网卡设备列表
//
// GET /devices
func (s *PacketController) Devices(c *gin.Context) {
data := s.packetService.NetworkDevices()
c.JSON(200, result.OkData(data))
}
// 网元跟踪开始
//
// POST /start
func (s *PacketController) Start(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
Device string `json:"device" binding:"required"` // 网卡设备
TaskNo string `json:"taskNo" binding:"required"` // 任务编号
Output string `json:"output" ` // 输出PCAP文件路径为空则不输出
}
if err := c.ShouldBindBodyWith(&body, binding.JSON); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
msg, err := s.packetService.LiveStart(body.TaskNo, body.Device, body.Output)
if err != nil {
c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error())))
return
}
c.JSON(200, result.OkData(msg))
}
// 网元跟踪结束
//
// POST /stop
func (s *PacketController) Stop(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
TaskNo string `json:"taskNo" binding:"required"` // 任务编号
}
if err := c.ShouldBindBodyWith(&body, binding.JSON); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if err := s.packetService.LiveStop(body.TaskNo); err != nil {
c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error())))
return
}
c.JSON(200, result.Ok(nil))
}
// 网元跟踪过滤
//
// PUT /filter
func (s *PacketController) Filter(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
TaskNo string `json:"taskNo" binding:"required"` // 任务编号
Expr string `json:"expr" binding:"required"` // 过滤表达式port 33030 or 33040
}
if err := c.ShouldBindBodyWith(&body, binding.JSON); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if err := s.packetService.LiveFilter(body.TaskNo, body.Expr); err != nil {
c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error())))
return
}
c.JSON(200, result.Ok(nil))
}
// 网元跟踪续期保活
//
// PUT /keep-alive
func (s *PacketController) KeepAlive(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
IntervalIn int `json:"intervalIn" ` // 服务续约的频率默认设置为60秒
Duration int `json:"duration" ` // 服务失效的时间默认设置为120秒
}
err := c.ShouldBindBodyWith(&body, binding.JSON)
if err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
c.JSON(200, result.Ok(nil))
}

View File

@@ -13,26 +13,24 @@ import (
"github.com/gin-gonic/gin/binding"
)
// 实例化控制层 TcpdumpController 结构体
var NewTcpdump = &TcpdumpController{
TcpdumpService: traceService.NewTCPdump,
// 实例化控制层 TCPdumpController 结构体
var NewTCPdump = &TCPdumpController{
tcpdumpService: traceService.NewTCPdump,
neInfoService: neService.NewNeInfoImpl,
}
// 信令抓包请求
// 信令抓包
//
// PATH /tcpdump
type TcpdumpController struct {
// 信令抓包服务
TcpdumpService *traceService.TCPdump
// 网元信息服务
neInfoService neService.INeInfo
type TCPdumpController struct {
tcpdumpService *traceService.TCPdump // 信令抓包服务
neInfoService neService.INeInfo // 网元信息服务
}
// 网元抓包PACP 开始
//
// POST /start
func (s *TcpdumpController) DumpStart(c *gin.Context) {
func (s *TCPdumpController) DumpStart(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
NeType string `json:"neType" binding:"required"` // 网元类型
@@ -45,7 +43,7 @@ func (s *TcpdumpController) DumpStart(c *gin.Context) {
return
}
taskCode, err := s.TcpdumpService.DumpStart(body.NeType, body.NeId, body.Cmd)
taskCode, err := s.tcpdumpService.DumpStart(body.NeType, body.NeId, body.Cmd)
if err != nil {
c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error())))
return
@@ -56,7 +54,7 @@ func (s *TcpdumpController) DumpStart(c *gin.Context) {
// 网元抓包PACP 结束
//
// POST /stop
func (s *TcpdumpController) DumpStop(c *gin.Context) {
func (s *TCPdumpController) DumpStop(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
NeType string `json:"neType" binding:"required"` // 网元类型
@@ -69,7 +67,7 @@ func (s *TcpdumpController) DumpStop(c *gin.Context) {
return
}
taskLog, err := s.TcpdumpService.DumpStop(body.NeType, body.NeId, body.TaskCode)
taskLog, err := s.tcpdumpService.DumpStop(body.NeType, body.NeId, body.TaskCode)
if err != nil {
c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error())))
return
@@ -80,7 +78,7 @@ func (s *TcpdumpController) DumpStop(c *gin.Context) {
// 网元抓包PACP 下载
//
// GET /download
func (s *TcpdumpController) DumpDownload(c *gin.Context) {
func (s *TCPdumpController) DumpDownload(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var query struct {
NeType string `form:"neType" binding:"required"` // 网元类型
@@ -93,7 +91,7 @@ func (s *TcpdumpController) DumpDownload(c *gin.Context) {
return
}
zipFilePath, err := s.TcpdumpService.DumpDownload(query.NeType, query.NeID, query.TaskCode)
zipFilePath, err := s.tcpdumpService.DumpDownload(query.NeType, query.NeID, query.TaskCode)
if err != nil {
c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error())))
return
@@ -110,7 +108,7 @@ func (s *TcpdumpController) DumpDownload(c *gin.Context) {
// UPF标准版内部抓包
//
// POST /upf
func (s *TcpdumpController) UPFTrace(c *gin.Context) {
func (s *TCPdumpController) UPFTrace(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
NeType string `json:"neType" binding:"required"` // 网元类型
@@ -123,7 +121,7 @@ func (s *TcpdumpController) UPFTrace(c *gin.Context) {
return
}
msg, err := s.TcpdumpService.UPFTrace(body.NeType, body.NeId, body.Cmd)
msg, err := s.tcpdumpService.UPFTrace(body.NeType, body.NeId, body.Cmd)
if err != nil {
c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error())))
return

View File

@@ -1,6 +1,9 @@
package controller
import (
"fmt"
"path/filepath"
"runtime"
"strings"
"be.ems/src/framework/i18n"
@@ -129,3 +132,24 @@ func (s *TraceTaskController) Remove(c *gin.Context) {
msg := i18n.TTemplate(language, "app.common.deleteSuccess", map[string]any{"num": rows})
c.JSON(200, result.OkMsg(msg))
}
// 跟踪任务文件
//
// GET /filePull
func (s *TraceTaskController) FilePull(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var querys struct {
TraceId string `form:"traceId" binding:"required"`
}
if err := c.ShouldBindQuery(&querys); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
fileName := fmt.Sprintf("task_%s.pcap", querys.TraceId)
localFilePath := filepath.Join("/tmp/omc/trace", fileName)
if runtime.GOOS == "windows" {
localFilePath = fmt.Sprintf("C:%s", localFilePath)
}
c.FileAttachment(localFilePath, fileName)
}

View File

@@ -0,0 +1,191 @@
package service
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/vo"
"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/pcap"
"github.com/gopacket/gopacket/pcapgo"
)
// 实例化服务层 Packet 结构体
var NewPacket = &Packet{
taskMap: sync.Map{},
}
// 信令跟踪 服务层处理
type Packet struct {
taskMap sync.Map // 捕获任务
}
// task 任务信息
type task struct {
TaskNo string // 任务编号
Handle *pcap.Handle // 捕获句柄
File *os.File // 捕获信息输出文件句柄
Writer *pcapgo.Writer // 捕获信息输出句柄
Expire time.Time // 过期时间
context context.Context // 上下文 控制完成结束
}
// NetworkDevices 获取网卡设备信息
func (s *Packet) NetworkDevices() []vo.TreeSelect {
arr := make([]vo.TreeSelect, 0)
devices, err := pcap.FindAllDevs()
if err != nil {
logger.Errorf("interfaces find all devices err: %s", err.Error())
return arr
}
for _, device := range devices {
if len(device.Addresses) == 0 {
continue
}
lable := device.Description
if lable == "" {
lable = device.Name
}
item := vo.TreeSelect{
ID: device.Name,
Label: lable,
Children: []vo.TreeSelect{},
}
for _, address := range device.Addresses {
if address.IP != nil {
ip := address.IP.String()
item.Children = append(item.Children, vo.TreeSelect{ID: ip, Label: ip})
}
}
arr = append(arr, item)
}
return arr
}
// verifyDevice 检查网卡设备是否存在
func (s *Packet) verifyDevice(str string) (string, bool) {
devices, err := pcap.FindAllDevs()
if err != nil {
logger.Errorf("interfaces find all devices err: %s", err.Error())
return "", false
}
for _, device := range devices {
if len(device.Addresses) == 0 {
continue
}
if device.Name == str {
return device.Name, true
}
for _, address := range device.Addresses {
if address.IP.String() == str {
return device.Name, true
}
}
}
return "", false
}
// LiveStart 开始捕获数据
func (s *Packet) LiveStart(taskNo, deviceName, outputFile string) (string, error) {
if _, ok := s.taskMap.Load(taskNo); ok {
return "", fmt.Errorf("task no. %s already exist", taskNo)
}
// Verify the specified network interface exists
device, deviceOk := s.verifyDevice(deviceName)
if !deviceOk {
return "", fmt.Errorf("network device not exist: %s", deviceName)
}
snapshotLength := 262144
// open device
handle, err := pcap.OpenLive(device, int32(snapshotLength), true, pcap.BlockForever)
if err != nil {
logger.Errorf("open live err: %s", err.Error())
if strings.Contains(err.Error(), "operation not permitted") {
return "", fmt.Errorf("you don't have permission to capture on that/these device(s)")
}
return "", err
}
// write a new file
var w *pcapgo.Writer
var f *os.File
if outputFile != "" {
f, err = os.Create(outputFile)
if err != nil {
return "", err
}
w = pcapgo.NewWriter(f)
w.WriteFileHeader(uint32(snapshotLength), handle.LinkType())
}
// capture packets
packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
packetSource.Lazy = false
packetSource.NoCopy = true
packetSource.DecodeStreamsAsDatagrams = true
// save tasks
ctx := context.Background()
task := &task{
TaskNo: taskNo,
Handle: handle,
File: f,
Writer: w,
Expire: time.Now().Add(time.Second * 120),
context: ctx,
}
s.taskMap.Store(taskNo, task)
// start capture
go func() {
for packet := range packetSource.PacketsCtx(ctx) {
if packet == nil {
continue
}
if packet.Metadata().Timestamp.Before(time.Now()) {
continue
}
if w != nil {
w.WritePacket(packet.Metadata().CaptureInfo, packet.Data())
}
fmt.Println(packet.Dump())
}
}()
return "task initiated", nil
}
// LiveFilter 捕获过滤
func (s *Packet) LiveFilter(taskNo, expr string) error {
info, ok := s.taskMap.Load(taskNo)
if !ok {
return fmt.Errorf("task no. %s not exist", taskNo)
}
return info.(*task).Handle.SetBPFFilter(expr)
}
// LiveStop 停止捕获数据
func (s *Packet) LiveStop(taskNo string) error {
info, ok := s.taskMap.Load(taskNo)
if !ok {
return fmt.Errorf("task no. %s not exist", taskNo)
}
info.(*task).context.Done()
info.(*task).Handle.Close()
if info.(task).File != nil {
info.(task).File.Close()
}
s.taskMap.Delete(taskNo)
return nil
}

View File

@@ -1,8 +1,10 @@
package service
import (
"encoding/base64"
"encoding/json"
"fmt"
"net"
"strings"
"be.ems/src/framework/config"
@@ -14,6 +16,7 @@ import (
neService "be.ems/src/modules/network_element/service"
"be.ems/src/modules/trace/model"
"be.ems/src/modules/trace/repository"
wsService "be.ems/src/modules/ws/service"
)
// 实例化数据层 TraceTask 结构体
@@ -57,16 +60,27 @@ func (r *TraceTask) CreateUDP() error {
}
// 接收处理UDP数据
go r.udpService.Resolve(2048, func(data []byte, n int) {
logger.Infof("socket UDP: %s", string(data))
mData, err := UDPDataHandler(data, n)
go r.udpService.Resolve(func(conn *net.UDPConn) {
// 读取数据
buf := make([]byte, 2048)
n, err := conn.Read(buf)
if err != nil {
logger.Errorf("error reading from UDP connection: %s", err.Error())
return
}
logger.Infof("socket UDP: %s", string(buf[:n]))
// logger.Infof("socket UDP Base64: %s", base64.StdEncoding.EncodeToString(buf[:n]))
mData, err := UDPDataHandler(buf, n)
if err != nil {
logger.Errorf("udp resolve data fail: %s", err.Error())
return
}
taskId := parse.Number(mData["taskId"])
// 插入数据库做记录
r.traceDataRepository.Insert(model.TraceData{
TaskId: parse.Number(mData["taskId"]),
TaskId: taskId,
IMSI: mData["imsi"].(string),
SrcAddr: mData["srcAddr"].(string),
DstAddr: mData["dstAddr"].(string),
@@ -82,6 +96,7 @@ func (r *TraceTask) CreateUDP() error {
// 推送文件
if v, ok := mData["pcapFile"]; ok && v != "" {
logger.Infof("pcapFile: %s", v)
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId)
}
})
@@ -92,16 +107,29 @@ func (r *TraceTask) CreateUDP() error {
return err
}
// 接收处理TCP数据
go tcpService.Resolve(1024, func(data []byte, n int) {
logger.Infof("socket TCP: %s", string(data))
mData, err := UDPDataHandler(data, n)
go tcpService.Resolve(func(conn *net.Conn) {
c := (*conn)
// 读取数据
buf := make([]byte, 2048)
n, err := c.Read(buf)
if err != nil {
logger.Errorf("error reading from TCP connection: %s", err.Error())
return
}
logger.Infof("socket TCP: %s", string(buf[:n]))
deData, _ := base64.StdEncoding.DecodeString(string(buf[:n]))
logger.Infof("socket TCP Base64: %s", deData)
mData, err := UDPDataHandler(deData, len(deData))
if err != nil {
logger.Errorf("tcp resolve data fail: %s", err.Error())
return
}
taskId := parse.Number(mData["taskId"])
// 插入数据库做记录
r.traceDataRepository.Insert(model.TraceData{
TaskId: parse.Number(mData["taskId"]),
TaskId: taskId,
IMSI: mData["imsi"].(string),
SrcAddr: mData["srcAddr"].(string),
DstAddr: mData["dstAddr"].(string),
@@ -117,6 +145,7 @@ func (r *TraceTask) CreateUDP() error {
// 推送文件
if v, ok := mData["pcapFile"]; ok && v != "" {
logger.Infof("pcapFile: %s", v)
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s%d", wsService.GROUP_TRACE, taskId), taskId)
}
})
return nil

View File

@@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"time"
@@ -264,6 +265,9 @@ const versionMinor = 4
func writeEmptyPcap(filename string, timeStamp int64, length int, data []byte) error {
var err error
var file *os.File
if err := os.MkdirAll(filepath.Dir(filename), 0775); err != nil {
return err
}
if _, err = os.Stat(filename); os.IsNotExist(err) {
file, err = os.Create(filename)
// File Header
@@ -318,7 +322,7 @@ func writeEmptyPcap(filename string, timeStamp int64, length int, data []byte) e
// writePcap 写Pcap文件并返回文件路径
func writePcap(extHdr ExtHeader) string {
filePath := fmt.Sprintf("/tmp/trace_%d .pcap", extHdr.TaskId)
filePath := fmt.Sprintf("/tmp/omc/trace/task_%d.pcap", extHdr.TaskId)
if runtime.GOOS == "windows" {
filePath = fmt.Sprintf("C:%s", filePath)
}

View File

@@ -17,34 +17,61 @@ func Setup(router *gin.Engine) {
// 启动时需要的初始参数
InitLoad()
traceGroup := router.Group("/trace")
// 信令抓包
tcpdumpGroup := traceGroup.Group("/tcpdump")
tcpdumpGroup := router.Group("/trace/tcpdump")
{
tcpdumpGroup.POST("/start",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.tcpdump", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewTcpdump.DumpStart,
controller.NewTCPdump.DumpStart,
)
tcpdumpGroup.POST("/stop",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.tcpdump", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewTcpdump.DumpStop,
controller.NewTCPdump.DumpStop,
)
tcpdumpGroup.GET("/download",
middleware.PreAuthorize(nil),
controller.NewTcpdump.DumpDownload,
controller.NewTCPdump.DumpDownload,
)
tcpdumpGroup.POST("/upf",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.tcpdump", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewTcpdump.UPFTrace,
controller.NewTCPdump.UPFTrace,
)
}
// 信令跟踪
packetGroup := router.Group("/trace/packet")
{
packetGroup.GET("/devices",
middleware.PreAuthorize(nil),
controller.NewPacket.Devices,
)
packetGroup.POST("/start",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.packet", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewPacket.Start,
)
packetGroup.POST("/stop",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.packet", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewPacket.Stop,
)
packetGroup.PUT("/filter",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.packet", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewPacket.Filter,
)
packetGroup.PUT("/keep-alive",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.packet", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewPacket.KeepAlive,
)
}
// 跟踪任务 网元HLR (免登录)
taskHLRGroup := traceGroup.Group("/task/hlr")
taskHLRGroup := router.Group("/trace/task/hlr")
{
taskHLRGroup.GET("/list",
controller.NewTraceTaskHlr.List,
@@ -67,7 +94,7 @@ func Setup(router *gin.Engine) {
}
// 跟踪任务
taskGroup := traceGroup.Group("/task")
taskGroup := router.Group("/trace/task")
{
taskGroup.GET("/list",
middleware.PreAuthorize(nil),
@@ -92,10 +119,14 @@ func Setup(router *gin.Engine) {
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.task", collectlogs.BUSINESS_TYPE_DELETE)),
controller.NewTraceTask.Remove,
)
taskGroup.GET("/filePull",
middleware.PreAuthorize(nil),
controller.NewTraceTask.FilePull,
)
}
// 跟踪数据
taskDataGroup := traceGroup.Group("/data")
taskDataGroup := router.Group("/trace/data")
{
taskDataGroup.GET("/list",
middleware.PreAuthorize(nil),

View File

@@ -21,8 +21,8 @@ import (
// NewWSController 实例化控制层 WSController 结构体
var NewWSController = &WSController{
wsService: service.NewWSImpl,
wsSendService: service.NewWSSendImpl,
wsService: service.NewWS,
wsSendService: service.NewWSSend,
neHostService: neService.NewNeHostImpl,
neInfoService: neService.NewNeInfoImpl,
}
@@ -32,9 +32,9 @@ var NewWSController = &WSController{
// PATH /ws
type WSController struct {
// WebSocket 服务
wsService service.IWS
wsService *service.WS
// WebSocket消息发送 服务
wsSendService service.IWSSend
wsSendService *service.WSSend
// 网元主机连接服务
neHostService neService.INeHost
// 网元信息服务

View File

@@ -1,32 +1,220 @@
package service
import (
"encoding/json"
"net/http"
"sync"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/utils/generate"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
"github.com/gorilla/websocket"
)
// IWS WebSocket通信 服务层接口
type IWS interface {
// UpgraderWs http升级ws请求
UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn
var (
wsClients sync.Map // ws客户端 [clientId: client]
wsUsers sync.Map // ws用户对应的多个客户端id [uid:clientIds]
wsGroup sync.Map // ws组对应的多个客户端id [groupId:clientIds]
)
// ClientCreate 客户端新建
//
// uid 登录用户ID
// groupIDs 用户订阅组
// conn ws连接实例
// childConn 子连接实例
ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient
// NewWS 实例化服务层 WS 结构体
var NewWS = &WS{}
// ClientClose 客户端关闭
ClientClose(clientID string)
// WS WebSocket通信 服务层处理
type WS struct{}
// ClientReadListen 客户端读取消息监听
// receiveType 根据接收类型进行消息处理
ClientReadListen(wsClient *model.WSClient, receiveType int)
// ClientWriteListen 客户端写入消息监听
ClientWriteListen(wsClient *model.WSClient)
// UpgraderWs http升级ws请求
func (s *WS) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn {
wsUpgrader := websocket.Upgrader{
Subprotocols: []string{"omc-ws"},
// 设置消息发送缓冲区大小byte如果这个值设置得太小可能会导致服务端在发送大型消息时遇到问题
WriteBufferSize: 1024,
// 消息包启用压缩
EnableCompression: true,
// ws握手超时时间
HandshakeTimeout: 5 * time.Second,
// ws握手过程中允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
logger.Errorf("ws Upgrade err: %s", err.Error())
}
return conn
}
// ClientCreate 客户端新建
//
// uid 登录用户ID
// groupIDs 用户订阅组
// conn ws连接实例
// childConn 子连接实例
func (s *WS) ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient {
// clientID也可以用其他方式生成只要能保证在所有服务端中都能保证唯一即可
clientID := generate.Code(16)
wsClient := &model.WSClient{
ID: clientID,
Conn: conn,
LastHeartbeat: time.Now().UnixMilli(),
BindUid: uid,
SubGroup: groupIDs,
MsgChan: make(chan []byte, 100),
StopChan: make(chan struct{}, 1), // 卡死循环标记
ChildConn: childConn,
}
// 存入客户端
wsClients.Store(clientID, wsClient)
// 存入用户持有客户端
if uid != "" {
if v, ok := wsUsers.Load(uid); ok {
uidClientIds := v.(*[]string)
*uidClientIds = append(*uidClientIds, clientID)
} else {
wsUsers.Store(uid, &[]string{clientID})
}
}
// 存入用户订阅组
if uid != "" && len(groupIDs) > 0 {
for _, groupID := range groupIDs {
if v, ok := wsGroup.Load(groupID); ok {
groupClientIds := v.(*[]string)
*groupClientIds = append(*groupClientIds, clientID)
} else {
wsGroup.Store(groupID, &[]string{clientID})
}
}
}
return wsClient
}
// ClientClose 客户端关闭
func (s *WS) ClientClose(clientID string) {
v, ok := wsClients.Load(clientID)
if !ok {
return
}
client := v.(*model.WSClient)
defer func() {
client.MsgChan <- []byte("ws:close")
client.StopChan <- struct{}{}
client.Conn.Close()
wsClients.Delete(clientID)
}()
// 客户端断线时自动踢出Uid绑定列表
if client.BindUid != "" {
if v, ok := wsUsers.Load(client.BindUid); ok {
uidClientIds := v.(*[]string)
if len(*uidClientIds) > 0 {
tempClientIds := make([]string, 0, len(*uidClientIds))
for _, v := range *uidClientIds {
if v != client.ID {
tempClientIds = append(tempClientIds, v)
}
}
*uidClientIds = tempClientIds
}
}
}
// 客户端断线时自动踢出已加入的组
if len(client.SubGroup) > 0 {
for _, groupID := range client.SubGroup {
v, ok := wsGroup.Load(groupID)
if !ok {
continue
}
groupClientIds := v.(*[]string)
if len(*groupClientIds) > 0 {
tempClientIds := make([]string, 0, len(*groupClientIds))
for _, v := range *groupClientIds {
if v != client.ID {
tempClientIds = append(tempClientIds, v)
}
}
*groupClientIds = tempClientIds
}
}
}
}
// ClientReadListen 客户端读取消息监听
// receiveType 根据接收类型进行消息处理
func (s *WS) ClientReadListen(wsClient *model.WSClient, receiveType int) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws ReadMessage Panic Error: %v", err)
}
}()
for {
// 读取消息
messageType, msg, err := wsClient.Conn.ReadMessage()
if err != nil {
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
// fmt.Println(messageType, string(msg))
// 文本 只处理文本json
if messageType == websocket.TextMessage {
var reqMsg model.WSRequest
if err := json.Unmarshal(msg, &reqMsg); err != nil {
msgByte, _ := json.Marshal(result.ErrMsg("message format json error"))
wsClient.MsgChan <- msgByte
continue
}
// 接收器处理
switch receiveType {
case ReceiveCommont:
go NewWSReceive.Commont(wsClient, reqMsg)
case ReceiveShell:
go NewWSReceive.Shell(wsClient, reqMsg)
case ReceiveShellView:
go NewWSReceive.ShellView(wsClient, reqMsg)
case ReceiveTelnet:
go NewWSReceive.Telnet(wsClient, reqMsg)
}
}
}
}
// ClientWriteListen 客户端写入消息监听
func (s *WS) ClientWriteListen(wsClient *model.WSClient) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws WriteMessage Panic Error: %v", err)
}
}()
// 发客户端id确认是否连接
msgByte, _ := json.Marshal(result.OkData(map[string]string{
"clientId": wsClient.ID,
}))
wsClient.MsgChan <- msgByte
// 消息发送监听
for msg := range wsClient.MsgChan {
// 关闭句柄
if string(msg) == "ws:close" {
wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 发送消息
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
wsClient.LastHeartbeat = time.Now().UnixMilli()
}
}

View File

@@ -1,220 +0,0 @@
package service
import (
"encoding/json"
"net/http"
"sync"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/utils/generate"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
"github.com/gorilla/websocket"
)
var (
wsClients sync.Map // ws客户端 [clientId: client]
wsUsers sync.Map // ws用户对应的多个客户端id [uid:clientIds]
wsGroup sync.Map // ws组对应的多个客户端id [groupId:clientIds]
)
// NewWSImpl 实例化服务层 WSImpl 结构体
var NewWSImpl = &WSImpl{}
// WSImpl WebSocket通信 服务层处理
type WSImpl struct{}
// UpgraderWs http升级ws请求
func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn {
wsUpgrader := websocket.Upgrader{
Subprotocols: []string{"omc-ws"},
// 设置消息发送缓冲区大小byte如果这个值设置得太小可能会导致服务端在发送大型消息时遇到问题
WriteBufferSize: 1024,
// 消息包启用压缩
EnableCompression: true,
// ws握手超时时间
HandshakeTimeout: 5 * time.Second,
// ws握手过程中允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
logger.Errorf("ws Upgrade err: %s", err.Error())
}
return conn
}
// ClientCreate 客户端新建
//
// uid 登录用户ID
// groupIDs 用户订阅组
// conn ws连接实例
// childConn 子连接实例
func (s *WSImpl) ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient {
// clientID也可以用其他方式生成只要能保证在所有服务端中都能保证唯一即可
clientID := generate.Code(16)
wsClient := &model.WSClient{
ID: clientID,
Conn: conn,
LastHeartbeat: time.Now().UnixMilli(),
BindUid: uid,
SubGroup: groupIDs,
MsgChan: make(chan []byte, 100),
StopChan: make(chan struct{}, 1), // 卡死循环标记
ChildConn: childConn,
}
// 存入客户端
wsClients.Store(clientID, wsClient)
// 存入用户持有客户端
if uid != "" {
if v, ok := wsUsers.Load(uid); ok {
uidClientIds := v.(*[]string)
*uidClientIds = append(*uidClientIds, clientID)
} else {
wsUsers.Store(uid, &[]string{clientID})
}
}
// 存入用户订阅组
if uid != "" && len(groupIDs) > 0 {
for _, groupID := range groupIDs {
if v, ok := wsGroup.Load(groupID); ok {
groupClientIds := v.(*[]string)
*groupClientIds = append(*groupClientIds, clientID)
} else {
wsGroup.Store(groupID, &[]string{clientID})
}
}
}
return wsClient
}
// ClientClose 客户端关闭
func (s *WSImpl) ClientClose(clientID string) {
v, ok := wsClients.Load(clientID)
if !ok {
return
}
client := v.(*model.WSClient)
defer func() {
client.MsgChan <- []byte("ws:close")
client.StopChan <- struct{}{}
client.Conn.Close()
wsClients.Delete(clientID)
}()
// 客户端断线时自动踢出Uid绑定列表
if client.BindUid != "" {
if v, ok := wsUsers.Load(client.BindUid); ok {
uidClientIds := v.(*[]string)
if len(*uidClientIds) > 0 {
tempClientIds := make([]string, 0, len(*uidClientIds))
for _, v := range *uidClientIds {
if v != client.ID {
tempClientIds = append(tempClientIds, v)
}
}
*uidClientIds = tempClientIds
}
}
}
// 客户端断线时自动踢出已加入的组
if len(client.SubGroup) > 0 {
for _, groupID := range client.SubGroup {
v, ok := wsGroup.Load(groupID)
if !ok {
continue
}
groupClientIds := v.(*[]string)
if len(*groupClientIds) > 0 {
tempClientIds := make([]string, 0, len(*groupClientIds))
for _, v := range *groupClientIds {
if v != client.ID {
tempClientIds = append(tempClientIds, v)
}
}
*groupClientIds = tempClientIds
}
}
}
}
// ClientReadListen 客户端读取消息监听
// receiveType 根据接收类型进行消息处理
func (s *WSImpl) ClientReadListen(wsClient *model.WSClient, receiveType int) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws ReadMessage Panic Error: %v", err)
}
}()
for {
// 读取消息
messageType, msg, err := wsClient.Conn.ReadMessage()
if err != nil {
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
// fmt.Println(messageType, string(msg))
// 文本 只处理文本json
if messageType == websocket.TextMessage {
var reqMsg model.WSRequest
if err := json.Unmarshal(msg, &reqMsg); err != nil {
msgByte, _ := json.Marshal(result.ErrMsg("message format json error"))
wsClient.MsgChan <- msgByte
continue
}
// 接收器处理
switch receiveType {
case ReceiveCommont:
go NewWSReceiveImpl.Commont(wsClient, reqMsg)
case ReceiveShell:
go NewWSReceiveImpl.Shell(wsClient, reqMsg)
case ReceiveShellView:
go NewWSReceiveImpl.ShellView(wsClient, reqMsg)
case ReceiveTelnet:
go NewWSReceiveImpl.Telnet(wsClient, reqMsg)
}
}
}
}
// ClientWriteListen 客户端写入消息监听
func (s *WSImpl) ClientWriteListen(wsClient *model.WSClient) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws WriteMessage Panic Error: %v", err)
}
}()
// 发客户端id确认是否连接
msgByte, _ := json.Marshal(result.OkData(map[string]string{
"clientId": wsClient.ID,
}))
wsClient.MsgChan <- msgByte
// 消息发送监听
for msg := range wsClient.MsgChan {
// 关闭句柄
if string(msg) == "ws:close" {
wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 发送消息
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.ClientClose(wsClient.ID)
return
}
wsClient.LastHeartbeat = time.Now().UnixMilli()
}
}

View File

@@ -1,6 +1,18 @@
package service
import "be.ems/src/modules/ws/model"
import (
"encoding/json"
"fmt"
"io"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/telnet"
"be.ems/src/framework/utils/ssh"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
"be.ems/src/modules/ws/processor"
)
const (
ReceiveCommont = iota // Commont 接收通用业务处理
@@ -9,17 +21,245 @@ const (
ReceiveTelnet // Telnet 接收终端交互业务处理
)
// IWSReceive WebSocket消息接收处理 服务层接口
type IWSReceive interface {
// Commont 接收通用业务处理
Commont(client *model.WSClient, reqMsg model.WSRequest)
// 实例化服务层 WSReceive 结构体
var NewWSReceive = &WSReceive{}
// Shell 接收终端交互业务处理
Shell(client *model.WSClient, reqMsg model.WSRequest)
// WSReceive WebSocket消息接收处理 服务层处理
type WSReceive struct{}
// ShellView 接收查看文件终端交互业务处理
ShellView(client *model.WSClient, reqMsg model.WSRequest)
// Telnet 接收终端交互业务处理
Telnet(client *model.WSClient, reqMsg model.WSRequest)
// close 关闭服务连接
func (s *WSReceive) close(client *model.WSClient) {
// 主动关闭
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
client.MsgChan <- resultByte
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
NewWS.ClientClose(client.ID)
}
// Commont 接收通用业务处理
func (s *WSReceive) Commont(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ps":
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
case "net":
resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data)
case "ims_cdr":
resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data)
case "smf_cdr":
resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data)
case "smsc_cdr":
resByte, err = processor.GetCDRConnectBySMSC(reqMsg.RequestID, reqMsg.Data)
case "amf_ue":
resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data)
case "mme_ue":
resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data)
case "upf_tf":
resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data)
case "ne_state":
resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data)
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Commont UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// Shell 接收终端交互业务处理
func (s *WSReceive) Shell(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ssh":
// SSH会话消息接收写入会话
command := reqMsg.Data.(string)
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
case "ssh_resize":
// SSH会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// ShellView 接收查看文件终端交互业务处理
func (s *WSReceive) ShellView(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws ShellView UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "cat", "tail":
var command string
if reqMsg.Type == "cat" {
command, err = processor.ParseCat(reqMsg.Data)
}
if reqMsg.Type == "tail" {
command, err = processor.ParseTail(reqMsg.Data)
}
if command != "" && err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
}
case "ctrl-c":
// 模拟按下 Ctrl+C
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write("\u0003\n")
case "resize":
// 会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws ShellView UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// Telnet 接收终端交互业务处理
func (s *WSReceive) Telnet(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "telnet":
// Telnet会话消息接收写入会话
command := reqMsg.Data.(string)
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
_, err = telnetClientSession.Write(command)
case "telnet_resize":
// Telnet会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
// telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
// _ = telnetClientSession.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}

View File

@@ -1,258 +0,0 @@
package service
import (
"encoding/json"
"fmt"
"io"
"time"
"be.ems/src/framework/logger"
"be.ems/src/framework/telnet"
"be.ems/src/framework/utils/ssh"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
"be.ems/src/modules/ws/processor"
)
// 实例化服务层 WSReceiveImpl 结构体
var NewWSReceiveImpl = &WSReceiveImpl{}
// WSReceiveImpl WebSocket消息接收处理 服务层处理
type WSReceiveImpl struct{}
// Commont 接收通用业务处理
func (s *WSReceiveImpl) close(client *model.WSClient) {
// 主动关闭
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
client.MsgChan <- resultByte
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
NewWSImpl.ClientClose(client.ID)
}
// Commont 接收通用业务处理
func (s *WSReceiveImpl) Commont(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ps":
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
case "net":
resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data)
case "ims_cdr":
resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data)
case "smf_cdr":
resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data)
case "smsc_cdr":
resByte, err = processor.GetCDRConnectBySMSC(reqMsg.RequestID, reqMsg.Data)
case "amf_ue":
resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data)
case "mme_ue":
resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data)
case "upf_tf":
resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data)
case "ne_state":
resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data)
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Commont UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// Shell 接收终端交互业务处理
func (s *WSReceiveImpl) Shell(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "ssh":
// SSH会话消息接收写入会话
command := reqMsg.Data.(string)
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
case "ssh_resize":
// SSH会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// ShellView 接收查看文件终端交互业务处理
func (s *WSReceiveImpl) ShellView(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws ShellView UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "cat", "tail":
var command string
if reqMsg.Type == "cat" {
command, err = processor.ParseCat(reqMsg.Data)
}
if reqMsg.Type == "tail" {
command, err = processor.ParseTail(reqMsg.Data)
}
if command != "" && err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write(command)
}
case "ctrl-c":
// 模拟按下 Ctrl+C
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
_, err = sshClientSession.Write("\u0003\n")
case "resize":
// 会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws ShellView UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}
// Telnet 接收终端交互业务处理
func (s *WSReceiveImpl) Telnet(client *model.WSClient, reqMsg model.WSRequest) {
// 必传requestId确认消息
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "close":
s.close(client)
return
case "telnet":
// Telnet会话消息接收写入会话
command := reqMsg.Data.(string)
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
_, err = telnetClientSession.Write(command)
case "telnet_resize":
// Telnet会话窗口重置
msgByte, _ := json.Marshal(reqMsg.Data)
var data struct {
Cols int `json:"cols"`
Rows int `json:"rows"`
}
err = json.Unmarshal(msgByte, &data)
if err == nil {
// telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
// _ = telnetClientSession.WindowChange(data.Rows, data.Cols)
}
default:
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
}
if err != nil {
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
if err == io.EOF {
// 等待1s后关闭连接
time.Sleep(1 * time.Second)
client.StopChan <- struct{}{}
}
return
}
if len(resByte) > 0 {
client.MsgChan <- resByte
}
}

View File

@@ -1,10 +1,87 @@
package service
// IWSSend WebSocket消息发送处理 服务层接口
type IWSSend interface {
// ByClientID 给已知客户端发消息
ByClientID(clientID string, data any) error
import (
"encoding/json"
"fmt"
// ByGroupID 给订阅组的客户端发送消息
ByGroupID(gid string, data any) error
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
)
// 订阅组指定编号为支持服务器向客户端主动推送数据
const (
// 组号-其他
GROUP_OTHER = "0"
// 组号-跟踪任务数据变更 2_traceId
GROUP_TRACE = "2_"
// 组号-指标通用 10_neType_neId
GROUP_KPI = "10_"
// 组号-指标UPF 12_neId
GROUP_KPI_UPF = "12_"
// 组号-自定义KPI指标20_neType_neId
GROUP_KPI_C = "20_"
// 组号-IMS_CDR会话事件 1005_neId
GROUP_IMS_CDR = "1005_"
// 组号-SMF_CDR会话事件 1006_neId
GROUP_SMF_CDR = "1006_"
// 组号-SMSC_CDR会话事件 1007_neId
GROUP_SMSC_CDR = "1007_"
// 组号-AMF_UE会话事件
GROUP_AMF_UE = "1010"
// 组号-MME_UE会话事件 1011_neId
GROUP_MME_UE = "1011_"
)
// 实例化服务层 WSSend 结构体
var NewWSSend = &WSSend{}
// WSSend WebSocket消息发送处理 服务层处理
type WSSend struct{}
// ByClientID 给已知客户端发消息
func (s *WSSend) ByClientID(clientID string, data any) error {
v, ok := wsClients.Load(clientID)
if !ok {
return fmt.Errorf("no fount client ID: %s", clientID)
}
dataByte, err := json.Marshal(result.OkData(data))
if err != nil {
return err
}
client := v.(*model.WSClient)
if len(client.MsgChan) > 90 {
NewWS.ClientClose(client.ID)
return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID)
}
client.MsgChan <- dataByte
return nil
}
// ByGroupID 给订阅组的客户端发送消息
func (s *WSSend) ByGroupID(groupID string, data any) error {
clientIds, ok := wsGroup.Load(groupID)
if !ok {
return fmt.Errorf("no fount Group ID: %s", groupID)
}
// 检查组内是否有客户端
ids := clientIds.(*[]string)
if len(*ids) == 0 {
return fmt.Errorf("no members in the group")
}
// 遍历给客户端发消息
for _, clientId := range *ids {
err := s.ByClientID(clientId, map[string]any{
"groupId": groupID,
"data": data,
})
if err != nil {
continue
}
}
return nil
}

View File

@@ -1,85 +0,0 @@
package service
import (
"encoding/json"
"fmt"
"be.ems/src/framework/vo/result"
"be.ems/src/modules/ws/model"
)
// 订阅组指定编号为支持服务器向客户端主动推送数据
const (
// 组号-其他
GROUP_OTHER = "0"
// 组号-指标通用 10_neType_neId
GROUP_KPI = "10_"
// 组号-指标UPF 12_neId
GROUP_KPI_UPF = "12_"
// 组号-自定义KPI指标20_neType_neId
GROUP_KPI_C = "20_"
// 组号-IMS_CDR会话事件 1005_neId
GROUP_IMS_CDR = "1005_"
// 组号-SMF_CDR会话事件 1006_neId
GROUP_SMF_CDR = "1006_"
// 组号-SMSC_CDR会话事件 1007_neId
GROUP_SMSC_CDR = "1007_"
// 组号-AMF_UE会话事件
GROUP_AMF_UE = "1010"
// 组号-MME_UE会话事件 1011_neId
GROUP_MME_UE = "1011_"
)
// 实例化服务层 WSSendImpl 结构体
var NewWSSendImpl = &WSSendImpl{}
// IWSSend WebSocket消息发送处理 服务层处理
type WSSendImpl struct{}
// ByClientID 给已知客户端发消息
func (s *WSSendImpl) ByClientID(clientID string, data any) error {
v, ok := wsClients.Load(clientID)
if !ok {
return fmt.Errorf("no fount client ID: %s", clientID)
}
dataByte, err := json.Marshal(result.OkData(data))
if err != nil {
return err
}
client := v.(*model.WSClient)
if len(client.MsgChan) > 90 {
NewWSImpl.ClientClose(client.ID)
return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID)
}
client.MsgChan <- dataByte
return nil
}
// ByGroupID 给订阅组的客户端发送消息
func (s *WSSendImpl) ByGroupID(groupID string, data any) error {
clientIds, ok := wsGroup.Load(groupID)
if !ok {
return fmt.Errorf("no fount Group ID: %s", groupID)
}
// 检查组内是否有客户端
ids := clientIds.(*[]string)
if len(*ids) == 0 {
return fmt.Errorf("no members in the group")
}
// 遍历给客户端发消息
for _, clientId := range *ids {
err := s.ByClientID(clientId, map[string]any{
"groupId": groupID,
"data": data,
})
if err != nil {
continue
}
}
return nil
}