ref: 重构packet跟踪解析
This commit is contained in:
@@ -1,20 +1,64 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
packetTask "be.ems/src/modules/trace/packet_task"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gopacket/gopacket"
|
||||
"github.com/gopacket/gopacket/layers"
|
||||
"github.com/gopacket/gopacket/pcap"
|
||||
"github.com/gopacket/gopacket/pcapgo"
|
||||
|
||||
"be.ems/src/framework/logger"
|
||||
wsService "be.ems/src/modules/ws/service"
|
||||
)
|
||||
|
||||
// Task 任务信息
|
||||
type Task struct {
|
||||
TaskNo string // 任务编号
|
||||
Handle *pcap.Handle // 捕获句柄
|
||||
File *os.File // 捕获信息输出文件句柄
|
||||
Writer *pcapgo.Writer // 捕获信息输出句柄
|
||||
Filter string // 过滤表达式
|
||||
Ticker *time.Ticker // 任务失效定时器
|
||||
}
|
||||
|
||||
// FrameMeta 数据帧元信息
|
||||
type FrameMeta struct {
|
||||
Number int `json:"number"`
|
||||
Time int64 `json:"time"`
|
||||
Source string `json:"source"`
|
||||
Destination string `json:"destination"`
|
||||
Protocol string `json:"protocol"`
|
||||
Length int `json:"length"`
|
||||
Info string `json:"info"`
|
||||
Data string `json:"data"` // 原始数据byte[] base64 编码数据
|
||||
}
|
||||
|
||||
// 实例化服务层 Packet 结构体
|
||||
var NewPacket = &Packet{}
|
||||
var NewPacket = &Packet{
|
||||
taskMap: sync.Map{},
|
||||
}
|
||||
|
||||
// 信令跟踪 服务层处理
|
||||
type Packet struct{}
|
||||
type Packet struct {
|
||||
taskMap sync.Map
|
||||
}
|
||||
|
||||
// NetworkDevices 获取网卡设备信息
|
||||
func (s *Packet) NetworkDevices() []map[string]any {
|
||||
arr := make([]map[string]any, 0)
|
||||
devices, err := packetTask.NetworkDevices()
|
||||
devices, err := pcap.FindAllDevs()
|
||||
if err != nil {
|
||||
logger.Errorf("interfaces find all devices err: %s", err.Error())
|
||||
return arr
|
||||
}
|
||||
|
||||
@@ -47,20 +91,343 @@ func (s *Packet) NetworkDevices() []map[string]any {
|
||||
|
||||
// LiveStart 开始捕获数据
|
||||
func (s *Packet) LiveStart(taskNo, deviceName, filterBPF string, outputPCAP bool) (string, error) {
|
||||
return packetTask.LiveStart(taskNo, deviceName, filterBPF, outputPCAP)
|
||||
if _, ok := s.taskMap.Load(taskNo); ok {
|
||||
return "", fmt.Errorf("task no. %s already exist", taskNo)
|
||||
}
|
||||
|
||||
// Verify the specified network interface exists
|
||||
device, deviceOk := s.verifyDevice(deviceName)
|
||||
if !deviceOk {
|
||||
return "", fmt.Errorf("network device not exist: %s", deviceName)
|
||||
}
|
||||
|
||||
// listening on eth1, link-type EN10MB (Ethernet), snapshot length 262144 bytes
|
||||
snapshotLength := 262144
|
||||
|
||||
// open device
|
||||
handle, err := pcap.OpenLive(device, int32(snapshotLength), true, pcap.BlockForever)
|
||||
if err != nil {
|
||||
logger.Errorf("open live err: %s", err.Error())
|
||||
if strings.Contains(err.Error(), "operation not permitted") {
|
||||
return "", fmt.Errorf("you don't have permission to capture on that/these device(s)")
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
// write a new file
|
||||
var w *pcapgo.Writer
|
||||
var f *os.File
|
||||
if outputPCAP {
|
||||
// 网管本地路径
|
||||
localFilePath := fmt.Sprintf("/tmp/omc/packet/%s.pcap", taskNo)
|
||||
if runtime.GOOS == "windows" {
|
||||
localFilePath = fmt.Sprintf("C:%s", localFilePath)
|
||||
}
|
||||
f, w, err = s.outputPCAPFile(uint32(snapshotLength), handle.LinkType(), localFilePath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
// set filter
|
||||
if filterBPF != "" {
|
||||
if err = handle.SetBPFFilter(filterBPF); err != nil {
|
||||
logger.Errorf("packet BPF Filter %s => %s", filterBPF, err.Error())
|
||||
filterBPF = ""
|
||||
}
|
||||
}
|
||||
|
||||
// save tasks
|
||||
taskInfo := &Task{
|
||||
TaskNo: taskNo,
|
||||
Handle: handle,
|
||||
File: f,
|
||||
Writer: w,
|
||||
Filter: filterBPF,
|
||||
Ticker: time.NewTicker(time.Second * 120),
|
||||
}
|
||||
|
||||
go s.capturePacketSource(taskInfo)
|
||||
s.taskMap.Store(taskNo, taskInfo)
|
||||
return fmt.Sprintf("task no. %s initiated", taskNo), nil
|
||||
}
|
||||
|
||||
// verifyDevice 检查网卡设备是否存在
|
||||
func (s *Packet) verifyDevice(str string) (string, bool) {
|
||||
devices, err := pcap.FindAllDevs()
|
||||
if err != nil {
|
||||
logger.Errorf("interfaces find all devices err: %s", err.Error())
|
||||
return "", false
|
||||
}
|
||||
for _, device := range devices {
|
||||
if len(device.Addresses) == 0 {
|
||||
continue
|
||||
}
|
||||
if device.Name == str {
|
||||
return device.Name, true
|
||||
}
|
||||
for _, address := range device.Addresses {
|
||||
if address.IP.String() == str {
|
||||
return device.Name, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
// capturePacketSource 捕获数据
|
||||
func (s *Packet) capturePacketSource(taskInfo *Task) {
|
||||
// capture packets
|
||||
packetSource := gopacket.NewPacketSource(taskInfo.Handle, taskInfo.Handle.LinkType())
|
||||
packetSource.Lazy = false
|
||||
packetSource.NoCopy = true
|
||||
packetSource.DecodeStreamsAsDatagrams = true
|
||||
|
||||
// 协程停止后关闭句柄并移除任务信息
|
||||
defer func() {
|
||||
taskInfo.Ticker.Stop()
|
||||
taskInfo.Handle.Close()
|
||||
if taskInfo.File != nil {
|
||||
taskInfo.File.Close()
|
||||
}
|
||||
s.taskMap.Delete(taskInfo.TaskNo)
|
||||
}()
|
||||
|
||||
var frameNumber int64 = 0 // 帧编号
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-taskInfo.Ticker.C:
|
||||
return
|
||||
case packet := <-packetSource.Packets():
|
||||
if packet == nil {
|
||||
continue
|
||||
}
|
||||
if packet.Metadata().Timestamp.Before(time.Now()) {
|
||||
continue
|
||||
}
|
||||
|
||||
// 使用原子操作获取当前帧号并自增
|
||||
currentFrameNumber := atomic.AddInt64(&frameNumber, 1)
|
||||
frameSrc := "" // 源主机IP
|
||||
frameDst := "" // 目的主机IP
|
||||
frameProtocol := "" // 协议
|
||||
frameInfo := "" // 信息
|
||||
// fmt.Printf("---------- packet %d layers:%d time: %s, len: %d \n", currentFrameNumber, len(packet.Layers()), packet.Metadata().Timestamp, packet.Metadata().Length)
|
||||
|
||||
// 网络层
|
||||
// fmt.Println(packet.NetworkLayer())
|
||||
if networkLayer := packet.NetworkLayer(); networkLayer != nil {
|
||||
src, dst := networkLayer.NetworkFlow().Endpoints()
|
||||
frameSrc = src.String()
|
||||
frameDst = dst.String()
|
||||
if frameDst == "ff:ff:ff:ff" {
|
||||
frameDst = "Broadcast"
|
||||
}
|
||||
}
|
||||
|
||||
// 传输层
|
||||
// fmt.Println(packet.TransportLayer())
|
||||
if transportLayer := packet.TransportLayer(); transportLayer != nil {
|
||||
frameProtocol = transportLayer.LayerType().String()
|
||||
switch layer := transportLayer.(type) {
|
||||
case *layers.TCP: // 传输控制协议,提供可靠的数据传输。
|
||||
var flagsDesc []string
|
||||
if layer.FIN {
|
||||
flagsDesc = append(flagsDesc, "FIN")
|
||||
}
|
||||
if layer.SYN {
|
||||
flagsDesc = append(flagsDesc, "SYN")
|
||||
}
|
||||
if layer.RST {
|
||||
flagsDesc = append(flagsDesc, "RST")
|
||||
}
|
||||
if layer.PSH {
|
||||
flagsDesc = append(flagsDesc, "PSH")
|
||||
}
|
||||
if layer.ACK {
|
||||
flagsDesc = append(flagsDesc, "ACK")
|
||||
}
|
||||
if layer.URG {
|
||||
flagsDesc = append(flagsDesc, "URG")
|
||||
}
|
||||
if layer.ECE {
|
||||
flagsDesc = append(flagsDesc, "ECE")
|
||||
}
|
||||
if layer.CWR {
|
||||
flagsDesc = append(flagsDesc, "CWR")
|
||||
}
|
||||
if layer.NS {
|
||||
flagsDesc = append(flagsDesc, "NS")
|
||||
}
|
||||
|
||||
frameInfo = fmt.Sprintf("%v -> %v [%s], Seq=%d Ack=%d Win=%d Len=%d ", layer.SrcPort, layer.DstPort, strings.Join(flagsDesc, ", "), layer.Seq, layer.Ack, layer.Window, len(layer.Payload))
|
||||
case *layers.UDP: // 用户数据报协议,提供无连接的快速数据传输。
|
||||
frameInfo = fmt.Sprintf("%v -> %v Len=%d ", layer.SrcPort, layer.DstPort, len(layer.Payload))
|
||||
case *layers.UDPLite:
|
||||
frameInfo = fmt.Sprintf("%v -> %v Len=%d ", layer.SrcPort, layer.DstPort, len(layer.Payload))
|
||||
case *layers.SCTP: // 流控制传输协议,支持多流和多宿主机。
|
||||
frameInfo = fmt.Sprintf("%v -> %v Len=%d ", layer.SrcPort, layer.DstPort, len(layer.Payload))
|
||||
}
|
||||
}
|
||||
|
||||
// 应用协议层判断
|
||||
switch {
|
||||
case packet.Layer(layers.LayerTypeARP) != nil:
|
||||
arp := packet.Layer(layers.LayerTypeARP).(*layers.ARP)
|
||||
frameSrc = net.IP(arp.SourceProtAddress).String()
|
||||
frameDst = net.IP(arp.DstProtAddress).String()
|
||||
frameProtocol = "ARP"
|
||||
frameInfo = fmt.Sprintf("Who has %s? Tell %s", frameDst, frameSrc)
|
||||
case packet.Layer(layers.LayerTypeVRRP) != nil:
|
||||
frameProtocol = "VRRP"
|
||||
frameInfo = "Announcement (v2)"
|
||||
case packet.Layer(layers.LayerTypeIGMP) != nil:
|
||||
switch layer := packet.Layer(layers.LayerTypeIGMP).(type) {
|
||||
case *layers.IGMP:
|
||||
frameProtocol = fmt.Sprintf("IGMPv%d", layer.Version)
|
||||
frameInfo = fmt.Sprintf("%s %s", layer.Type.String(), layer.GroupAddress.String())
|
||||
case *layers.IGMPv1or2:
|
||||
frameProtocol = fmt.Sprintf("IGMPv%d", layer.Version)
|
||||
frameInfo = fmt.Sprintf("%s %s", layer.Type.String(), layer.GroupAddress.String())
|
||||
}
|
||||
case packet.Layer(layers.LayerTypeICMPv4) != nil:
|
||||
icmpv4 := packet.Layer(layers.LayerTypeICMPv4).(*layers.ICMPv4)
|
||||
frameProtocol = "ICMP"
|
||||
frameInfo = icmpv4.TypeCode.String()
|
||||
case packet.Layer(layers.LayerTypeICMPv6) != nil:
|
||||
icmpv6 := packet.Layer(layers.LayerTypeICMPv6).(*layers.ICMPv6)
|
||||
frameProtocol = "ICMPv6"
|
||||
frameInfo = icmpv6.TypeCode.String()
|
||||
case packet.Layer(layers.LayerTypeSTP) != nil:
|
||||
stp := packet.Layer(layers.LayerTypeSTP).(*layers.STP)
|
||||
rootIdentifier := stp.RouteID
|
||||
frameSrc = rootIdentifier.HwAddr.String()
|
||||
frameDst = stp.BridgeID.HwAddr.String()
|
||||
frameProtocol = "STP"
|
||||
frameInfo = fmt.Sprintf("MST. Root = %d/%d/%s Cost = %d Port = 0x%x", rootIdentifier.Priority, rootIdentifier.SysID, frameSrc, stp.Cost, stp.PortID)
|
||||
case packet.Layer(layers.LayerTypeSIP) != nil:
|
||||
sip := packet.Layer(layers.LayerTypeSIP).(*layers.SIP)
|
||||
frameProtocol = "SIP"
|
||||
if sip.IsResponse {
|
||||
frameInfo = fmt.Sprintf("%d %s", sip.ResponseCode, sip.ResponseStatus)
|
||||
} else {
|
||||
frameInfo = fmt.Sprintf("%s %s", sip.Method, sip.RequestURI)
|
||||
}
|
||||
case packet.Layer(layers.LayerTypeDNS) != nil:
|
||||
dns := packet.Layer(layers.LayerTypeDNS).(*layers.DNS)
|
||||
frameProtocol = "DNS"
|
||||
question := []string{}
|
||||
if len(dns.Questions) > 0 {
|
||||
question = append(question, fmt.Sprintf("%s %s", dns.Questions[0].Type, dns.Questions[0].Name))
|
||||
}
|
||||
frameInfo = fmt.Sprintf("%s %s 0x%x %s Answers %d", dns.ResponseCode.String(), dns.OpCode.String(), dns.ID, question, len(dns.Answers))
|
||||
case packet.Layer(layers.LayerTypeDHCPv6) != nil:
|
||||
dhcpv6 := packet.Layer(layers.LayerTypeDHCPv6).(*layers.DHCPv6)
|
||||
frameProtocol = "DHCPv6"
|
||||
frameInfo = fmt.Sprintf("%s XID: 0x%+x", dhcpv6.MsgType.String(), dhcpv6.TransactionID)
|
||||
case packet.Layer(layers.LayerTypeTLS) != nil:
|
||||
tls := packet.Layer(layers.LayerTypeTLS).(*layers.TLS)
|
||||
if len(tls.AppData) > 0 {
|
||||
item := tls.AppData[0]
|
||||
frameProtocol = item.Version.String()
|
||||
frameInfo = item.ContentType.String()
|
||||
} else if len(tls.Handshake) > 0 {
|
||||
item := tls.Handshake[0]
|
||||
frameProtocol = item.ClientHello.ProtocolVersion.String()
|
||||
frameInfo = "Client Hello"
|
||||
} else {
|
||||
frameProtocol = "TLS"
|
||||
frameInfo = "Unknown"
|
||||
}
|
||||
}
|
||||
|
||||
if frameProtocol == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// 数据
|
||||
frameMeta := FrameMeta{
|
||||
Number: int(currentFrameNumber),
|
||||
Time: packet.Metadata().Timestamp.UnixNano(),
|
||||
Source: frameSrc,
|
||||
Destination: frameDst,
|
||||
Protocol: frameProtocol,
|
||||
Length: packet.Metadata().Length,
|
||||
Info: frameInfo,
|
||||
Data: base64.StdEncoding.EncodeToString(packet.Data()),
|
||||
}
|
||||
// 推送到ws订阅组
|
||||
wsService.NewWSSend.ByGroupID(fmt.Sprintf("%s_%s", wsService.GROUP_TRACE_PACKET, taskInfo.TaskNo), frameMeta)
|
||||
|
||||
// 写入文件
|
||||
if taskInfo.Writer != nil {
|
||||
taskInfo.Writer.WritePacket(packet.Metadata().CaptureInfo, packet.Data())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// outputPCAPFile 输出 pcap 文件
|
||||
// 新文件时需要 snaplen 最大长度 linktype 链路类型
|
||||
func (s *Packet) outputPCAPFile(snaplen uint32, linktype layers.LinkType, outputFile string) (*os.File, *pcapgo.Writer, error) {
|
||||
var err error
|
||||
var f *os.File
|
||||
if err := os.MkdirAll(filepath.Dir(outputFile), 0775); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// 检查文件是否存在
|
||||
if _, err = os.Stat(outputFile); os.IsNotExist(err) {
|
||||
f, err = os.Create(outputFile)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
w := pcapgo.NewWriter(f)
|
||||
w.WriteFileHeader(snaplen, linktype) // new file, must do this.
|
||||
return f, w, nil
|
||||
}
|
||||
|
||||
f, err = os.OpenFile(outputFile, os.O_APPEND, 0700)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
w := pcapgo.NewWriter(f)
|
||||
return f, w, nil
|
||||
}
|
||||
|
||||
// LiveFilter 捕获过滤
|
||||
func (s *Packet) LiveFilter(taskNo, expr string) error {
|
||||
return packetTask.LiveFilter(taskNo, expr)
|
||||
info, ok := s.taskMap.Load(taskNo)
|
||||
if !ok {
|
||||
return fmt.Errorf("task no. %s not exist", taskNo)
|
||||
}
|
||||
task := info.(*Task)
|
||||
task.Filter = expr
|
||||
err := task.Handle.SetBPFFilter(expr)
|
||||
if err != nil {
|
||||
logger.Errorf("packet BPF Filter %s => %s", expr, err.Error())
|
||||
return fmt.Errorf("can't parse filter expression")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LiveTimeout 更新捕获失效时间
|
||||
func (s *Packet) LiveTimeout(taskNo string, seconds int) error {
|
||||
return packetTask.LiveTimeout(taskNo, seconds)
|
||||
info, ok := s.taskMap.Load(taskNo)
|
||||
if !ok {
|
||||
return fmt.Errorf("task no. %s not exist", taskNo)
|
||||
}
|
||||
info.(*Task).Ticker.Reset(time.Duration(seconds) * time.Second)
|
||||
return nil
|
||||
}
|
||||
|
||||
// LiveStop 停止捕获数据
|
||||
func (s *Packet) LiveStop(taskNo string) error {
|
||||
return packetTask.LiveStop(taskNo)
|
||||
info, ok := s.taskMap.Load(taskNo)
|
||||
if !ok {
|
||||
return fmt.Errorf("task no. %s not exist", taskNo)
|
||||
}
|
||||
info.(*Task).Ticker.Reset(time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user