package logic import ( "context" "fmt" "github.com/zeromicro/go-zero/core/logx" "time" ) var ( //用户标识的连接(白板用户不存) mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id (val是个普通map,存储这个用户的所有连接标识) //用户标识的连接增删操作队列 userConnPoolCtlChan = make(chan userConnPoolCtlChanItem, 500) ) // 添加用户索引池ws连接 func createUserConnPoolElement(userId, guestId int64, uniqueId string) { if userId == 0 && guestId == 0 { return } data := userConnPoolCtlChanItem{ userId: userId, guestId: guestId, uniqueId: uniqueId, message: nil, messageType: "", option: 1, } select { case userConnPoolCtlChan <- data: return case <-time.After(time.Millisecond * 200): return } } // 从用户索引池删除ws连接 func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) { if userId == 0 && guestId == 0 { return } data := userConnPoolCtlChanItem{ userId: userId, guestId: guestId, uniqueId: uniqueId, message: nil, messageType: "", option: 0, } select { case userConnPoolCtlChan <- data: return case <-time.After(time.Millisecond * 200): return } } // 根据用户索引发现链接并发送(广播)消息到出口队列 func sendToOutChanByUserIndex(userId, guestId int64, message []byte) { data := userConnPoolCtlChanItem{ userId: userId, guestId: guestId, uniqueId: "", message: message, option: 2, } select { case userConnPoolCtlChan <- data: return case <-time.After(time.Millisecond * 200): return } } // 消费用户索引创建/删除/发送消息中的任务数据 func ConsumeUserConnPoolCtlChanData(ctx context.Context) { defer func() { if err := recover(); err != nil { logx.Error("ConsumeUserConnPoolCtlChanData panic:", err) } }() go func() { select { case <-ctx.Done(): panic("ConsumeUserConnPoolCtlChanData ctx deadline") } }() var ( data userConnPoolCtlChanItem userKey string ) for { select { case data = <-userConnPoolCtlChan: userKey = getmapUserConnPoolUniqueId(data.userId, data.guestId) switch data.option { case 2: //发送消息 logx.Info("通过用户id索引发送消息") mapUserUniqueId, ok := mapUserConnPool[userKey] if !ok { logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey) continue } for uniqueId, _ := range mapUserUniqueId { //根据uniqueId查询原始池中连接 mapConnPoolVal, ok := mapConnPool.Load(uniqueId) if !ok { logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey, " 原始uniqueId:", uniqueId) continue } originConn, ok := mapConnPoolVal.(wsConnectItem) if !ok { logx.Error("通过用户id索引发送消息,断言原始连接失败,用户索引key:", userKey, " 原始uniqueId:", uniqueId) continue } originConn.sendToOutChan(data.message) } case 1: //添加 logx.Info("添加用户id索引标识:", data.uniqueId) //存在这个用户的map池子 if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok { mapUserUniqueId[data.uniqueId] = struct{}{} } else { mapUserConnPool[userKey] = make(map[string]struct{}) mapUserConnPool[userKey][data.uniqueId] = struct{}{} } case 0: //删除 logx.Info("删除用户id索引标识:", data.uniqueId) if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok { delete(mapUserUniqueId, data.uniqueId) } } } } } // 获取mapUserConnPool唯一id func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) { if userId > 0 { guestId = 0 } return fmt.Sprintf("%d_%d", userId, guestId) }