Files
be.ems/src/modules/trace/service/packet.go
2025-06-06 15:02:45 +08:00

436 lines
13 KiB
Go

package service
import (
"encoding/base64"
"fmt"
"net"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/pcap"
"github.com/gopacket/gopacket/pcapgo"
"be.ems/src/framework/logger"
wsService "be.ems/src/modules/ws/service"
)
// Task 任务信息
type Task struct {
TaskNo string // 任务编号
Handle *pcap.Handle // 捕获句柄
File *os.File // 捕获信息输出文件句柄
Writer *pcapgo.Writer // 捕获信息输出句柄
Filter string // 过滤表达式
Ticker *time.Ticker // 任务失效定时器
}
// FrameMeta 数据帧元信息
type FrameMeta struct {
Number int `json:"number"`
Time int64 `json:"time"`
Source string `json:"source"`
Destination string `json:"destination"`
Protocol string `json:"protocol"`
Length int `json:"length"`
Info string `json:"info"`
Data string `json:"data"` // 原始数据byte[] base64 编码数据
}
// 实例化服务层 Packet 结构体
var NewPacket = &Packet{
taskMap: sync.Map{},
}
// 信令跟踪 服务层处理
type Packet struct {
taskMap sync.Map
}
// NetworkDevices 获取网卡设备信息
func (s *Packet) NetworkDevices() []map[string]any {
arr := make([]map[string]any, 0)
devices, err := pcap.FindAllDevs()
if err != nil {
logger.Errorf("interfaces find all devices err: %s", err.Error())
return arr
}
for _, device := range devices {
if len(device.Addresses) == 0 {
continue
}
lable := device.Description
if lable == "" {
lable = device.Name
}
item := map[string]any{
"id": device.Name,
"label": lable,
"children": []map[string]any{},
}
for _, address := range device.Addresses {
if address.IP != nil {
ip := address.IP.String()
item["children"] = append(item["children"].([]map[string]any), map[string]any{"id": ip, "label": ip})
}
}
arr = append(arr, item)
}
return arr
}
// LiveStart 开始捕获数据
func (s *Packet) LiveStart(taskNo, deviceName, filterBPF string, outputPCAP bool) (string, error) {
if _, ok := s.taskMap.Load(taskNo); ok {
return "", fmt.Errorf("task no. %s already exist", taskNo)
}
// Verify the specified network interface exists
device, deviceOk := s.verifyDevice(deviceName)
if !deviceOk {
return "", fmt.Errorf("network device not exist: %s", deviceName)
}
// listening on eth1, link-type EN10MB (Ethernet), snapshot length 262144 bytes
snapshotLength := 262144
// open device
handle, err := pcap.OpenLive(device, int32(snapshotLength), true, pcap.BlockForever)
if err != nil {
logger.Errorf("open live err: %s", err.Error())
if strings.Contains(err.Error(), "operation not permitted") {
return "", fmt.Errorf("you don't have permission to capture on that/these device(s)")
}
return "", err
}
// write a new file
var w *pcapgo.Writer
var f *os.File
if outputPCAP {
// 网管本地路径
localFilePath := fmt.Sprintf("/usr/local/omc/packet/%s.pcap", taskNo)
if runtime.GOOS == "windows" {
localFilePath = fmt.Sprintf("C:%s", localFilePath)
}
f, w, err = s.outputPCAPFile(uint32(snapshotLength), handle.LinkType(), localFilePath)
if err != nil {
return "", err
}
}
// set filter
if filterBPF != "" {
if err = handle.SetBPFFilter(filterBPF); err != nil {
logger.Errorf("packet BPF Filter %s => %s", filterBPF, err.Error())
filterBPF = ""
}
}
// save tasks
taskInfo := &Task{
TaskNo: taskNo,
Handle: handle,
File: f,
Writer: w,
Filter: filterBPF,
Ticker: time.NewTicker(time.Second * 120),
}
go s.capturePacketSource(taskInfo)
s.taskMap.Store(taskNo, taskInfo)
return fmt.Sprintf("task no: %s initiated", taskNo), nil
}
// verifyDevice 检查网卡设备是否存在
func (s *Packet) verifyDevice(str string) (string, bool) {
devices, err := pcap.FindAllDevs()
if err != nil {
logger.Errorf("interfaces find all devices err: %s", err.Error())
return "", false
}
for _, device := range devices {
if len(device.Addresses) == 0 {
continue
}
if device.Name == str {
return device.Name, true
}
for _, address := range device.Addresses {
if address.IP.String() == str {
return device.Name, true
}
}
}
return "", false
}
// capturePacketSource 捕获数据
func (s *Packet) capturePacketSource(taskInfo *Task) {
// capture packets
packetSource := gopacket.NewPacketSource(taskInfo.Handle, taskInfo.Handle.LinkType())
packetSource.Lazy = false
packetSource.NoCopy = true
packetSource.DecodeStreamsAsDatagrams = true
// 协程停止后关闭句柄并移除任务信息
defer func() {
taskInfo.Ticker.Stop()
taskInfo.Handle.Close()
if taskInfo.File != nil {
taskInfo.File.Close()
}
s.taskMap.Delete(taskInfo.TaskNo)
}()
var frameNumber int64 = 0 // 帧编号
for {
select {
case <-taskInfo.Ticker.C:
return
case packet := <-packetSource.Packets():
if packet == nil {
continue
}
// 如果延迟超过1秒跳过
timeDiff := time.Since(packet.Metadata().Timestamp)
if timeDiff > time.Second {
continue
}
// 使用原子操作获取当前帧号并自增
currentFrameNumber := atomic.AddInt64(&frameNumber, 1)
frameSrc := "" // 源主机IP
frameDst := "" // 目的主机IP
frameProtocol := "" // 协议
frameInfo := "" // 信息
// fmt.Printf("---------- packet %d layers:%d time: %s, len: %d \n", currentFrameNumber, len(packet.Layers()), packet.Metadata().Timestamp, packet.Metadata().Length)
// 网络层
// fmt.Println(packet.NetworkLayer())
if networkLayer := packet.NetworkLayer(); networkLayer != nil {
src, dst := networkLayer.NetworkFlow().Endpoints()
frameSrc = src.String()
frameDst = dst.String()
if frameDst == "ff:ff:ff:ff" {
frameDst = "Broadcast"
}
}
// 传输层
// fmt.Println(packet.TransportLayer())
if transportLayer := packet.TransportLayer(); transportLayer != nil {
frameProtocol = transportLayer.LayerType().String()
switch layer := transportLayer.(type) {
case *layers.TCP: // 传输控制协议,提供可靠的数据传输。
var flagsDesc []string
if layer.FIN {
flagsDesc = append(flagsDesc, "FIN")
}
if layer.SYN {
flagsDesc = append(flagsDesc, "SYN")
}
if layer.RST {
flagsDesc = append(flagsDesc, "RST")
}
if layer.PSH {
flagsDesc = append(flagsDesc, "PSH")
}
if layer.ACK {
flagsDesc = append(flagsDesc, "ACK")
}
if layer.URG {
flagsDesc = append(flagsDesc, "URG")
}
if layer.ECE {
flagsDesc = append(flagsDesc, "ECE")
}
if layer.CWR {
flagsDesc = append(flagsDesc, "CWR")
}
if layer.NS {
flagsDesc = append(flagsDesc, "NS")
}
frameInfo = fmt.Sprintf("%v -> %v [%s], Seq=%d Ack=%d Win=%d Len=%d ", layer.SrcPort, layer.DstPort, strings.Join(flagsDesc, ", "), layer.Seq, layer.Ack, layer.Window, len(layer.Payload))
case *layers.UDP: // 用户数据报协议,提供无连接的快速数据传输。
frameInfo = fmt.Sprintf("%v -> %v Len=%d ", layer.SrcPort, layer.DstPort, len(layer.Payload))
case *layers.UDPLite:
frameInfo = fmt.Sprintf("%v -> %v Len=%d ", layer.SrcPort, layer.DstPort, len(layer.Payload))
case *layers.SCTP: // 流控制传输协议,支持多流和多宿主机。
frameInfo = fmt.Sprintf("%v -> %v Len=%d ", layer.SrcPort, layer.DstPort, len(layer.Payload))
}
}
// 应用协议层判断
switch {
case packet.Layer(layers.LayerTypeARP) != nil:
arp := packet.Layer(layers.LayerTypeARP).(*layers.ARP)
frameSrc = net.IP(arp.SourceProtAddress).String()
frameDst = net.IP(arp.DstProtAddress).String()
frameProtocol = "ARP"
frameInfo = fmt.Sprintf("Who has %s? Tell %s", frameDst, frameSrc)
case packet.Layer(layers.LayerTypeVRRP) != nil:
frameProtocol = "VRRP"
frameInfo = "Announcement (v2)"
case packet.Layer(layers.LayerTypeIGMP) != nil:
switch layer := packet.Layer(layers.LayerTypeIGMP).(type) {
case *layers.IGMP:
frameProtocol = fmt.Sprintf("IGMPv%d", layer.Version)
frameInfo = fmt.Sprintf("%s %s", layer.Type.String(), layer.GroupAddress.String())
case *layers.IGMPv1or2:
frameProtocol = fmt.Sprintf("IGMPv%d", layer.Version)
frameInfo = fmt.Sprintf("%s %s", layer.Type.String(), layer.GroupAddress.String())
}
case packet.Layer(layers.LayerTypeICMPv4) != nil:
icmpv4 := packet.Layer(layers.LayerTypeICMPv4).(*layers.ICMPv4)
frameProtocol = "ICMP"
frameInfo = icmpv4.TypeCode.String()
case packet.Layer(layers.LayerTypeICMPv6) != nil:
icmpv6 := packet.Layer(layers.LayerTypeICMPv6).(*layers.ICMPv6)
frameProtocol = "ICMPv6"
frameInfo = icmpv6.TypeCode.String()
case packet.Layer(layers.LayerTypeSTP) != nil:
stp := packet.Layer(layers.LayerTypeSTP).(*layers.STP)
rootIdentifier := stp.RouteID
frameSrc = rootIdentifier.HwAddr.String()
frameDst = stp.BridgeID.HwAddr.String()
frameProtocol = "STP"
frameInfo = fmt.Sprintf("MST. Root = %d/%d/%s Cost = %d Port = 0x%x", rootIdentifier.Priority, rootIdentifier.SysID, frameSrc, stp.Cost, stp.PortID)
case packet.Layer(layers.LayerTypeSIP) != nil:
sip := packet.Layer(layers.LayerTypeSIP).(*layers.SIP)
frameProtocol = "SIP"
if sip.IsResponse {
frameInfo = fmt.Sprintf("%d %s", sip.ResponseCode, sip.ResponseStatus)
} else {
frameInfo = fmt.Sprintf("%s %s", sip.Method, sip.RequestURI)
}
case packet.Layer(layers.LayerTypeDNS) != nil:
dns := packet.Layer(layers.LayerTypeDNS).(*layers.DNS)
frameProtocol = "DNS"
question := []string{}
if len(dns.Questions) > 0 {
question = append(question, fmt.Sprintf("%s %s", dns.Questions[0].Type, dns.Questions[0].Name))
}
frameInfo = fmt.Sprintf("%s %s 0x%x %s Answers %d", dns.ResponseCode.String(), dns.OpCode.String(), dns.ID, question, len(dns.Answers))
case packet.Layer(layers.LayerTypeDHCPv6) != nil:
dhcpv6 := packet.Layer(layers.LayerTypeDHCPv6).(*layers.DHCPv6)
frameProtocol = "DHCPv6"
frameInfo = fmt.Sprintf("%s XID: 0x%+x", dhcpv6.MsgType.String(), dhcpv6.TransactionID)
case packet.Layer(layers.LayerTypeTLS) != nil:
tls := packet.Layer(layers.LayerTypeTLS).(*layers.TLS)
if len(tls.AppData) > 0 {
item := tls.AppData[0]
frameProtocol = item.Version.String()
frameInfo = item.ContentType.String()
} else if len(tls.Handshake) > 0 {
item := tls.Handshake[0]
frameProtocol = item.ClientHello.ProtocolVersion.String()
frameInfo = "Client Hello"
} else {
frameProtocol = "TLS"
frameInfo = "Unknown"
}
}
if frameProtocol == "" {
continue
}
// 数据
frameMeta := FrameMeta{
Number: int(currentFrameNumber),
Time: packet.Metadata().Timestamp.UnixNano(),
Source: frameSrc,
Destination: frameDst,
Protocol: frameProtocol,
Length: packet.Metadata().Length,
Info: frameInfo,
Data: base64.StdEncoding.EncodeToString(packet.Data()),
}
// 推送到ws订阅组
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_TRACE_PACKET, taskInfo.TaskNo), frameMeta)
// 写入文件
if taskInfo.Writer != nil {
taskInfo.Writer.WritePacket(packet.Metadata().CaptureInfo, packet.Data())
}
}
}
}
// outputPCAPFile 输出 pcap 文件
// 新文件时需要 snaplen 最大长度 linktype 链路类型
func (s *Packet) outputPCAPFile(snaplen uint32, linktype layers.LinkType, outputFile string) (*os.File, *pcapgo.Writer, error) {
var err error
var f *os.File
if err := os.MkdirAll(filepath.Dir(outputFile), 0775); err != nil {
return nil, nil, err
}
// 检查文件是否存在
if _, err = os.Stat(outputFile); os.IsNotExist(err) {
f, err = os.Create(outputFile)
if err != nil {
return nil, nil, err
}
w := pcapgo.NewWriter(f)
w.WriteFileHeader(snaplen, linktype) // new file, must do this.
return f, w, nil
}
f, err = os.OpenFile(outputFile, os.O_APPEND, 0700)
if err != nil {
return nil, nil, err
}
w := pcapgo.NewWriter(f)
return f, w, nil
}
// LiveFilter 捕获过滤
func (s *Packet) LiveFilter(taskNo, expr string) error {
info, ok := s.taskMap.Load(taskNo)
if !ok {
return fmt.Errorf("task no: %s does not exist or has stopped", taskNo)
}
task := info.(*Task)
task.Filter = expr
err := task.Handle.SetBPFFilter(expr)
if err != nil {
logger.Errorf("packet BPF Filter %s => %s", expr, err.Error())
return fmt.Errorf("can't parse filter expression")
}
return nil
}
// LiveTimeout 更新捕获失效时间
func (s *Packet) LiveTimeout(taskNo string, seconds int) error {
info, ok := s.taskMap.Load(taskNo)
if !ok {
return fmt.Errorf("task no: %s does not exist or has stopped", taskNo)
}
info.(*Task).Ticker.Reset(time.Duration(seconds) * time.Second)
return nil
}
// LiveStop 停止捕获数据
func (s *Packet) LiveStop(taskNo string) error {
info, ok := s.taskMap.Load(taskNo)
if !ok {
return fmt.Errorf("task no: %s does not exist or has stopped", taskNo)
}
info.(*Task).Ticker.Reset(time.Millisecond)
return nil
}