2023-09-07 07:54:24 +00:00
|
|
|
|
package logic
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
//用户标识的连接(白板用户不存)
|
|
|
|
|
mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id (val是个普通map,存储这个用户的所有连接标识)
|
|
|
|
|
//用户标识的连接增删操作队列
|
|
|
|
|
userConnPoolCtlChan = make(chan userConnPoolCtlChanItem, 500)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// 添加用户索引池ws连接
|
|
|
|
|
func createUserConnPoolElement(userId, guestId int64, uniqueId string) {
|
2023-09-07 08:03:56 +00:00
|
|
|
|
if userId == 0 && guestId == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
2023-09-07 07:54:24 +00:00
|
|
|
|
data := userConnPoolCtlChanItem{
|
2023-10-07 08:10:51 +00:00
|
|
|
|
userId: userId,
|
|
|
|
|
guestId: guestId,
|
|
|
|
|
uniqueId: uniqueId,
|
|
|
|
|
message: nil,
|
|
|
|
|
option: 1,
|
2023-09-07 07:54:24 +00:00
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case userConnPoolCtlChan <- data:
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 从用户索引池删除ws连接
|
|
|
|
|
func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) {
|
2023-09-07 08:03:56 +00:00
|
|
|
|
if userId == 0 && guestId == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
2023-09-07 07:54:24 +00:00
|
|
|
|
data := userConnPoolCtlChanItem{
|
2023-10-07 08:10:51 +00:00
|
|
|
|
userId: userId,
|
|
|
|
|
guestId: guestId,
|
|
|
|
|
uniqueId: uniqueId,
|
|
|
|
|
message: nil,
|
|
|
|
|
option: 0,
|
2023-09-07 07:54:24 +00:00
|
|
|
|
}
|
2023-10-07 08:10:51 +00:00
|
|
|
|
userConnPoolCtlChan <- data
|
2023-09-07 07:54:24 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 根据用户索引发现链接并发送(广播)消息到出口队列
|
|
|
|
|
func sendToOutChanByUserIndex(userId, guestId int64, message []byte) {
|
|
|
|
|
data := userConnPoolCtlChanItem{
|
|
|
|
|
userId: userId,
|
|
|
|
|
guestId: guestId,
|
|
|
|
|
uniqueId: "",
|
|
|
|
|
message: message,
|
|
|
|
|
option: 2,
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case userConnPoolCtlChan <- data:
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 消费用户索引创建/删除/发送消息中的任务数据
|
|
|
|
|
func ConsumeUserConnPoolCtlChanData(ctx context.Context) {
|
|
|
|
|
defer func() {
|
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
|
logx.Error("ConsumeUserConnPoolCtlChanData panic:", err)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
go func() {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
panic("ConsumeUserConnPoolCtlChanData ctx deadline")
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
var (
|
|
|
|
|
data userConnPoolCtlChanItem
|
|
|
|
|
userKey string
|
|
|
|
|
)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case data = <-userConnPoolCtlChan:
|
|
|
|
|
userKey = getmapUserConnPoolUniqueId(data.userId, data.guestId)
|
|
|
|
|
switch data.option {
|
|
|
|
|
case 2: //发送消息
|
2023-10-07 08:10:51 +00:00
|
|
|
|
//logx.Info("通过用户id索引发送消息")
|
2023-09-07 07:54:24 +00:00
|
|
|
|
mapUserUniqueId, ok := mapUserConnPool[userKey]
|
|
|
|
|
if !ok {
|
|
|
|
|
logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
for uniqueId, _ := range mapUserUniqueId {
|
|
|
|
|
//根据uniqueId查询原始池中连接
|
|
|
|
|
mapConnPoolVal, ok := mapConnPool.Load(uniqueId)
|
|
|
|
|
if !ok {
|
|
|
|
|
logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey, " 原始uniqueId:", uniqueId)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
originConn, ok := mapConnPoolVal.(wsConnectItem)
|
|
|
|
|
if !ok {
|
|
|
|
|
logx.Error("通过用户id索引发送消息,断言原始连接失败,用户索引key:", userKey, " 原始uniqueId:", uniqueId)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
originConn.sendToOutChan(data.message)
|
|
|
|
|
}
|
|
|
|
|
case 1: //添加
|
2023-10-07 08:10:51 +00:00
|
|
|
|
//logx.Info("添加用户id索引标识:", data.uniqueId)
|
2023-09-07 07:54:24 +00:00
|
|
|
|
//存在这个用户的map池子
|
|
|
|
|
if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok {
|
|
|
|
|
mapUserUniqueId[data.uniqueId] = struct{}{}
|
|
|
|
|
} else {
|
|
|
|
|
mapUserConnPool[userKey] = make(map[string]struct{})
|
|
|
|
|
mapUserConnPool[userKey][data.uniqueId] = struct{}{}
|
|
|
|
|
}
|
|
|
|
|
case 0: //删除
|
2023-10-07 08:10:51 +00:00
|
|
|
|
//logx.Info("删除用户id索引标识:", data.uniqueId)
|
2023-09-07 07:54:24 +00:00
|
|
|
|
if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok {
|
|
|
|
|
delete(mapUserUniqueId, data.uniqueId)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 获取mapUserConnPool唯一id
|
|
|
|
|
func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) {
|
|
|
|
|
if userId > 0 {
|
|
|
|
|
guestId = 0
|
|
|
|
|
}
|
|
|
|
|
return fmt.Sprintf("%d_%d", userId, guestId)
|
|
|
|
|
}
|