From 7fca227d070bd786f6c2efaf10d51a99431f1f78 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Thu, 12 Sep 2024 11:51:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0socket=E5=B7=A5?= =?UTF-8?q?=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/framework/socket/tcp_client.go | 96 ++++++++++++++++++++++++++++++ src/framework/socket/tcp_server.go | 83 ++++++++++++++++++++++++++ src/framework/socket/udp_client.go | 96 ++++++++++++++++++++++++++++++ src/framework/socket/udp_server.go | 80 +++++++++++++++++++++++++ 4 files changed, 355 insertions(+) create mode 100644 src/framework/socket/tcp_client.go create mode 100644 src/framework/socket/tcp_server.go create mode 100644 src/framework/socket/udp_client.go create mode 100644 src/framework/socket/udp_server.go diff --git a/src/framework/socket/tcp_client.go b/src/framework/socket/tcp_client.go new file mode 100644 index 00000000..fb8899b5 --- /dev/null +++ b/src/framework/socket/tcp_client.go @@ -0,0 +1,96 @@ +package socket + +import ( + "bytes" + "fmt" + "net" + "strings" + "time" +) + +// ConnTCP 连接TCP客户端 +type ConnTCP struct { + Addr string `json:"addr"` // 主机地址 + Port int64 `json:"port"` // 端口 + + DialTimeOut time.Duration `json:"dialTimeOut"` // 连接超时断开 + + Client *net.Conn `json:"client"` + LastResult string `json:"lastResult"` // 记最后一次发送消息的结果 +} + +// New 创建TCP客户端 +func (c *ConnTCP) New() (*ConnTCP, error) { + // IPV6地址协议 + proto := "tcp" + if strings.Contains(c.Addr, ":") { + proto = "tcp6" + c.Addr = fmt.Sprintf("[%s]", c.Addr) + } + address := fmt.Sprintf("%s:%d", c.Addr, c.Port) + + // 默认等待5s + if c.DialTimeOut == 0 { + c.DialTimeOut = 5 * time.Second + } + + // 连接到服务端 + client, err := net.DialTimeout(proto, address, c.DialTimeOut) + if err != nil { + return nil, err + } + + c.Client = &client + return c, nil +} + +// Close 关闭当前TCP客户端 +func (c *ConnTCP) Close() { + if c.Client != nil { + (*c.Client).Close() + } +} + +// Send 发送消息 +func (c *ConnTCP) Send(msg []byte, timer time.Duration) (string, error) { + if c.Client == nil { + return "", fmt.Errorf("tcp client not connected") + } + conn := *c.Client + + // 写入信息 + if len(msg) > 0 { + if _, err := conn.Write(msg); err != nil { + return "", err + } + } + + var buf bytes.Buffer + defer buf.Reset() + + tmp := make([]byte, 1024) + for { + select { + case <-time.After(timer): + c.LastResult = buf.String() + return c.LastResult, fmt.Errorf("timeout") + default: + // 读取命令消息 + n, err := conn.Read(tmp) + if n == 0 || err != nil { + tmp = nil + break + } + + tmpStr := string(tmp[:n]) + buf.WriteString(tmpStr) + + // 是否有终止符 + if strings.HasSuffix(tmpStr, ">") || strings.HasSuffix(tmpStr, "> ") || strings.HasSuffix(tmpStr, "# ") { + tmp = nil + c.LastResult = buf.String() + return c.LastResult, nil + } + } + } +} diff --git a/src/framework/socket/tcp_server.go b/src/framework/socket/tcp_server.go new file mode 100644 index 00000000..52a9b960 --- /dev/null +++ b/src/framework/socket/tcp_server.go @@ -0,0 +1,83 @@ +package socket + +import ( + "fmt" + "net" + "strings" + + "be.ems/src/framework/logger" +) + +// SocketTCP TCP服务端 +type SocketTCP struct { + Addr string `json:"addr"` // 主机地址 + Port int64 `json:"port"` // 端口 + Listen *net.Listener `json:"listen"` + StopChan chan struct{} `json:"stop"` // 停止信号 +} + +// New 创建TCP服务端 +func (s *SocketTCP) New() (*SocketTCP, error) { + // IPV6地址协议 + proto := "tcp" + if strings.Contains(s.Addr, ":") { + proto = "tcp6" + s.Addr = fmt.Sprintf("[%s]", s.Addr) + } + address := fmt.Sprintf("%s:%d", s.Addr, s.Port) + + ln, err := net.Listen(proto, address) + if err != nil { + return nil, err + } + + s.Listen = &ln + s.StopChan = make(chan struct{}, 1) + return s, nil +} + +// Close 关闭当前TCP服务端 +func (s *SocketTCP) Close() { + if s.Listen != nil { + s.StopChan <- struct{}{} + (*s.Listen).Close() + } +} + +// Resolve 处理消息 +func (s *SocketTCP) Resolve(bufferSize int, callback func([]byte, int)) error { + if s.Listen == nil { + return fmt.Errorf("tcp service not created") + } + + ln := *s.Listen + buffer := make([]byte, bufferSize) + + for { + select { + case <-s.StopChan: + return fmt.Errorf("udp service stop") + default: + conn, err := ln.Accept() + if err != nil { + logger.Errorf("Error accepting connection: %v ", err) + continue + } + defer conn.Close() + + // 读取数据 + n, err := conn.Read(buffer) + if err != nil { + fmt.Println("Error reading from TCP connection:", err) + continue + } + + callback(buffer, n) + + // 发送响应 + if _, err = conn.Write([]byte("tcp>")); err != nil { + fmt.Println("Error sending response:", err) + } + } + } +} diff --git a/src/framework/socket/udp_client.go b/src/framework/socket/udp_client.go new file mode 100644 index 00000000..ecaa4864 --- /dev/null +++ b/src/framework/socket/udp_client.go @@ -0,0 +1,96 @@ +package socket + +import ( + "bytes" + "fmt" + "net" + "strings" + "time" +) + +// ConnUDP 连接UDP客户端 +type ConnUDP struct { + Addr string `json:"addr"` // 主机地址 + Port int64 `json:"port"` // 端口 + + DialTimeOut time.Duration `json:"dialTimeOut"` // 连接超时断开 + + Client *net.Conn `json:"client"` + LastResult string `json:"lastResult"` // 记最后一次发送消息的结果 +} + +// New 创建UDP客户端 +func (c *ConnUDP) New() (*ConnUDP, error) { + // IPV6地址协议 + proto := "udp" + if strings.Contains(c.Addr, ":") { + proto = "udp6" + c.Addr = fmt.Sprintf("[%s]", c.Addr) + } + address := fmt.Sprintf("%s:%d", c.Addr, c.Port) + + // 默认等待5s + if c.DialTimeOut == 0 { + c.DialTimeOut = 5 * time.Second + } + + // 连接到服务端 + client, err := net.DialTimeout(proto, address, c.DialTimeOut) + if err != nil { + return nil, err + } + + c.Client = &client + return c, nil +} + +// Close 关闭当前UDP客户端 +func (c *ConnUDP) Close() { + if c.Client != nil { + (*c.Client).Close() + } +} + +// Send 发送消息 +func (c *ConnUDP) Send(msg []byte, ms int) (string, error) { + if c.Client == nil { + return "", fmt.Errorf("udp client not connected") + } + conn := *c.Client + + // 写入信息 + if len(msg) > 0 { + if _, err := conn.Write(msg); err != nil { + return "", err + } + } + + var buf bytes.Buffer + defer buf.Reset() + + tmp := make([]byte, 1024) + for { + select { + case <-time.After(time.Duration(time.Duration(ms).Milliseconds())): + c.LastResult = buf.String() + return c.LastResult, fmt.Errorf("timeout") + default: + // 读取命令消息 + n, err := conn.Read(tmp) + if n == 0 || err != nil { + tmp = nil + break + } + + tmpStr := string(tmp[:n]) + buf.WriteString(tmpStr) + + // 是否有终止符 + if strings.HasSuffix(tmpStr, ">") || strings.HasSuffix(tmpStr, "> ") || strings.HasSuffix(tmpStr, "# ") { + tmp = nil + c.LastResult = buf.String() + return c.LastResult, nil + } + } + } +} diff --git a/src/framework/socket/udp_server.go b/src/framework/socket/udp_server.go new file mode 100644 index 00000000..d54b957a --- /dev/null +++ b/src/framework/socket/udp_server.go @@ -0,0 +1,80 @@ +package socket + +import ( + "fmt" + "net" + "strings" +) + +// SocketUDP UDP服务端 +type SocketUDP struct { + Addr string `json:"addr"` // 主机地址 + Port int64 `json:"port"` // 端口 + Conn *net.UDPConn `json:"conn"` + StopChan chan struct{} `json:"stop"` // 停止信号 +} + +// New 创建UDP服务端 +func (s *SocketUDP) New() (*SocketUDP, error) { + // IPV6地址协议 + proto := "udp" + if strings.Contains(s.Addr, ":") { + proto = "udp6" + s.Addr = fmt.Sprintf("[%s]", s.Addr) + } + address := fmt.Sprintf("%s:%d", s.Addr, s.Port) + + // 解析 UDP 地址 + udpAddr, err := net.ResolveUDPAddr(proto, address) + if err != nil { + return nil, err + } + + // 监听 UDP 地址 + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return nil, err + } + + s.Conn = conn + s.StopChan = make(chan struct{}, 1) + return s, nil +} + +// CloseService 关闭当前UDP服务端 +func (s *SocketUDP) Close() { + if s.Conn != nil { + s.StopChan <- struct{}{} + (*s.Conn).Close() + } +} + +// Resolve 处理消息 +func (s *SocketUDP) Resolve(bufferSize int, callback func([]byte, int)) error { + if s.Conn == nil { + return fmt.Errorf("udp service not created") + } + + buffer := make([]byte, bufferSize) + + for { + select { + case <-s.StopChan: + return fmt.Errorf("udp service stop") + default: + // 读取数据 + n, addr, err := s.Conn.ReadFromUDP(buffer) + if err != nil { + fmt.Println("Error reading from UDP connection:", err) + continue + } + + callback(buffer, n) + + // 发送响应 + if _, err = s.Conn.WriteToUDP([]byte("udp>"), addr); err != nil { + fmt.Println("Error sending response:", err) + } + } + } +}