fix: ws 连接write释放goroutune

This commit is contained in:
TsMask
2024-08-06 15:06:57 +08:00
parent aefc185199
commit 2c1f8c75fa
2 changed files with 22 additions and 17 deletions

View File

@@ -15,11 +15,11 @@ import (
var ( var (
// ws客户端 [clientId: client] // ws客户端 [clientId: client]
WsClients sync.Map wsClients sync.Map
// ws用户对应的多个客户端id [uid:clientIds] // ws用户对应的多个客户端id [uid:clientIds]
WsUsers sync.Map wsUsers sync.Map
// ws组对应的多个用户id [groupID:uids] // ws组对应的多个用户id [groupID:uids]
WsGroup sync.Map wsGroup sync.Map
) )
// 实例化服务层 WSImpl 结构体 // 实例化服务层 WSImpl 结构体
@@ -72,22 +72,22 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn,
} }
// 存入客户端 // 存入客户端
WsClients.Store(clientID, wsClient) wsClients.Store(clientID, wsClient)
// 存入用户持有客户端 // 存入用户持有客户端
if uid != "" { if uid != "" {
if v, ok := WsUsers.Load(uid); ok { if v, ok := wsUsers.Load(uid); ok {
uidClientIds := v.(*[]string) uidClientIds := v.(*[]string)
*uidClientIds = append(*uidClientIds, clientID) *uidClientIds = append(*uidClientIds, clientID)
} else { } else {
WsUsers.Store(uid, &[]string{clientID}) wsUsers.Store(uid, &[]string{clientID})
} }
} }
// 存入用户订阅组 // 存入用户订阅组
if uid != "" && len(groupIDs) > 0 { if uid != "" && len(groupIDs) > 0 {
for _, groupID := range groupIDs { for _, groupID := range groupIDs {
if v, ok := WsGroup.Load(groupID); ok { if v, ok := wsGroup.Load(groupID); ok {
groupUIDs := v.(*[]string) groupUIDs := v.(*[]string)
// 避免同组内相同用户 // 避免同组内相同用户
hasUid := false hasUid := false
@@ -101,7 +101,7 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn,
*groupUIDs = append(*groupUIDs, uid) *groupUIDs = append(*groupUIDs, uid)
} }
} else { } else {
WsGroup.Store(groupID, &[]string{uid}) wsGroup.Store(groupID, &[]string{uid})
} }
} }
} }
@@ -158,6 +158,11 @@ func (s *WSImpl) clientWrite(wsClient *model.WSClient) {
} }
}() }()
for msg := range wsClient.MsgChan { for msg := range wsClient.MsgChan {
// 关闭句柄
if string(msg) == "ws:close" {
wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 发送消息 // 发送消息
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg) err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
if err != nil { if err != nil {
@@ -171,22 +176,22 @@ func (s *WSImpl) clientWrite(wsClient *model.WSClient) {
// CloseClient 客户端关闭 // CloseClient 客户端关闭
func (s *WSImpl) CloseClient(clientID string) { func (s *WSImpl) CloseClient(clientID string) {
v, ok := WsClients.Load(clientID) v, ok := wsClients.Load(clientID)
if !ok { if !ok {
return return
} }
client := v.(*model.WSClient) client := v.(*model.WSClient)
defer func() { defer func() {
client.Conn.WriteMessage(websocket.CloseMessage, []byte{}) client.MsgChan <- []byte("ws:close")
client.Conn.Close()
WsClients.Delete(clientID)
client.StopChan <- struct{}{} client.StopChan <- struct{}{}
client.Conn.Close()
wsClients.Delete(clientID)
}() }()
// 客户端断线时自动踢出Uid绑定列表 // 客户端断线时自动踢出Uid绑定列表
if client.BindUid != "" { if client.BindUid != "" {
if clientIds, ok := WsUsers.Load(client.BindUid); ok { if clientIds, ok := wsUsers.Load(client.BindUid); ok {
uidClientIds := clientIds.(*[]string) uidClientIds := clientIds.(*[]string)
if len(*uidClientIds) > 0 { if len(*uidClientIds) > 0 {
tempClientIds := make([]string, 0, len(*uidClientIds)) tempClientIds := make([]string, 0, len(*uidClientIds))
@@ -203,7 +208,7 @@ func (s *WSImpl) CloseClient(clientID string) {
// 客户端断线时自动踢出已加入的组 // 客户端断线时自动踢出已加入的组
if client.BindUid != "" && len(client.SubGroup) > 0 { if client.BindUid != "" && len(client.SubGroup) > 0 {
for _, groupID := range client.SubGroup { for _, groupID := range client.SubGroup {
uids, ok := WsGroup.Load(groupID) uids, ok := wsGroup.Load(groupID)
if !ok { if !ok {
continue continue
} }

View File

@@ -34,7 +34,7 @@ type WSSendImpl struct{}
// ByClientID 给已知客户端发消息 // ByClientID 给已知客户端发消息
func (s *WSSendImpl) ByClientID(clientID string, data any) error { func (s *WSSendImpl) ByClientID(clientID string, data any) error {
v, ok := WsClients.Load(clientID) v, ok := wsClients.Load(clientID)
if !ok { if !ok {
return fmt.Errorf("no fount client ID: %s", clientID) return fmt.Errorf("no fount client ID: %s", clientID)
} }
@@ -55,7 +55,7 @@ func (s *WSSendImpl) ByClientID(clientID string, data any) error {
// ByGroupID 给订阅组的用户发送消息 // ByGroupID 给订阅组的用户发送消息
func (s *WSSendImpl) ByGroupID(groupID string, data any) error { func (s *WSSendImpl) ByGroupID(groupID string, data any) error {
uids, ok := WsGroup.Load(groupID) uids, ok := wsGroup.Load(groupID)
if !ok { if !ok {
return fmt.Errorf("no fount Group ID: %s", groupID) return fmt.Errorf("no fount Group ID: %s", groupID)
} }
@@ -68,7 +68,7 @@ func (s *WSSendImpl) ByGroupID(groupID string, data any) error {
// 在群组中找到对应的 uid // 在群组中找到对应的 uid
for _, uid := range *groupUids { for _, uid := range *groupUids {
clientIds, ok := WsUsers.Load(uid) clientIds, ok := wsUsers.Load(uid)
if !ok { if !ok {
continue continue
} }