faet: 新增WS模块

This commit is contained in:
TsMask
2024-01-23 18:06:44 +08:00
parent 413f0b4951
commit 89499c9d28
12 changed files with 673 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
package service
import (
"net/http"
"ems.agt/src/modules/ws/model"
"github.com/gorilla/websocket"
)
// IWS WebSocket通信 服务层接口
type IWS interface {
// UpgraderWs http升级ws请求
UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn
// NewClient 新建客户端 uid 登录用户ID
NewClient(uid string, gids []string, conn *websocket.Conn) *model.WSClient
// CloseClient 客户端关闭
CloseClient(clientID string)
}

View File

@@ -0,0 +1,207 @@
package service
import (
"encoding/json"
"net/http"
"sync"
"time"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/utils/generate"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/ws/model"
"github.com/gorilla/websocket"
)
var (
// ws客户端 [clientId: client]
WsClients = sync.Map{}
// ws用户对应的多个客户端id [uid:clientIds]
WsUsers = sync.Map{}
// ws组对应的多个用户id [groupID:uids]
WsGroup = sync.Map{}
)
// 实例化服务层 WSImpl 结构体
var NewWSImpl = &WSImpl{}
// WSImpl WebSocket通信 服务层处理
type WSImpl struct{}
// UpgraderWs http升级ws请求
func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn {
wsUpgrader := websocket.Upgrader{
// 设置消息发送缓冲区大小byte如果这个值设置得太小可能会导致服务端在发送大型消息时遇到问题
WriteBufferSize: 1024,
// 消息包启用压缩
EnableCompression: true,
// ws握手超时时间
HandshakeTimeout: 5 * time.Second,
// ws握手过程中允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
logger.Errorf("ws Upgrade err: %s", err.Error())
}
return conn
}
// NewClient 新建客户端 uid 登录用户ID
func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn) *model.WSClient {
// clientID也可以用其他方式生成只要能保证在所有服务端中都能保证唯一即可
clientID := generate.Code(16)
wsClient := &model.WSClient{
ID: clientID,
Conn: conn,
LastHeartbeat: time.Now().UnixMilli(),
BindUid: uid,
SubGroup: groupIDs,
MsgChan: make(chan []byte, 100),
StopChan: make(chan struct{}, 1), // 请求卡死循环标记
}
// 存入客户端
WsClients.Store(clientID, wsClient)
// 存入用户持有客户端
if uid != "" {
if v, ok := WsUsers.Load(uid); ok {
uidClientIds := v.(*[]string)
*uidClientIds = append(*uidClientIds, clientID)
} else {
WsUsers.Store(uid, &[]string{clientID})
}
}
// 存入用户订阅组
if uid != "" && len(groupIDs) > 0 {
for _, groupID := range groupIDs {
if v, ok := WsGroup.Load(groupID); ok {
groupUIDs := v.(*[]string)
*groupUIDs = append(*groupUIDs, uid)
} else {
WsGroup.Store(groupID, &[]string{uid})
}
}
}
go s.clientRead(wsClient)
go s.clientWrite(wsClient)
// 发客户端id确认是否连接
msgByte, _ := json.Marshal(result.OkData(map[string]string{
"clientId": clientID,
}))
wsClient.MsgChan <- msgByte
return wsClient
}
// clientRead 客户端读取消息
func (s *WSImpl) clientRead(wsClient *model.WSClient) {
for {
// 读取消息
messageType, msg, err := wsClient.Conn.ReadMessage()
if err != nil {
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.CloseClient(wsClient.ID)
return
}
// 文本和二进制类型只处理文本json
if messageType == websocket.TextMessage {
var reqMsg model.WSRequest
err := json.Unmarshal(msg, &reqMsg)
if err != nil {
msgByte, _ := json.Marshal(result.ErrMsg("message format not supported"))
wsClient.MsgChan <- msgByte
} else {
err := NewWSReceiveImpl.Receive(wsClient, reqMsg)
if err != nil {
logger.Warnf("ws ReceiveMessage UID %s err: %s", wsClient.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
wsClient.MsgChan <- msgByte
}
}
}
}
}
// clientWrite 客户端写入消息
func (s *WSImpl) clientWrite(wsClient *model.WSClient) {
ticker := time.NewTicker(time.Second * 5) // 设置心跳间隔为 5 秒钟
defer ticker.Stop()
for {
select {
case <-ticker.C:
wsClient.LastHeartbeat = time.Now().UnixMilli()
// 发送 Ping 消息
err := wsClient.Conn.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
logger.Warnf("ws PingMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.CloseClient(wsClient.ID)
return
}
case msg := <-wsClient.MsgChan:
// 发送消息
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.CloseClient(wsClient.ID)
return
}
}
}
}
// CloseClient 客户端关闭
func (s *WSImpl) CloseClient(clientID string) {
v, ok := WsClients.Load(clientID)
if !ok {
return
}
client := v.(*model.WSClient)
defer func() {
client.Conn.WriteMessage(websocket.CloseMessage, []byte{})
client.Conn.Close()
client.StopChan <- struct{}{}
WsClients.Delete(clientID)
}()
// 客户端断线时自动踢出Uid绑定列表
if client.BindUid != "" {
if clientIds, ok := WsUsers.Load(client.BindUid); ok {
uidClientIds := clientIds.(*[]string)
if len(*uidClientIds) > 0 {
for i, clientId := range *uidClientIds {
if clientId == client.ID {
*uidClientIds = append((*uidClientIds)[:i], (*uidClientIds)[i+1:]...)
}
}
}
}
}
// 客户端断线时自动踢出已加入的组
if client.BindUid != "" && len(client.SubGroup) > 0 {
for _, groupID := range client.SubGroup {
uids, ok := WsGroup.Load(groupID)
if !ok {
continue
}
groupUIDs := uids.(*[]string)
if len(*groupUIDs) > 0 {
for i, v := range *groupUIDs {
if v == client.BindUid {
*groupUIDs = append((*groupUIDs)[:i], (*groupUIDs)[i+1:]...)
}
}
}
}
}
}

