package logic import ( "context" "fmt" "github.com/zeromicro/go-zero/core/logx" ) var ( //用户标识的连接(白板用户不存) mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id (val是个普通map,存储这个用户的所有连接标识) //用户标识的连接增删操作队列 userConnPoolCtlChan = make(chan userConnPoolCtlChanItem, 500) ) // 用户标识的连接增删操作队列传输的值的结构 type userConnPoolCtlChanItem struct { userId int64 //必须(两个用户id任意一个不为0) guestId int64 //必须(两个用户id任意一个不为0) uniqueId string //主连接池唯一标识(添加/删除时候必须) message []byte //消息(发送消息传的,格式是经过标准输出序列化后的数据) option int64 //操作 2发消息 1增加 0删除 } // 添加用户索引池ws连接 func createUserConnPoolElement(userId, guestId int64, uniqueId string) { if userId == 0 && guestId == 0 { return } data := userConnPoolCtlChanItem{ userId: userId, guestId: guestId, uniqueId: uniqueId, message: nil, option: 1, } select { case userConnPoolCtlChan <- data: return } } // 从用户索引池删除ws连接 func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) { if userId == 0 && guestId == 0 { return } data := userConnPoolCtlChanItem{ userId: userId, guestId: guestId, uniqueId: uniqueId, message: nil, option: 0, } userConnPoolCtlChan <- data } // 消费用户索引创建/删除/发送消息中的任务数据 func ConsumeUserConnPoolCtlChanData(ctx context.Context) { defer func() { if err := recover(); err != nil { logx.Error("ConsumeUserConnPoolCtlChanData panic:", err) } }() go func() { select { case <-ctx.Done(): panic("ConsumeUserConnPoolCtlChanData ctx deadline") } }() var ( data userConnPoolCtlChanItem userKey string ) for { select { case data = <-userConnPoolCtlChan: userKey = getmapUserConnPoolUniqueId(data.userId, data.guestId) switch data.option { case 2: //发送消息 //logx.Info("通过用户id索引发送消息") mapUserUniqueId, ok := mapUserConnPool[userKey] if !ok { logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey) continue } for uniqueId, _ := range mapUserUniqueId { //根据uniqueId查询原始池中连接 mapConnPoolVal, ok := mapConnPool.Load(uniqueId) if !ok { logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey, " 原始uniqueId:", uniqueId) continue } originConn, ok := mapConnPoolVal.(wsConnectItem) if !ok { logx.Error("通过用户id索引发送消息,断言原始连接失败,用户索引key:", userKey, " 原始uniqueId:", uniqueId) continue } originConn.sendToOutChan(data.message) } case 1: //添加 //logx.Info("添加用户id索引标识:", data.uniqueId) //存在这个用户的map池子 if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok { mapUserUniqueId[data.uniqueId] = struct{}{} } else { mapUserConnPool[userKey] = make(map[string]struct{}) mapUserConnPool[userKey][data.uniqueId] = struct{}{} } case 0: //删除 //logx.Info("删除用户id索引标识:", data.uniqueId) if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok { delete(mapUserUniqueId, data.uniqueId) } } } } } // 获取mapUserConnPool唯一id func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) { if userId > 0 { guestId = 0 } return fmt.Sprintf("%d_%d", userId, guestId) } // 根据用户索引发现链接并发送(广播)消息到出口队列 func sendToOutChanByUserIndex(userId, guestId int64, message []byte) { data := userConnPoolCtlChanItem{ userId: userId, guestId: guestId, uniqueId: "", message: message, option: 2, } select { case userConnPoolCtlChan <- data: return } }