diff --git a/server/websocket/internal/logic/commonnotifylogic.go b/server/websocket/internal/logic/commonnotifylogic.go index 72abe5b6..c42af9eb 100644 --- a/server/websocket/internal/logic/commonnotifylogic.go +++ b/server/websocket/internal/logic/commonnotifylogic.go @@ -5,7 +5,6 @@ import ( "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" - "sync" "time" "fusenapi/server/websocket/internal/svc" @@ -37,7 +36,7 @@ type commonConnectionNotFoundDataCacheChanItem struct { } // 放入缓冲队列 -func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { +func pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { select { case commonConnectionNotFoundDataCacheChan <- data: return @@ -47,43 +46,48 @@ func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundD } // 取出元素 -func (l *CommonNotifyLogic) popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { +func popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { return <-commonConnectionNotFoundDataCacheChan } -// 保证处理消息就一个循环在执行 -var consumeCommonCacheData sync.Once - // 消费公共通知未处理的消息(目前是轮巡方式,待优化) -func (l *CommonNotifyLogic) consumeCommonCacheData() { - //单例 - consumeCommonCacheData.Do(func() { - for { - time.Sleep(time.Millisecond * 200) - info := l.popCommonNotifyCache() - //查询websocket连接 - value, ok := mapConnPool.Load(info.data.Wid) - //没有连接 - if !ok { - info.retryTimes-- - //大于0,则放回队列 - if info.retryTimes > 0 { - l.pushCommonNotifyCache(info) - continue - } - //否则直接丢弃消息 - continue - } - //断言连接 - ws, ok := value.(wsConnectItem) - if !ok { - logx.Error("渲染回调断言websocket连接失败") - continue - } - //发送 - ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) +func ConsumeCommonCacheData(ctx context.Context) { + defer func() { + if err := recover(); err != nil { + logx.Error("consumeCommonCacheData panic :", err) } - }) + }() + go func() { + select { + case <-ctx.Done(): + panic("consumeCommonCacheData ctx deadline") + } + }() + for { + time.Sleep(time.Millisecond * 200) + info := popCommonNotifyCache() + //查询websocket连接 + value, ok := mapConnPool.Load(info.data.Wid) + //没有连接 + if !ok { + info.retryTimes-- + //大于0,则放回队列 + if info.retryTimes > 0 { + pushCommonNotifyCache(info) + continue + } + //否则直接丢弃消息 + continue + } + //断言连接 + ws, ok := value.(wsConnectItem) + if !ok { + logx.Error("渲染回调断言websocket连接失败") + continue + } + //发送 + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) + } } // 处理进入前逻辑w,r @@ -91,32 +95,38 @@ func (l *CommonNotifyLogic) consumeCommonCacheData() { // } func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { - //websocket连接id不能为空 + searchConnectType := 1 if req.Wid == "" { - return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty") + if req.UserId == 0 && req.GuestId == 0 { + return resp.SetStatusWithMessage(basic.CodeOK, "用户信息或者连接标识必须保证至少有其中一个") + } + searchConnectType = 2 } - //触发消费公共未处理的消息(该方法是单例) - go l.consumeCommonCacheData() - //查询websocket连接 - value, ok := mapConnPool.Load(req.Wid) - if !ok { - //没找到连接就放到公共缓冲队列 - go l.pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ - retryTimes: 20, //重试20次 - data: types.CommonNotifyReq{ - Wid: req.Wid, - Data: req.Data, - }, - }) - return resp.SetStatusWithMessage(basic.CodeOK, "success") + switch searchConnectType { + case 1: //直接通过唯一标识发消息 + //查询websocket连接 + value, ok := mapConnPool.Load(req.Wid) + if !ok { + //没找到连接就放到公共缓冲队列 + pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ + retryTimes: 20, //重试20次 + data: types.CommonNotifyReq{ + Wid: req.Wid, + Data: req.Data, + }, + }) + return resp.SetStatusWithMessage(basic.CodeOK, "success") + } + //断言连接 + ws, ok := value.(wsConnectItem) + if !ok { + logx.Error("渲染回调断言websocket连接失败") + return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误") + } + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) + case 2: //通过用户信息找连接发送 + sendToOutChanByUserIndex(req.UserId, req.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) } - //断言连接 - ws, ok := value.(wsConnectItem) - if !ok { - logx.Error("渲染回调断言websocket连接失败") - return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误") - } - ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) return resp.SetStatusWithMessage(basic.CodeOK, "success") } diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index d49dc38e..7a674a1f 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -148,8 +148,6 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request) conn.Close() return } - //消费用户索引控制chan的数据 - go consumeUserPoolData() //循环读客户端信息 go ws.acceptBrowserMessage() //消费出口数据并发送浏览器端 @@ -257,58 +255,63 @@ func sendToOutChanByUserIndex(userId, guestId int64, message []byte) { } } -// 消费用户索引池中的任务(单例) -var consumeUserPoolDataOnce sync.Once - -func consumeUserPoolData() { +// 消费用户索引创建/删除/发送消息中的任务数据 +func ConsumeUserPoolData(ctx context.Context) { defer func() { if err := recover(); err != nil { logx.Error("consumeUserPoolData panic:", err) } }() - consumeUserPoolDataOnce.Do(func() { - for { - select { - case data := <-mapUserConnPoolCtlChan: - key := getmapUserConnPoolUniqueId(data.userId, data.guestId) - switch data.option { - case 2: //发送消息 - logx.Info("通过用户id索引发送消息:", data.uniqueId) - mapUserUniqueId, ok := mapUserConnPool[key] + go func() { + select { + case <-ctx.Done(): + panic("ConsumeUserPoolData ctx deadline") + } + }() + for { + select { + case data := <-mapUserConnPoolCtlChan: + key := getmapUserConnPoolUniqueId(data.userId, data.guestId) + switch data.option { + case 2: //发送消息 + logx.Info("通过用户id索引发送消息") + mapUserUniqueId, ok := mapUserConnPool[key] + if !ok { + logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", key) + continue + } + for uniqueId, _ := range mapUserUniqueId { + //根据uniqueId查询原始池中连接 + mapConnPoolVal, ok := mapConnPool.Load(uniqueId) if !ok { + logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", key, " 原始uniqueId:", uniqueId) continue } - for _, uniqueId := range mapUserUniqueId { - //根据uniqueId查询原始池中连接 - mapConnPoolVal, ok := mapConnPool.Load(uniqueId) - if !ok { - continue - } - originConn, ok := mapConnPoolVal.(wsConnectItem) - if !ok { - continue - } - originConn.sendToOutChan(data.message) + originConn, ok := mapConnPoolVal.(wsConnectItem) + if !ok { + logx.Error("通过用户id索引发送消息,断言原始连接失败,用户索引key:", key, " 原始uniqueId:", uniqueId) + continue } - case 1: //添加 - logx.Info("添加用户id索引标识:", data.uniqueId) - if mapUserUniqueId, ok := mapUserConnPool[key]; ok { - mapUserUniqueId[data.uniqueId] = struct{}{} - } else { - mapUserConnPool[key] = make(map[string]struct{}) - mapUserConnPool[key][data.uniqueId] = struct{}{} - } - case 0: //删除 - logx.Info("删除用户id索引标识:", data.uniqueId) - if mapUserUniqueId, ok := mapUserConnPool[key]; ok { - delete(mapUserUniqueId, data.uniqueId) - } - default: - + originConn.sendToOutChan(data.message) } + case 1: //添加 + logx.Info("添加用户id索引标识:", data.uniqueId) + if mapUserUniqueId, ok := mapUserConnPool[key]; ok { + mapUserUniqueId[data.uniqueId] = struct{}{} + } else { + mapUserConnPool[key] = make(map[string]struct{}) + mapUserConnPool[key][data.uniqueId] = struct{}{} + } + case 0: //删除 + logx.Info("删除用户id索引标识:", data.uniqueId) + if mapUserUniqueId, ok := mapUserConnPool[key]; ok { + delete(mapUserUniqueId, data.uniqueId) + } + default: + } } - }) + } } // 获取mapUserConnPool唯一id diff --git a/server/websocket/internal/logic/ws_reuse_last_connect.go b/server/websocket/internal/logic/ws_reuse_last_connect.go index 1a8ea395..faaa062c 100644 --- a/server/websocket/internal/logic/ws_reuse_last_connect.go +++ b/server/websocket/internal/logic/ws_reuse_last_connect.go @@ -61,8 +61,11 @@ func (r *reuseConnProcessor) allocationMessage(w *wsConnectItem, data []byte) { } //重新绑定 logx.Info("开始重新绑定websocket连接标识") + oldUniqueId := w.uniqueId w.uniqueId = wid mapConnPool.Store(wid, *w) + //删除用户id级别之前的索引 + deleteUserConnPoolElement(w.userId, w.guestId, oldUniqueId) //添加用户id级别索引 createUserConnPoolElement(w.userId, w.guestId, wid) rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go index 32263435..f56eb527 100644 --- a/server/websocket/internal/types/types.go +++ b/server/websocket/internal/types/types.go @@ -13,8 +13,10 @@ type RenderNotifyReq struct { } type CommonNotifyReq struct { - Wid string `json:"wid"` //websocket连接标识 - Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 + Wid string `json:"wid,optional"` //websocket连接标识 + UserId int64 `json:"user_id,optional"` //用户id + GuestId int64 `json:"guest_id,optional"` //游客id + Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 } type Request struct { diff --git a/server/websocket/websocket.go b/server/websocket/websocket.go index b6c469b6..519b895d 100644 --- a/server/websocket/websocket.go +++ b/server/websocket/websocket.go @@ -1,8 +1,10 @@ package main import ( + "context" "flag" "fmt" + "fusenapi/server/websocket/internal/logic" "net/http" "fusenapi/utils/auth" @@ -28,6 +30,13 @@ func main() { ctx := svc.NewServiceContext(c) handler.RegisterHandlers(server, ctx) + ctx1 := context.Background() + ctx1, cancel := context.WithCancel(ctx1) + defer cancel() + //消费公共通知队列的数据 + go logic.ConsumeCommonCacheData(ctx1) + //消费用户索引创建/删除/发送消息中的任务数据 + go logic.ConsumeUserPoolData(ctx1) fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start() } diff --git a/server_api/websocket.api b/server_api/websocket.api index 97d01742..34e02ad4 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -29,6 +29,8 @@ type RenderNotifyReq { } //通用回调接口 type CommonNotifyReq { - Wid string `json:"wid"` //websocket连接标识 - Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 + Wid string `json:"wid,optional"` //websocket连接标识 + UserId int64 `json:"user_id,optional"` //用户id + GuestId int64 `json:"guest_id,optional"` //游客id + Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 } \ No newline at end of file