From 81e9ac861a5eb3821ad4657c524bbc54a2dbbc65 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Wed, 30 Aug 2023 10:55:27 +0800 Subject: [PATCH 1/2] fix --- constants/websocket.go | 2 ++ .../internal/logic/datatransferlogic.go | 21 +++++++++++++------ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/constants/websocket.go b/constants/websocket.go index 507f403c..34c682d9 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -24,4 +24,6 @@ const ( WEBSOCKET_ERR_DATA_FORMAT Websocket = "WEBSOCKET_ERR_DATA_FORMAT" //通用回调通知 WEBSOCKET_COMMON_NOTIFY Websocket = "WEBSOCKET_COMMON_NOTIFY" + //数据接收速度超过数据消费速度(缓冲队列满了) + WEBSOCKET_REQUEST_CACHE_QUEUE_OVERFLOW = "WEBSOCKET_REQUEST_CACHE_QUEUE_OVERFLOW" ) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 7c97ec08..ddf752af 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -41,7 +41,7 @@ func NewDataTransferLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Data } var ( - //临时缓存对象池 + //临时对象缓存池 buffPool = sync.Pool{ New: func() interface{} { return bytes.Buffer{} @@ -66,6 +66,14 @@ var ( } //websocket连接存储 mapConnPool = sync.Map{} + //每个websocket连接入口缓冲队列长度 + websocketInChanLen = 1000 + //每个websocket连接出口缓冲队列长度 + websocketOutChanLen = 1000 + //渲染任务调度(添加任务/删除任务/修改任务属性)缓冲队列长度(该队列用于避免map并发读写冲突) + renderImageTaskCtlChanLen = 500 + //渲染任务缓冲队列长度 + renderChanLen = 500 ) // 每个连接的连接基本属性 @@ -152,14 +160,14 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use logic: l, uniqueId: uniqueId, closeChan: make(chan struct{}, 1), - inChan: make(chan []byte, 1000), - outChan: make(chan []byte, 1000), + inChan: make(chan []byte, websocketInChanLen), + outChan: make(chan []byte, websocketOutChanLen), userId: userInfo.UserId, guestId: userInfo.GuestId, extendRenderProperty: extendRenderProperty{ renderImageTask: make(map[string]*renderTask), - renderImageTaskCtlChan: make(chan renderImageControlChanItem, 500), - renderChan: make(chan []byte, 500), + renderImageTaskCtlChan: make(chan renderImageControlChanItem, renderImageTaskCtlChanLen), + renderChan: make(chan []byte, renderChanLen), }, } //保存连接 @@ -352,7 +360,8 @@ func (w *wsConnectItem) sendToInChan(data []byte) { return case w.inChan <- data: return - case <-time.After(time.Second * 3): //3秒超时丢弃 + case <-time.After(time.Millisecond * 200): //200豪秒超时丢弃,说明超过消费速度了 + w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_CACHE_QUEUE_OVERFLOW, "send message is too frequent,Please reduce the sending speed!!!")) return } } From ebb58461115a27719ceeb65a32d9de8f522f3bac Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Wed, 30 Aug 2023 11:01:49 +0800 Subject: [PATCH 2/2] fix --- constants/websocket.go | 2 +- server/websocket/internal/logic/datatransferlogic.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/constants/websocket.go b/constants/websocket.go index 34c682d9..84321556 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -25,5 +25,5 @@ const ( //通用回调通知 WEBSOCKET_COMMON_NOTIFY Websocket = "WEBSOCKET_COMMON_NOTIFY" //数据接收速度超过数据消费速度(缓冲队列满了) - WEBSOCKET_REQUEST_CACHE_QUEUE_OVERFLOW = "WEBSOCKET_REQUEST_CACHE_QUEUE_OVERFLOW" + WEBSOCKET_INCOME_CACHE_QUEUE_OVERFLOW = "WEBSOCKET_INCOME_CACHE_QUEUE_OVERFLOW" ) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index ddf752af..53a2d2c6 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -348,7 +348,8 @@ func (w *wsConnectItem) sendToOutChan(data []byte) { return case w.outChan <- data: return - case <-time.After(time.Second * 3): //阻塞超过3秒丢弃 + case <-time.After(time.Millisecond * 200): //阻塞超过200ms丢弃 + logx.Error("failed to send to out chan,time expired,data:", string(data)) return } } @@ -361,7 +362,7 @@ func (w *wsConnectItem) sendToInChan(data []byte) { case w.inChan <- data: return case <-time.After(time.Millisecond * 200): //200豪秒超时丢弃,说明超过消费速度了 - w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_CACHE_QUEUE_OVERFLOW, "send message is too frequent,Please reduce the sending speed!!!")) + w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_INCOME_CACHE_QUEUE_OVERFLOW, "send message is too frequent,Please reduce the sending speed ,otherwise, the message you sent may be lost")) return } }