// Command main is a manual test client for a TCP alarm service. It dials the
// server, sends a login request and an alarm-file sync request, and prints
// every frame the server sends back. DataMock can additionally be used to
// keep inserting copies of existing alarm rows into a MySQL database.
//
// Wire format used by Pack and Unpack (all integers big-endian):
//
//	offset 0, 2 bytes: start sign
//	offset 2, 1 byte : message type
//	offset 3, 4 bytes: timestamp
//	offset 7, 2 bytes: body length
//	offset 9, N bytes: body
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"net"
	"time"

	"github.com/aceld/zinx/zconf"
	"github.com/aceld/zinx/zlog"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"

	"omc/handle/model"
)

// Frame layout constants shared by Pack, Unpack and Receive.
const (
	headerLen            = 9        // bytes in the fixed frame header
	startSign            = 0xffff   // start sign written by Pack
	placeholderTimestamp = 0x999999 // fixed value Pack writes into the timestamp field
	maxMsgType           = 0xff     // largest value the 1-byte type field can hold
	maxBodyLen           = 0xffff   // largest value the 2-byte length field can hold
)

// Message is one decoded frame: the header fields plus the body bytes.
type Message struct {
	StartSign uint16 // first two header bytes; Pack always writes 0xffff
	MsgType   uint8  // message type code
	TimeStamp uint32 // timestamp field exactly as carried in the header
	LenOfBody uint16 // number of body bytes that follow the header
	Value     []byte // body bytes; Unpack leaves this nil, Receive fills it in
}

// TcpClient wraps a single TCP connection to the server.
type TcpClient struct {
	conn     net.Conn  // connection established by NewTcpClient
	PID      int32     // initialised to 0 by NewTcpClient; not otherwise used in this file
	isOnline chan bool // created by NewTcpClient; not otherwise used in this file
}

// Unpack decodes the fixed-size frame header from headdata and returns it as
// a Message whose Value is nil. It prints the hex dump of headdata first.
//
// An error is returned when headdata holds fewer than 9 bytes (wrapping
// io.ErrUnexpectedEOF) or when the declared body length exceeds
// zconf.GlobalObject.MaxPacketSize (only checked when that limit is > 0).
// Bytes beyond the header are ignored.
func (c *TcpClient) Unpack(headdata []byte) (*Message, error) {
	fmt.Println("unpack:", hex.EncodeToString(headdata))

	if len(headdata) < headerLen {
		return nil, fmt.Errorf("header needs %d bytes, got %d: %w",
			headerLen, len(headdata), io.ErrUnexpectedEOF)
	}

	head := &Message{
		StartSign: binary.BigEndian.Uint16(headdata[0:2]),
		MsgType:   headdata[2],
		TimeStamp: binary.BigEndian.Uint32(headdata[3:7]),
		LenOfBody: binary.BigEndian.Uint16(headdata[7:9]),
	}

	// Reject frames whose body is larger than the configured maximum.
	if limit := zconf.GlobalObject.MaxPacketSize; limit > 0 && uint32(head.LenOfBody) > limit {
		return nil, errors.New("too large msg data received")
	}
	return head, nil
}

// Pack builds a complete frame: the 9-byte header followed by dataBytes.
//
// The start sign is always 0xffff and the timestamp field is always the
// fixed value 0x999999. An error is returned when msgID does not fit in the
// 1-byte type field or when dataBytes is longer than the 2-byte length field
// can express; previously both cases were silently truncated, producing a
// frame whose header did not match its contents.
func (c *TcpClient) Pack(msgID uint32, dataBytes []byte) ([]byte, error) {
	if msgID > maxMsgType {
		return nil, fmt.Errorf("msg type %d does not fit in one byte", msgID)
	}
	if len(dataBytes) > maxBodyLen {
		return nil, fmt.Errorf("body length %d exceeds maximum %d", len(dataBytes), maxBodyLen)
	}

	var header [headerLen]byte
	binary.BigEndian.PutUint16(header[0:2], startSign)
	header[2] = uint8(msgID)
	binary.BigEndian.PutUint32(header[3:7], placeholderTimestamp)
	binary.BigEndian.PutUint16(header[7:9], uint16(len(dataBytes)))

	var buf bytes.Buffer
	buf.Grow(headerLen + len(dataBytes))
	buf.Write(header[:])
	buf.Write(dataBytes)
	return buf.Bytes(), nil
}

// SendMsg packs data under msgID and writes the frame to the connection.
// Nothing is returned: a packing error is printed, and otherwise the hex
// dump of the frame is printed together with the result of the write.
func (c *TcpClient) SendMsg(msgID uint32, data []byte) {
	sendData, err := c.Pack(msgID, data)
	if err != nil {
		fmt.Println(err)
		return
	}

	_, err = c.conn.Write(sendData)
	fmt.Println("send msg:", hex.EncodeToString(sendData), " err:", err)
}

