From 826ba92b8e280f522d891693e7f12650d6766bdc Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 25 Aug 2023 14:49:24 +0800 Subject: [PATCH] fix --- .../internal/logic/commonnotifylogic.go | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/server/websocket/internal/logic/commonnotifylogic.go b/server/websocket/internal/logic/commonnotifylogic.go index 7d13abac..65fd1ffc 100644 --- a/server/websocket/internal/logic/commonnotifylogic.go +++ b/server/websocket/internal/logic/commonnotifylogic.go @@ -5,6 +5,8 @@ import ( "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" + "sync" + "time" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" @@ -26,6 +28,68 @@ func NewCommonNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm } } +// 定义公共回调未找到websocket连接时暂存数据缓冲队列 +var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 2000) + +type commonConnectionNotFoundDataCacheChanItem struct { + retryTimes int //重回队列次数 + data types.CommonNotifyReq //数据 +} + +// 放入缓冲队列 +func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCacheChanItem) { + select { + case commonConnectionNotFoundDataCacheChan <- data: + return + case <-time.After(time.Second): //超1秒就丢弃 + return + } +} + +// 取出元素 +func (l *CommonNotifyLogic) popCommonCache() (data commonConnectionNotFoundDataCacheChanItem) { + return <-commonConnectionNotFoundDataCacheChan +} + +// 保证处理消息就一个循环在执行 +var consumeCommonCacheData sync.Once + +// 消费公共通知未处理的消息 +func (l *CommonNotifyLogic) consumeCommonCacheData() { + //单例 + consumeCommonCacheData.Do(func() { + tick := time.Tick(time.Millisecond * 200) + for { + select { + case <-tick: //200毫秒触发一次 + info := l.popCommonCache() + //查询websocket连接 + value, ok := mapConnPool.Load(info.data.Wid) + //没有连接 + if !ok { + info.retryTimes-- + //大于0,则放回队列 + if info.retryTimes > 0 { + l.pushCommonCache(info) + continue + } + //否则直接丢弃消息 + continue + } + //断言连接 + ws, ok := value.(wsConnectItem) + if !ok { + logx.Error("渲染回调断言websocket连接失败") + continue + } + //发送 + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) + } + + } + }) +} + // 处理进入前逻辑w,r // func (l *CommonNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) { // } @@ -35,10 +99,20 @@ func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *a if req.Wid == "" { return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty") } + //触发消费公共未处理的消息(只有第一次有用) + l.consumeCommonCacheData() //查询websocket连接 value, ok := mapConnPool.Load(req.Wid) if !ok { - return resp.SetStatusWithMessage(basic.CodeOK, "success,but connection is not found") + //没找到连接就放到公共缓冲队列 + l.pushCommonCache(commonConnectionNotFoundDataCacheChanItem{ + retryTimes: 4, //重试4次 + data: types.CommonNotifyReq{ + Wid: req.Wid, + Data: req.Data, + }, + }) + return resp.SetStatusWithMessage(basic.CodeOK, "success") } //断言连接 ws, ok := value.(wsConnectItem)