View File

@@ -0,0 +1,9 @@
package service
import "ems.agt/src/modules/ws/model"
// IWSReceive WebSocket消息接收处理 服务层接口
type IWSReceive interface {
// Receive 接收处理
Receive(client *model.WSClient, reqMsg model.WSRequest) error
}

View File

@@ -0,0 +1,30 @@
package service
import (
"fmt"
"ems.agt/src/modules/ws/model"
"ems.agt/src/modules/ws/processor"
)
// 实例化服务层 WSReceiveImpl 结构体
var NewWSReceiveImpl = &WSReceiveImpl{}
// WSReceiveImpl WebSocket消息接收处理 服务层处理
type WSReceiveImpl struct{}
// Receive 接收处理
func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) error {
fmt.Println(client.ID, reqMsg)
switch reqMsg.Type {
case "ps":
res, err := processor.GetProcessData(reqMsg.Data)
if err != nil {
return err
}
client.MsgChan <- res
default:
return fmt.Errorf("message type not supported")
}
return nil
}

View File

@@ -0,0 +1,10 @@
package service
// IWSSend WebSocket消息发送处理 服务层接口
type IWSSend interface {
// ByClientID 给已知客户端发消息
ByClientID(clientID string, data any) error
// ByGroupID 给订阅组的用户发送消息
ByGroupID(gid string, data any) error
}

View File

@@ -0,0 +1,72 @@
package service
import (
"encoding/json"
"fmt"
"ems.agt/src/modules/ws/model"
)
const (
// 组号-其他
GROUP_OTHER = "0"
// 组号-指标
GROUP_KPI = "1000"
// 组号-会话记录
GROUP_CDR = "1005"
)
// 实例化服务层 WSSendImpl 结构体
var NewWSSendImpl = &WSSendImpl{}
// IWSSend WebSocket消息发送处理 服务层处理
type WSSendImpl struct{}
// ByClientID 给已知客户端发消息
func (s *WSSendImpl) ByClientID(clientID string, data any) error {
v, ok := WsClients.Load(clientID)
if !ok {
return fmt.Errorf("no fount client ID: %s", clientID)
}
dataByte, err := json.Marshal(data)
if err != nil {
return err
}
client := v.(*model.WSClient)
client.MsgChan <- dataByte
return nil
}
// ByGroupID 给订阅组的用户发送消息
func (s *WSSendImpl) ByGroupID(groupID string, data any) error {
uids, ok := WsGroup.Load(groupID)
if !ok {
return fmt.Errorf("no fount Group ID: %s", groupID)
}
groupUids := uids.(*[]string)
// 群组中没有成员
if len(*groupUids) == 0 {
return fmt.Errorf("no members in the group")
}
// 在群组中找到对应的 uid
for _, uid := range *groupUids {
clientIds, ok := WsUsers.Load(uid)
if !ok {
continue
}
// 在用户中找到客户端并发送
uidClientIds := clientIds.(*[]string)
for _, clientId := range *uidClientIds {
err := s.ByClientID(clientId, data)
if err != nil {
continue
}
}
}
return nil
}