// Receive reads frames from the connection in a loop and prints each one.
// It returns, after printing the error, as soon as a read or a header
// decode fails. The connection is not closed here.
func (c *TcpClient) Receive() {
	// The header buffer is reused across iterations; Unpack does not keep it.
	var headData [headerLen]byte

	for {
		// First read: the fixed 9-byte header. ReadFull blocks until the
		// whole header has arrived, so no extra pacing is needed.
		if _, err := io.ReadFull(c.conn, headData[:]); err != nil {
			fmt.Println(err)
			return
		}

		pkgHead, err := c.Unpack(headData[:])
		if err != nil {
			fmt.Println("Unpack", err)
			return
		}

		// Second read: the body, whose size the header announced.
		if pkgHead.LenOfBody > 0 {
			pkgHead.Value = make([]byte, pkgHead.LenOfBody)
			if _, err := io.ReadFull(c.conn, pkgHead.Value); err != nil {
				fmt.Println("read body", err)
				return
			}
		}

		// Dump the server's reply.
		fmt.Println("=================================")
		fmt.Println("StartSign:", pkgHead.StartSign)
		fmt.Println("MsgType:", pkgHead.MsgType)
		fmt.Println("TimeStamp:", pkgHead.TimeStamp)
		fmt.Println("LenOfBody:", pkgHead.LenOfBody)
		fmt.Println("Value:", string(pkgHead.Value))
		fmt.Println("=================================")
	}
}

// Start sends the login request (type 0x01) and an alarm-file sync request
// (type 11), then starts Receive in a new goroutine. That goroutine runs
// until Receive hits an error; Start does not wait for it.
func (c *TcpClient) Start() {
	// Log in.
	// NOTE(review): the user name and key are hard-coded in source — confirm
	// these are throwaway test credentials.
	data := "reqLoginAlarm;user=audit;key=omc@password;type=ftp"
	c.SendMsg(0x01, []byte(data))

	// Alternative requests kept for manual testing:
	//
	// Sync alarm messages:
	//   data = "reqSyncAlarmMsg;reqId=33;alarmSeq=1"
	//   c.SendMsg(omc.ReqSyncAlarmMsg, []byte(data))
	// Sync alarm file, other parameter sets:
	//   data = "reqSyncAlarmFile;reqId=35;alarmSeq=2000;syncSource=1"
	//   data = "reqSyncAlarmFile;reqId=33;startTime=2023-01-08 00:00:00;syncSource=0"

	// Sync alarm file for a time window.
	data = "reqSyncAlarmFile;reqId=34;startTime=2023-01-08 16:07:00;endTime=2023-07-19 23:59:59;syncSource=1"
	c.SendMsg(11, []byte(data))

	go c.Receive()
}

// DataMock keeps inserting copies of existing alarm rows into MySQL.
//
// It loads up to five rows for ne_type "SMF" / ne_id "SZ_01" as templates,
// looks up the highest alarm_seq for the same pair, and then inserts the
// templates over and over with Id reset to 0 and AlarmSeq set to the next
// sequence number. It panics if the database cannot be opened and returns
// after logging if a query or an insert fails or if no template rows exist.
// Otherwise it never returns.
func DataMock() {
	// NOTE(review): the DSN embeds the database password in source — confirm
	// this is a disposable test database. The DSN is deliberately kept out of
	// the log messages below.
	conf := "root:1000omc@kp!@tcp(192.168.4.130:33066)/omc_db?charset=utf8mb4&parseTime=True&loc=Local"
	d, err := gorm.Open(mysql.Open(conf), &gorm.Config{})
	if err != nil {
		zlog.Ins().ErrorF("open mysql error: %v", err)
		panic(err)
	}

	// Template rows to clone.
	var alarms []model.Alarm
	err = d.Model(&model.Alarm{}).
		Where("ne_type = ? and ne_id = ? and alarm_seq >= ?", "SMF", "SZ_01", 0).
		Order("alarm_seq asc").
		Limit(5).
		Find(&alarms).Error
	if err != nil {
		zlog.Ins().ErrorF("query template alarms error: %v", err)
		return
	}
	if len(alarms) == 0 {
		// Without templates the insert loop below would spin forever doing nothing.
		zlog.Ins().ErrorF("no template alarms found for ne_type=%s ne_id=%s", "SMF", "SZ_01")
		return
	}

	// Row with the highest sequence number; new rows continue after it.
	var alarm model.Alarm
	err = d.Model(&model.Alarm{}).
		Where("ne_type = ? and ne_id = ?", "SMF", "SZ_01").
		Order("alarm_seq desc").
		First(&alarm).Error
	if err != nil {
		zlog.Ins().ErrorF("db error %s", err)
		return
	}

	index := alarm.AlarmSeq + 1
	for {
		// v is a copy, so the template slice itself is never modified.
		for _, v := range alarms {
			v.AlarmSeq = index
			v.Id = 0 // zero Id so the row is inserted as a new record
			if err := d.Create(&v).Error; err != nil {
				zlog.Ins().ErrorF("insert alarm error: %v", err)
				return
			}
			index++
		}
		time.Sleep(10 * time.Microsecond)
	}
}

// NewTcpClient dials ip:port over TCP and returns a client wrapping the
// connection, with PID set to 0 and a fresh unbuffered isOnline channel.
// It panics if the connection cannot be established.
func NewTcpClient(ip string, port int) *TcpClient {
	addrStr := fmt.Sprintf("%s:%d", ip, port)

	conn, err := net.Dial("tcp", addrStr)
	if err != nil {
		fmt.Println("net.Dial err: ", err)
		panic(err)
	}

	client := &TcpClient{
		conn:     conn,
		PID:      0,
		isOnline: make(chan bool),
	}

	fmt.Printf("conn: %+v. Connected to server...\n", conn)
	return client
}

// main connects to the test server, starts the client, and then blocks
// forever so the receive goroutine keeps running.
func main() {
	client := NewTcpClient("192.168.4.130", 31232)
	client.Start()

	// Uncomment to generate mock alarm rows instead:
	// DataMock()

	select {}
}