Files
be.ems/captrace/captrace.go

637 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"encoding/binary"
"encoding/hex"
"fmt"
"net"
"os"
"os/exec"
"strings"
"sync"
"be.ems/captrace/config"
"be.ems/lib/dborm"
"be.ems/lib/global"
"be.ems/lib/log"
_ "github.com/go-sql-driver/mysql"
"golang.org/x/net/http/httpguts"
"golang.org/x/net/http2/hpack"
)
// limitChan bounds the number of udpProcess goroutines that exist at once:
// udpServer sends one value into it before starting each goroutine and
// udpProcess takes one value back out when it is done, so the send blocks
// once 1024 (the channel capacity) goroutines are outstanding.
var limitChan = make(chan bool, 1024)
// Masks, flag bits, sizes and byte offsets used when reading the header of a
// received GTP-U message.
const (
	// Bits of the first header octet (the one at GTPU_HEADER_VERSION_INDEX).
	GTPU_V1_VERSION = 0x20 // the value 1 stored in the three most significant bits
	GTPU_VER_MASK   = 0xE0 // the three most significant bits
	GTPU_PT_GTP     = 0x10 // bit 4
	GTPU_E_S_PB_BIT = 0x07 // the three least significant bits taken together
	GTPU_E_BI       = 0x04 // bit 2; gtpuHandler only looks at extension headers when it is set

	// GTPU_HEADER_LEN is a header length in bytes; gtpuHandler reads the octet
	// at index GTPU_HEADER_LEN-1 as the type of the first extension header.
	GTPU_HEADER_LEN = 12

	// Byte offsets of the leading header fields.
	GTPU_HEADER_VERSION_INDEX  = 0
	GTPU_HEADER_MSG_TYPE_INDEX = 1
	GTPU_HEADER_LENGTH_INDEX   = 2
	GTPU_HEADER_TEID_INDEX     = 4
)
// ExtHeader holds the values gtpuHandler extracts from one traced message:
// the identifiers carried in the GTP-U header and its 0xFE extension header,
// followed by the addressing and payload details of the captured IP packet.
type ExtHeader struct {
	TaskId           uint32 // TEID read from the GTP-U header
	IMSI             string // 15 bytes copied from the extension header
	IfType, MsgType  byte
	MsgDirect        byte // 0-recv,1-send
	TimeStamp        int64
	SrcIP, DstIP     string // dotted-decimal IPv4 addresses
	SrcPort, DstPort uint16
	Proto            int    // protocol octet of the captured IP header
	PPI              int    // only for SCTP
	DataLen          uint16 // payload length derived from the captured headers
	DataInfo         []byte // copy of the payload bytes
}
// MsgInfo is a compact description of one traced message. It is not
// referenced by the code visible in this part of the file.
type MsgInfo struct {
	TaskId    uint32
	TimeStamp int64

	IfType, MsgType, MsgDirect byte

	SrcAddr, DstAddr string // IP:Port
}
// validWireHeaderFieldName reports whether v is acceptable as a header field
// name: it must be non-empty, every rune must satisfy httpguts.IsTokenRune,
// and it must not contain an upper-case ASCII letter.
//
// NOTE(review): the earlier comments here mentioned Framer.ReadFrame and
// ReadMetaHeaders, so this check was presumably adapted from an HTTP/2 framer.
func validWireHeaderFieldName(v string) bool {
	if v == "" {
		return false
	}
	for _, r := range v {
		upper := r >= 'A' && r <= 'Z'
		if upper || !httpguts.IsTokenRune(r) {
			return false
		}
	}
	return true
}
// execTshark runs tshark on the capture file filename with proto as the
// display filter (-Y) and PDML (Packet Details Markup Language) as the output
// format, then logs the combined stdout/stderr at debug level. A failure to
// run the command is logged as an error; nothing is returned to the caller.
//
// Equivalent command line, e.g. for HTTP/2:
//
//	tshark -r gtp.pcap -T json -d tcp.port==8080,http2 -Y "http2"
//
// Other tshark output formats: "-T pdml" (Packet Details Markup Language) and
// "-T psml" (Packet Summary Markup Language).
func execTshark(filename string, proto string) {
	tshark := exec.Command("tshark", "-r", filename, "-Y", proto, "-T", "pdml")
	out, err := tshark.CombinedOutput()
	if err != nil {
		// The original call had no formatting verb, so err was never rendered.
		log.Errorf("Failed to exec tshark: %v", err)
		return
	}
	log.Debug("combined out:", string(out))
}
// Values written into the 24-byte pcap file header by WriteEmptyPcap.
const (
	magicMicroseconds = 0xa1b2c3d4
	versionMajor      = 2
	versionMinor      = 4
)

