From 030d9f8ab368637a2c56f45bb903a107c9a19205 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 4 Sep 2023 14:26:55 +0800 Subject: [PATCH 1/7] fix --- .../internal/logic/commonnotifylogic.go | 124 ++++++++++-------- .../internal/logic/datatransferlogic.go | 84 ++++++------ server/websocket/websocket.go | 9 ++ 3 files changed, 118 insertions(+), 99 deletions(-) diff --git a/server/websocket/internal/logic/commonnotifylogic.go b/server/websocket/internal/logic/commonnotifylogic.go index 72abe5b6..5dba13b1 100644 --- a/server/websocket/internal/logic/commonnotifylogic.go +++ b/server/websocket/internal/logic/commonnotifylogic.go @@ -5,7 +5,6 @@ import ( "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" - "sync" "time" "fusenapi/server/websocket/internal/svc" @@ -37,7 +36,7 @@ type commonConnectionNotFoundDataCacheChanItem struct { } // 放入缓冲队列 -func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { +func pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { select { case commonConnectionNotFoundDataCacheChan <- data: return @@ -47,43 +46,48 @@ func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundD } // 取出元素 -func (l *CommonNotifyLogic) popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { +func popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { return <-commonConnectionNotFoundDataCacheChan } -// 保证处理消息就一个循环在执行 -var consumeCommonCacheData sync.Once - // 消费公共通知未处理的消息(目前是轮巡方式,待优化) -func (l *CommonNotifyLogic) consumeCommonCacheData() { - //单例 - consumeCommonCacheData.Do(func() { - for { - time.Sleep(time.Millisecond * 200) - info := l.popCommonNotifyCache() - //查询websocket连接 - value, ok := mapConnPool.Load(info.data.Wid) - //没有连接 - if !ok { - info.retryTimes-- - //大于0,则放回队列 - if info.retryTimes > 0 { - l.pushCommonNotifyCache(info) - continue - } - //否则直接丢弃消息 - continue - } - //断言连接 - ws, ok := value.(wsConnectItem) - if !ok { - logx.Error("渲染回调断言websocket连接失败") - continue - } - //发送 - ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) +func ConsumeCommonCacheData(ctx context.Context) { + defer func() { + if err := recover(); err != nil { + logx.Error("consumeCommonCacheData panic :", err) } - }) + }() + go func() { + select { + case <-ctx.Done(): + panic("consumeCommonCacheData ctx deadline") + } + }() + for { + time.Sleep(time.Millisecond * 200) + info := popCommonNotifyCache() + //查询websocket连接 + value, ok := mapConnPool.Load(info.data.Wid) + //没有连接 + if !ok { + info.retryTimes-- + //大于0,则放回队列 + if info.retryTimes > 0 { + pushCommonNotifyCache(info) + continue + } + //否则直接丢弃消息 + continue + } + //断言连接 + ws, ok := value.(wsConnectItem) + if !ok { + logx.Error("渲染回调断言websocket连接失败") + continue + } + //发送 + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) + } } // 处理进入前逻辑w,r @@ -91,32 +95,38 @@ func (l *CommonNotifyLogic) consumeCommonCacheData() { // } func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { - //websocket连接id不能为空 + searchConnectType := "uniqueId" if req.Wid == "" { - return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty") + if !userinfo.IsUser() && !userinfo.IsGuest() { + return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty") + } + searchConnectType = "userInfo" } - //触发消费公共未处理的消息(该方法是单例) - go l.consumeCommonCacheData() - //查询websocket连接 - value, ok := mapConnPool.Load(req.Wid) - if !ok { - //没找到连接就放到公共缓冲队列 - go l.pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ - retryTimes: 20, //重试20次 - data: types.CommonNotifyReq{ - Wid: req.Wid, - Data: req.Data, - }, - }) - return resp.SetStatusWithMessage(basic.CodeOK, "success") + switch searchConnectType { + case "uniqueId": //直接通过唯一标识发消息 + //查询websocket连接 + value, ok := mapConnPool.Load(req.Wid) + if !ok { + //没找到连接就放到公共缓冲队列 + pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ + retryTimes: 20, //重试20次 + data: types.CommonNotifyReq{ + Wid: req.Wid, + Data: req.Data, + }, + }) + return resp.SetStatusWithMessage(basic.CodeOK, "success") + } + //断言连接 + ws, ok := value.(wsConnectItem) + if !ok { + logx.Error("渲染回调断言websocket连接失败") + return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误") + } + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) + case "userInfo": //通过用户信息找连接发送 + sendToOutChanByUserIndex(userinfo.UserId, userinfo.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) } - //断言连接 - ws, ok := value.(wsConnectItem) - if !ok { - logx.Error("渲染回调断言websocket连接失败") - return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误") - } - ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) return resp.SetStatusWithMessage(basic.CodeOK, "success") } diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index d49dc38e..152eaef8 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -148,8 +148,6 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request) conn.Close() return } - //消费用户索引控制chan的数据 - go consumeUserPoolData() //循环读客户端信息 go ws.acceptBrowserMessage() //消费出口数据并发送浏览器端 @@ -257,58 +255,60 @@ func sendToOutChanByUserIndex(userId, guestId int64, message []byte) { } } -// 消费用户索引池中的任务(单例) -var consumeUserPoolDataOnce sync.Once - -func consumeUserPoolData() { +// 消费用户索引创建/删除/发送消息中的任务数据 +func ConsumeUserPoolData(ctx context.Context) { defer func() { if err := recover(); err != nil { logx.Error("consumeUserPoolData panic:", err) } }() - consumeUserPoolDataOnce.Do(func() { - for { - select { - case data := <-mapUserConnPoolCtlChan: - key := getmapUserConnPoolUniqueId(data.userId, data.guestId) - switch data.option { - case 2: //发送消息 - logx.Info("通过用户id索引发送消息:", data.uniqueId) - mapUserUniqueId, ok := mapUserConnPool[key] + go func() { + select { + case <-ctx.Done(): + panic("ConsumeUserPoolData ctx deadline") + } + }() + for { + select { + case data := <-mapUserConnPoolCtlChan: + key := getmapUserConnPoolUniqueId(data.userId, data.guestId) + switch data.option { + case 2: //发送消息 + logx.Info("通过用户id索引发送消息:", data.uniqueId) + mapUserUniqueId, ok := mapUserConnPool[key] + if !ok { + continue + } + for _, uniqueId := range mapUserUniqueId { + //根据uniqueId查询原始池中连接 + mapConnPoolVal, ok := mapConnPool.Load(uniqueId) if !ok { continue } - for _, uniqueId := range mapUserUniqueId { - //根据uniqueId查询原始池中连接 - mapConnPoolVal, ok := mapConnPool.Load(uniqueId) - if !ok { - continue - } - originConn, ok := mapConnPoolVal.(wsConnectItem) - if !ok { - continue - } - originConn.sendToOutChan(data.message) + originConn, ok := mapConnPoolVal.(wsConnectItem) + if !ok { + continue } - case 1: //添加 - logx.Info("添加用户id索引标识:", data.uniqueId) - if mapUserUniqueId, ok := mapUserConnPool[key]; ok { - mapUserUniqueId[data.uniqueId] = struct{}{} - } else { - mapUserConnPool[key] = make(map[string]struct{}) - mapUserConnPool[key][data.uniqueId] = struct{}{} - } - case 0: //删除 - logx.Info("删除用户id索引标识:", data.uniqueId) - if mapUserUniqueId, ok := mapUserConnPool[key]; ok { - delete(mapUserUniqueId, data.uniqueId) - } - default: - + originConn.sendToOutChan(data.message) } + case 1: //添加 + logx.Info("添加用户id索引标识:", data.uniqueId) + if mapUserUniqueId, ok := mapUserConnPool[key]; ok { + mapUserUniqueId[data.uniqueId] = struct{}{} + } else { + mapUserConnPool[key] = make(map[string]struct{}) + mapUserConnPool[key][data.uniqueId] = struct{}{} + } + case 0: //删除 + logx.Info("删除用户id索引标识:", data.uniqueId) + if mapUserUniqueId, ok := mapUserConnPool[key]; ok { + delete(mapUserUniqueId, data.uniqueId) + } + default: + } } - }) + } } // 获取mapUserConnPool唯一id diff --git a/server/websocket/websocket.go b/server/websocket/websocket.go index b6c469b6..519b895d 100644 --- a/server/websocket/websocket.go +++ b/server/websocket/websocket.go @@ -1,8 +1,10 @@ package main import ( + "context" "flag" "fmt" + "fusenapi/server/websocket/internal/logic" "net/http" "fusenapi/utils/auth" @@ -28,6 +30,13 @@ func main() { ctx := svc.NewServiceContext(c) handler.RegisterHandlers(server, ctx) + ctx1 := context.Background() + ctx1, cancel := context.WithCancel(ctx1) + defer cancel() + //消费公共通知队列的数据 + go logic.ConsumeCommonCacheData(ctx1) + //消费用户索引创建/删除/发送消息中的任务数据 + go logic.ConsumeUserPoolData(ctx1) fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start() } From c518b5a68d0f301ff73cc1607fa1576d7d8fd6be Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 4 Sep 2023 14:36:32 +0800 Subject: [PATCH 2/7] fix --- server/websocket/internal/logic/datatransferlogic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 152eaef8..167a425c 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -279,7 +279,7 @@ func ConsumeUserPoolData(ctx context.Context) { if !ok { continue } - for _, uniqueId := range mapUserUniqueId { + for uniqueId, _ := range mapUserUniqueId { //根据uniqueId查询原始池中连接 mapConnPoolVal, ok := mapConnPool.Load(uniqueId) if !ok { From d1b2b24fcf6e6b8e4495b5f888866820e516e903 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 4 Sep 2023 14:44:35 +0800 Subject: [PATCH 3/7] fix --- server/websocket/internal/logic/ws_reuse_last_connect.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/websocket/internal/logic/ws_reuse_last_connect.go b/server/websocket/internal/logic/ws_reuse_last_connect.go index 1a8ea395..faaa062c 100644 --- a/server/websocket/internal/logic/ws_reuse_last_connect.go +++ b/server/websocket/internal/logic/ws_reuse_last_connect.go @@ -61,8 +61,11 @@ func (r *reuseConnProcessor) allocationMessage(w *wsConnectItem, data []byte) { } //重新绑定 logx.Info("开始重新绑定websocket连接标识") + oldUniqueId := w.uniqueId w.uniqueId = wid mapConnPool.Store(wid, *w) + //删除用户id级别之前的索引 + deleteUserConnPoolElement(w.userId, w.guestId, oldUniqueId) //添加用户id级别索引 createUserConnPoolElement(w.userId, w.guestId, wid) rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) From f6e2edf3a854c545416d794daabf9dbbbaa214ba Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 4 Sep 2023 14:46:25 +0800 Subject: [PATCH 4/7] fix --- server_api/websocket.api | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server_api/websocket.api b/server_api/websocket.api index 97d01742..ad1cc925 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -29,6 +29,6 @@ type RenderNotifyReq { } //通用回调接口 type CommonNotifyReq { - Wid string `json:"wid"` //websocket连接标识 + Wid string `json:"wid"` //websocket连接标识,(如果传了token,则以token为主寻找连接) Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 } \ No newline at end of file From 1da29bb71520c0def0a59cf1344de2bae5ce3ffc Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 4 Sep 2023 15:00:11 +0800 Subject: [PATCH 5/7] fix --- server/websocket/internal/logic/commonnotifylogic.go | 4 ++-- server/websocket/internal/types/types.go | 6 ++++-- server_api/websocket.api | 6 ++++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/server/websocket/internal/logic/commonnotifylogic.go b/server/websocket/internal/logic/commonnotifylogic.go index 5dba13b1..82c0f05a 100644 --- a/server/websocket/internal/logic/commonnotifylogic.go +++ b/server/websocket/internal/logic/commonnotifylogic.go @@ -97,8 +97,8 @@ func ConsumeCommonCacheData(ctx context.Context) { func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { searchConnectType := "uniqueId" if req.Wid == "" { - if !userinfo.IsUser() && !userinfo.IsGuest() { - return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty") + if req.UserId == 0 && req.GuestId == 0 { + return resp.SetStatusWithMessage(basic.CodeOK, "用户信息或者连接标识必须保证至少有其中一个") } searchConnectType = "userInfo" } diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go index 32263435..f56eb527 100644 --- a/server/websocket/internal/types/types.go +++ b/server/websocket/internal/types/types.go @@ -13,8 +13,10 @@ type RenderNotifyReq struct { } type CommonNotifyReq struct { - Wid string `json:"wid"` //websocket连接标识 - Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 + Wid string `json:"wid,optional"` //websocket连接标识 + UserId int64 `json:"user_id,optional"` //用户id + GuestId int64 `json:"guest_id,optional"` //游客id + Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 } type Request struct { diff --git a/server_api/websocket.api b/server_api/websocket.api index ad1cc925..34e02ad4 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -29,6 +29,8 @@ type RenderNotifyReq { } //通用回调接口 type CommonNotifyReq { - Wid string `json:"wid"` //websocket连接标识,(如果传了token,则以token为主寻找连接) - Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 + Wid string `json:"wid,optional"` //websocket连接标识 + UserId int64 `json:"user_id,optional"` //用户id + GuestId int64 `json:"guest_id,optional"` //游客id + Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 } \ No newline at end of file From 4c62b941a0cf9f9a9d081954e007d6b155c6d972 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 4 Sep 2023 15:06:07 +0800 Subject: [PATCH 6/7] fix --- server/websocket/internal/logic/commonnotifylogic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket/internal/logic/commonnotifylogic.go b/server/websocket/internal/logic/commonnotifylogic.go index 82c0f05a..23f09959 100644 --- a/server/websocket/internal/logic/commonnotifylogic.go +++ b/server/websocket/internal/logic/commonnotifylogic.go @@ -125,7 +125,7 @@ func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *a } ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) case "userInfo": //通过用户信息找连接发送 - sendToOutChanByUserIndex(userinfo.UserId, userinfo.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) + sendToOutChanByUserIndex(req.UserId, req.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) } return resp.SetStatusWithMessage(basic.CodeOK, "success") } From 5b88e9730787a50b353b2fa9cbaeafb178ef81fa Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 4 Sep 2023 15:17:30 +0800 Subject: [PATCH 7/7] fix --- server/websocket/internal/logic/commonnotifylogic.go | 8 ++++---- server/websocket/internal/logic/datatransferlogic.go | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/server/websocket/internal/logic/commonnotifylogic.go b/server/websocket/internal/logic/commonnotifylogic.go index 23f09959..c42af9eb 100644 --- a/server/websocket/internal/logic/commonnotifylogic.go +++ b/server/websocket/internal/logic/commonnotifylogic.go @@ -95,15 +95,15 @@ func ConsumeCommonCacheData(ctx context.Context) { // } func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { - searchConnectType := "uniqueId" + searchConnectType := 1 if req.Wid == "" { if req.UserId == 0 && req.GuestId == 0 { return resp.SetStatusWithMessage(basic.CodeOK, "用户信息或者连接标识必须保证至少有其中一个") } - searchConnectType = "userInfo" + searchConnectType = 2 } switch searchConnectType { - case "uniqueId": //直接通过唯一标识发消息 + case 1: //直接通过唯一标识发消息 //查询websocket连接 value, ok := mapConnPool.Load(req.Wid) if !ok { @@ -124,7 +124,7 @@ func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *a return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误") } ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) - case "userInfo": //通过用户信息找连接发送 + case 2: //通过用户信息找连接发送 sendToOutChanByUserIndex(req.UserId, req.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) } return resp.SetStatusWithMessage(basic.CodeOK, "success") diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 167a425c..7a674a1f 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -274,19 +274,22 @@ func ConsumeUserPoolData(ctx context.Context) { key := getmapUserConnPoolUniqueId(data.userId, data.guestId) switch data.option { case 2: //发送消息 - logx.Info("通过用户id索引发送消息:", data.uniqueId) + logx.Info("通过用户id索引发送消息") mapUserUniqueId, ok := mapUserConnPool[key] if !ok { + logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", key) continue } for uniqueId, _ := range mapUserUniqueId { //根据uniqueId查询原始池中连接 mapConnPoolVal, ok := mapConnPool.Load(uniqueId) if !ok { + logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", key, " 原始uniqueId:", uniqueId) continue } originConn, ok := mapConnPoolVal.(wsConnectItem) if !ok { + logx.Error("通过用户id索引发送消息,断言原始连接失败,用户索引key:", key, " 原始uniqueId:", uniqueId) continue } originConn.sendToOutChan(data.message)