// WriteEmptyPcap writes a pcap file named filename that contains exactly one
// packet record: a 16-byte cooked-capture pseudo header followed by the first
// length bytes of data (zero padded when data is shorter than length).
//
// File layout: 24-byte file header, 16-byte record header, 16-byte cooked
// header, payload. The file is created with mode 0644 and any existing file
// is overwritten. An error is returned when length is negative or the file
// cannot be written.
//
// NOTE(review): timestamp is stored as one little-endian 64-bit value across
// the record's two 32-bit time fields, i.e. its low 32 bits land in the first
// field and its high 32 bits in the second. Confirm the unit of timestamp
// against the sender before relying on the time tshark displays.
func WriteEmptyPcap(filename string, timestamp int64, length int, data []byte) error {
	const (
		fileHeaderLen   = 24
		recordHeaderLen = 16
		cookedLen       = 16
		payloadOff      = fileHeaderLen + recordHeaderLen + cookedLen // 56
	)
	if length < 0 {
		// make() would panic on a negative size; report it instead.
		return fmt.Errorf("writing pcap %q: negative payload length %d", filename, length)
	}

	cooked := [cookedLen]byte{0x00, 0x00, 0x03, 0x04, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00}
	buf := make([]byte, payloadOff+length)

	// File header.
	binary.LittleEndian.PutUint32(buf[0:4], magicMicroseconds)
	binary.LittleEndian.PutUint16(buf[4:6], versionMajor)
	binary.LittleEndian.PutUint16(buf[6:8], versionMinor)
	// bytes 8:12 stay 0 (timezone = UTC)
	// bytes 12:16 stay 0 (sigfigs is always set to zero, according to
	// http://wiki.wireshark.org/Development/LibpcapFileFormat)
	binary.LittleEndian.PutUint32(buf[16:20], 0x00040000) // snapshot length
	binary.LittleEndian.PutUint32(buf[20:24], 0x00000071) // link type

	// Record header: time, captured length, original length. Both lengths
	// cover the cooked header plus the payload.
	binary.LittleEndian.PutUint64(buf[24:32], uint64(timestamp))
	binary.LittleEndian.PutUint32(buf[32:36], uint32(length+cookedLen))
	binary.LittleEndian.PutUint32(buf[36:40], uint32(length+cookedLen))

	copy(buf[fileHeaderLen+recordHeaderLen:], cooked[:])
	copy(buf[payloadOff:], data)

	return os.WriteFile(filename, buf, 0644)
}
// ngapDataHandle wraps data in a single-packet pcap file
// /tmp/ng<timestamp>.pcap and runs tshark on it with the "ngap" display
// filter. It returns -1 when the pcap file cannot be written and 0 otherwise.
//
// emsg is accepted for symmetry with the other handlers but is not used.
// The pcap file is left in /tmp after the call.
func ngapDataHandle(emsg []byte, timestamp int64, data []byte) int {
	filePath := fmt.Sprintf("/tmp/ng%d.pcap", timestamp)
	if err := WriteEmptyPcap(filePath, timestamp, len(data), data); err != nil {
		// The failing step is the file write, not tshark.
		log.Error("Failed to write pcap file:", err)
		return -1
	}
	execTshark(filePath, "ngap")
	return 0
}
// pfcpDataHandle wraps data in a single-packet pcap file
// /tmp/pf<timestamp>.pcap and runs tshark on it with the "pfcp" display
// filter. It always returns 0; a failure to write the pcap file is only
// logged and tshark is then skipped.
//
// emsg is accepted for symmetry with the other handlers but is not used.
// The pcap file is left in /tmp after the call.
func pfcpDataHandle(emsg []byte, timestamp int64, data []byte) int {
	filePath := fmt.Sprintf("/tmp/pf%d.pcap", timestamp)
	if err := WriteEmptyPcap(filePath, timestamp, len(data), data); err != nil {
		// The failing step is the file write, not tshark.
		log.Error("Failed to write pcap file:", err)
		return 0
	}
	execTshark(filePath, "pfcp")
	return 0
}
// httpDataHandle wraps data in a single-packet pcap file
// /tmp/sb<timestamp>.pcap and runs tshark on it with the "http2" display
// filter. It always returns 0; a failure to write the pcap file is only
// logged and tshark is then skipped.
//
// emsg is accepted for symmetry with the other handlers but is not used.
// The pcap file is left in /tmp after the call.
func httpDataHandle(emsg []byte, timestamp int64, data []byte) int {
	filePath := fmt.Sprintf("/tmp/sb%d.pcap", timestamp)
	if err := WriteEmptyPcap(filePath, timestamp, len(data), data); err != nil {
		// The failing step is the file write, not tshark.
		log.Error("Failed to write pcap file:", err)
		return 0
	}
	execTshark(filePath, "http2")
	return 0
}
// httpHeaderDataHandle HPACK-decodes the header block in header, renders the
// accepted fields as a run of `"name":"value",` pairs and logs the result,
// followed by ` "content":<data>` when data is non-empty.
//
// Decoding stops collecting fields as soon as one is rejected (invalid value,
// invalid name, or a pseudo header after a regular one) or once the collected
// fields would exceed 16 MiB. The function returns -1 when the HPACK decoder
// reports an error and 0 otherwise.
//
// NOTE(review): the rendered text is appended to the local copy of the emsg
// slice header only, so the caller does not observe the appended bytes.
func httpHeaderDataHandle(emsg []byte, header []byte, data []byte) int {
	const maxHeaderBytes = 16 << 20

	var (
		budget      = uint32(maxHeaderBytes)
		seenRegular bool
		rejected    bool // pseudo header field errors
		fields      []hpack.HeaderField
	)

	dec := hpack.NewDecoder(4096, nil)
	dec.SetEmitEnabled(true)
	dec.SetMaxStringLength(maxHeaderBytes)
	dec.SetEmitFunc(func(f hpack.HeaderField) {
		// The value itself is never reported, because it may be sensitive.
		if !httpguts.ValidHeaderFieldValue(f.Value) {
			rejected = true
		}
		switch {
		case !strings.HasPrefix(f.Name, ":"):
			seenRegular = true
			if !validWireHeaderFieldName(f.Name) {
				rejected = true
			}
		case seenRegular:
			// A pseudo header must not follow a regular header.
			rejected = true
		}
		if rejected {
			dec.SetEmitEnabled(false)
			return
		}
		need := f.Size()
		if need > budget {
			dec.SetEmitEnabled(false)
			return
		}
		budget -= need
		fields = append(fields, f)
	})

	if _, err := dec.Write(header); err != nil {
		return -1
	}
	if err := dec.Close(); err != nil {
		return -1
	}
	dec.SetEmitFunc(func(hpack.HeaderField) {})

	var headers []byte
	for _, f := range fields {
		headers = append(headers, fmt.Sprintf("\"%s\":\"%s\",", f.Name, f.Value)...)
	}

	if len(data) == 0 {
		log.Debug("headers:", string(headers))
		emsg = append(emsg, headers...)
		return 0
	}
	encode := fmt.Sprintf("%s \"content\":%s\n", string(headers), string(data))
	emsg = append(emsg, encode...)
	log.Debug("encode:", encode)
	return 0
}
func gtpuHandler(rvMsg []byte, rvLen int) {
var extHdr ExtHeader
var tr dborm.TraceData
var off, ret int
msg := rvMsg
verFlags := msg[GTPU_HEADER_VERSION_INDEX]
gtpuHdrLen := GTPU_HEADER_LEN
localTeid := binary.BigEndian.Uint32(msg[GTPU_HEADER_TEID_INDEX:])
extHdr.TaskId = localTeid
if (verFlags & GTPU_E_S_PB_BIT) != 0 {
if (verFlags & GTPU_E_BI) != 0 {
extTypeIndex := GTPU_HEADER_LEN - 1
extType := msg[extTypeIndex]
if extType == 0xFE {
extHdr.IMSI = string(msg[extTypeIndex+2 : extTypeIndex+17])
extHdr.IfType = msg[extTypeIndex+17]
extHdr.MsgType = msg[extTypeIndex+18]
extHdr.MsgDirect = msg[extTypeIndex+19]
extHdr.TimeStamp = int64(binary.BigEndian.Uint64(msg[extTypeIndex+19:]))
log.Debugf("ext info %v %s %d %d %d", msg[(extTypeIndex+2):(extTypeIndex+20)], extHdr.IMSI, extHdr.IfType, extHdr.MsgType, extHdr.MsgDirect)
// set offset of IP Packet
off = 40 + 4
//src ip: msg+40+12
extHdr.SrcIP = fmt.Sprintf("%d.%d.%d.%d", msg[off+12], msg[off+13], msg[off+14], msg[off+15])
//dst ip: msg+40+12+4
extHdr.DstIP = fmt.Sprintf("%d.%d.%d.%d", msg[off+16], msg[off+17], msg[off+18], msg[off+19])
extHdr.SrcPort = uint16(binary.BigEndian.Uint16(msg[off+20:]))
extHdr.DstPort = uint16(binary.BigEndian.Uint16(msg[off+22:]))
log.Debugf("info %s:%d %s:%d", extHdr.SrcIP, extHdr.SrcPort, extHdr.DstIP, extHdr.DstPort)
// ip header start msg+40
tr.TaskID = int(extHdr.TaskId)
tr.Timestamp = extHdr.TimeStamp
tr.Imsi = extHdr.IMSI
tr.IfType = int(extHdr.IfType)
tr.SrcAddr = fmt.Sprintf("%s:%d", extHdr.SrcIP, extHdr.SrcPort)
tr.DstAddr = fmt.Sprintf("%s:%d", extHdr.DstIP, extHdr.DstPort)
tr.MsgType = int(extHdr.MsgType)
tr.MsgDirect = int(extHdr.MsgDirect)
tr.Length = int(rvLen - off)
tr.RawMsg = make([]byte, int(rvLen-off))
copy(tr.RawMsg, []byte(msg[off:]))
extHdr.Proto = int(msg[off+9])
if extHdr.Proto == 132 { //SCTP
extHdr.PPI = int(msg[off+47])
extHdr.DataLen = uint16(binary.BigEndian.Uint16(msg[(off+34):]) - 16)
log.Debugf("dat len %d %d", extHdr.DataLen, extHdr.PPI)
if extHdr.PPI == 60 { // NGAP
extHdr.DataInfo = make([]byte, extHdr.DataLen)
copy(extHdr.DataInfo, msg[(off+48):])
//append(extHdr.DataInfo, msg[88:]...)
log.Debugf("dataInfo %v", extHdr.DataInfo)
ret = ngapDataHandle([]byte(tr.DecMsg), tr.Timestamp, tr.RawMsg)
}
} else if extHdr.Proto == 6 { // TCP
iplen := uint16(binary.BigEndian.Uint16(msg[off+2:]))
tcplen := uint16(iplen - 32 - 20)
hdrlen := uint16(binary.BigEndian.Uint16(msg[off+20+32+1:]))
offset := uint16(off + 52)
log.Debugf("HTTP %d %d %d\n", iplen, tcplen, hdrlen)
extHdr.DataLen = tcplen
extHdr.DataInfo = make([]byte, extHdr.DataLen)
copy(extHdr.DataInfo, msg[offset:])
//ret = httpDataHandle(tr.DecodedMsg, tr.Timestamp, tr.RawMsg)
if tcplen > (hdrlen + 9) { // has data
doffset := uint16(offset + hdrlen + 9)
datlen := uint16(binary.BigEndian.Uint16(msg[doffset+1:]))
log.Debugf("HTTP datlen %d", datlen)
ret = httpHeaderDataHandle([]byte(tr.DecMsg), msg[offset+9:offset+9+hdrlen], msg[doffset+9:doffset+datlen+9])
} else {
ret = httpHeaderDataHandle([]byte(tr.DecMsg), msg[offset+9:hdrlen], nil)
}
} else if extHdr.Proto == 17 { // UDP
ilen := uint16(binary.BigEndian.Uint16(msg[off+2:]))
udplen := uint16(ilen - 20)
extHdr.DataLen = udplen - 8
extHdr.DataInfo = make([]byte, extHdr.DataLen)
copy(extHdr.DataInfo, msg[off+27:])
ret = pfcpDataHandle([]byte(tr.DecMsg), tr.Timestamp, tr.RawMsg)
}
if ret < 0 {
log.Error("Decode message error")
} else {
PutTraceRecordToDB(&tr)
}
}
for extType != 0 && extTypeIndex < rvLen {
extLen := msg[extTypeIndex+1] << 2
if extLen == 0 {
log.Error("error, extLen is zero")
return
}
gtpuHdrLen += int(extLen)
extTypeIndex += int(extLen)
extType = msg[extTypeIndex]
}
}
} else {
gtpuHdrLen -= 4
}
}
func udpProcess(conn *net.UDPConn) {
data := make([]byte, 2048)
n, _, err := conn.ReadFromUDP(data)
if err != nil {
log.Error("failed read udp msg, error: " + err.Error())
}
gtpuHandler(data, n)
//str := string(data[:n])
//log.Error("receive from client, data:" + str)
<-limitChan
}
// udpServer listens for UDP datagrams on address and handles each one in its
// own udpProcess goroutine. Before starting a goroutine it sends a value into
// limitChan, so it blocks while the channel is full.
//
// The function does not return: it loops forever, and a failure to resolve or
// bind the address terminates the process with exit status 1.
func udpServer(address string) {
	udpAddr, err := net.ResolveUDPAddr("udp", address)
	if err != nil {
		log.Error("Failed to ResolveUDPAddr:", err)
		os.Exit(1)
	}
	conn, err := net.ListenUDP("udp", udpAddr)
	if err != nil {
		// This is a bind failure, not a read failure.
		log.Error("Failed to ListenUDP:", err)
		os.Exit(1)
	}
	defer conn.Close()
	for {
		limitChan <- true
		go udpProcess(conn)
	}
}
// PutTraceRecordToDB decodes the captured packet in tr.RawMsg according to
// tr.IfType and, when the decoder returns 0, inserts tr into the database.
//
// tr.RawMsg is expected to start at the IPv4 header. The payload offsets are:
//
//	SCTP  (IfType 1 or 2): 48
//	UDP   (IfType 4):      28
//	HTTP2 (anything else): 52, followed by 9-byte HTTP/2 frame headers
//
// An error is returned when tr.RawMsg is too short for the lengths it
// announces or when the insert fails; both are also logged. When the decoder
// returns a non-zero value nothing is inserted and nil is returned.
func PutTraceRecordToDB(tr *dborm.TraceData) error {
	const (
		sctpPayloadOff = 48
		udpPayloadOff  = 28
		http2FrameOff  = 52
		frameHdrLen    = 9
	)

	raw := tr.RawMsg
	tooShort := func(what string) error {
		err := fmt.Errorf("trace record too short for %s: %d bytes", what, len(raw))
		log.Error("Failed to decode trace record:", err)
		return err
	}

	var ret int
	switch tr.IfType {
	case 1, 2: // SCTP
		if len(raw) < sctpPayloadOff {
			return tooShort("SCTP payload")
		}
		ret = ngapDataHandle([]byte(tr.DecMsg), tr.Timestamp, raw[sctpPayloadOff:])
	case 4: // UDP
		if len(raw) < udpPayloadOff {
			return tooShort("UDP payload")
		}
		ret = pfcpDataHandle([]byte(tr.DecMsg), tr.Timestamp, raw[udpPayloadOff:])
	default: // HTTP2
		if len(raw) < http2FrameOff+frameHdrLen {
			return tooShort("HTTP2 frame header")
		}
		// All arithmetic is done in int so that bogus lengths cannot wrap.
		ipLen := int(binary.BigEndian.Uint16(raw[2:]))
		tcpLen := ipLen - 32 - 20
		// Only the low two bytes of the three-byte frame length are read.
		hdrLen := int(binary.BigEndian.Uint16(raw[http2FrameOff+1:]))

		hdrStart := http2FrameOff + frameHdrLen
		hdrEnd := hdrStart + hdrLen
		if hdrEnd > len(raw) {
			return tooShort("HTTP2 header block")
		}
		if tcpLen > hdrLen+frameHdrLen { // has data
			if hdrEnd+frameHdrLen > len(raw) {
				return tooShort("HTTP2 data frame header")
			}
			datLen := int(binary.BigEndian.Uint16(raw[hdrEnd+1:]))
			log.Debugf("HTTP datlen %d\n", datLen)
			dataStart := hdrEnd + frameHdrLen
			dataEnd := dataStart + datLen
			if dataEnd > len(raw) {
				return tooShort("HTTP2 data")
			}
			ret = httpHeaderDataHandle([]byte(tr.DecMsg), raw[hdrStart:hdrEnd], raw[dataStart:dataEnd])
		} else {
			// The header block ends hdrLen bytes after it starts.
			ret = httpHeaderDataHandle([]byte(tr.DecMsg), raw[hdrStart:hdrEnd], nil)
		}
	}

	if ret != 0 {
		return nil
	}
	if _, err := dborm.XormInsertTraceData(tr); err != nil {
		log.Error("Failed to dborm.XormInsertTraceData:", err)
		return err
	}
	return nil
}
// ////////////////
// var rdb *redis.Client
// var redisOn bool
// var rurl string
//
// func RdbInitClient(Url string) (err error) {
// rurl = Url
// rdb = redis.NewClient(&redis.Options{
// Addr: rurl,
// Password: "", // no password set
// DB: 0, // use default
// })
//
// ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
// defer cancel()
//
// _, err = rdb.Ping(ctx).Result()
// if err != nil {
// log.Debugf("db connect failed\n")
// return err
// }
// return nil
// }
//
// func RdbKeys(filter string) (vals []string, err error) {
// ctx := context.Background()
//
// vals, err = rdb.Keys(ctx, filter).Result()
//
// if err != nil {
// log.Error("db: Keys ", err.Error())
// return nil, err
// }
//
// return vals, nil
// }
//
// func RdbHGetAll(key string) (kvs map[string]string, err error) {
// ctx := context.Background()
// kvs, err = rdb.HGetAll(ctx, key).Result()
//
// if err != nil {
// log.Error("db: HGetAll ", err.Error())
// return nil, err
// }
//
// return kvs, nil
// }
//
// func RdbHMSet(key string, kvs map[string]interface{}) (err error) {
// ctx := context.Background()
// err = rdb.HMSet(ctx, key, kvs).Err()
//
// if err != nil {
// log.Error("db: HMSet ", err.Error())
// return err
// }
//
// return nil
// }
//
// func RdbDel(key string) (err error) {
// ctx := context.Background()
// err = rdb.Del(ctx, key).Err()
//
// if err != nil {
// log.Error("db: Del ", err.Error())
// return err
// }
//
// return nil
// }
//
// func rdbClient(url string) {
// var err error
// var kvs map[string]string
// var vals []string
// var tr TraceRecord
//
// err = RdbInitClient(url)
//
// if err != nil {
// log.Debugf("db: RdbInitClient err\n")
// os.Exit(2)
// }
//
// for {
// vals, err = RdbKeys("tsk*") // (vals []string, err error)
// if err == nil {
// for i := range vals {
// kvs, err = RdbHGetAll(vals[i]) //(kvs map[string]string, err error)
// if err == nil {
// //tsk-1:1682764180993584177:460000100000001:8
// log.Debugf("%d: %s %s %s %v\n", i, vals[i], kvs["srcip"], kvs["dstip"], []byte(kvs["ipdat"]))
// arr := strings.Split(vals[i], ":")
// if arr != nil && len(arr) == 4 {
// tr.Taskid, _ = strconv.Atoi(arr[0][4:])
// tr.Timestamp, _ = strconv.ParseInt(arr[1], 10, 64)
//
// tr.Imsi = arr[2]
// tr.IfType, _ = strconv.Atoi(arr[3])
//
// }
// tr.SrcAddr = fmt.Sprintf("%s:d", kvs["srcip"], kvs["srcport"])
// tr.DstAddr = fmt.Sprintf("%s:d", kvs["dstip"], kvs["dstport"])
// tr.MsgType, _ = strconv.Atoi(kvs["msgtp"])
// tr.MsgDirect, _ = strconv.Atoi(kvs["direct"])
// rawlen, _ := strconv.Atoi(kvs["datlen"])
// tr.RawMsg = make([]byte, rawlen)
// copy(tr.RawMsg, []byte(kvs["ipdat"]))
// //tr.DecodedMsg
// PutTraceRecordToDB(&tr)
//
// //RdbDel(vals[i])
// }
//
// }
// }
// time.Sleep(time.Second * 5)
// }
// }
// Database connection
//func QueryMultiRowDemo() {
// //InitMysql()
// sqlStr := "SELECT id,sname,age FROM student WHERE id = ?"
// rows, err := dbc.Query(sqlStr, 1)
// if err != nil {
// log.Debugf("query failed, err:%v\n", err)
// return
// }
// // Very important: close rows to release the database connection it holds
// defer rows.Close()
//
// // Loop over the rows of the result set
// for rows.Next() {
// var u User
// err := rows.Scan(&u.id, &u.name, &u.age)
// if err != nil {
// log.Debugf("scan failed, err:%v\n", err)
// return
// }
// //log.Debugf("id:%d name:%s age:%d\n", u.id, u.name, u.age)
// }
//}
// ToHtml hex-encodes ipDaTA and runs /usr/local/bin/data2html with it:
//
//	data2html -f <path> -t <timestamp> -i <port> -d <hex of ipDaTA>
//
// The tool's output is discarded unless it fails, in which case the error and
// the combined output are logged. Nothing is returned to the caller.
func ToHtml(path string, timestamp string, port string, ipDaTA []byte) {
	log.Trace("byte数据:", ipDaTA)
	encodedStr := hex.EncodeToString(ipDaTA)
	// e.g. [72 101 108 108 111] becomes "48656c6c6f"
	log.Trace(encodedStr)

	const tool = "/usr/local/bin/data2html"
	command := fmt.Sprintf("/usr/local/bin/data2html -f %s -t %s -i %s -d %s", path, timestamp, port, encodedStr)
	log.Trace("commm:", command)

	// Run the tool directly with an argument vector instead of through
	// "sh -c": arguments containing spaces or shell metacharacters are then
	// passed through unchanged rather than being re-interpreted by a shell.
	cmd := exec.Command(tool, "-f", path, "-t", timestamp, "-i", port, "-d", encodedStr)
	out, err := cmd.CombinedOutput()
	if err != nil {
		log.Error("Failed to exec data2html:", err, string(out))
		return
	}
}
// main loads the YAML configuration, initialises logging and the database
// client, and then serves GTP-U trace datagrams on the configured address.
// The process exits with status 2 when the database client cannot be set up.
func main() {
	cfg := config.GetYamlConfig()

	log.InitLogger(cfg.Logger.File, cfg.Logger.Duration, cfg.Logger.Count, "omc:captrace", config.GetLogLevel())
	log.Debugf("OMC captrace version: %s\n", global.Version)
	log.Infof("========================= OMC captrace startup =========================")
	log.Infof("OMC captrace version: %s %s %s", global.Version, global.BuildTime, global.GoVer)

	if err := dborm.InitDbClient(
		cfg.Database.Type,
		cfg.Database.User,
		cfg.Database.Password,
		cfg.Database.Host,
		cfg.Database.Port,
		cfg.Database.Name,
	); err != nil {
		fmt.Println("dborm.initDbClient err:", err)
		os.Exit(2)
	}

	// udpServer loops forever, so the Wait below is not reached in practice.
	var wg sync.WaitGroup
	wg.Add(1)
	udpServer(cfg.Gtp.Addr)
	wg.Wait()
}