This commit is contained in:
laodaming 2023-08-30 10:55:27 +08:00
parent 9ecb04c98b
commit 81e9ac861a
2 changed files with 17 additions and 6 deletions

View File

@ -24,4 +24,6 @@ const (
WEBSOCKET_ERR_DATA_FORMAT Websocket = "WEBSOCKET_ERR_DATA_FORMAT"
//通用回调通知
WEBSOCKET_COMMON_NOTIFY Websocket = "WEBSOCKET_COMMON_NOTIFY"
//数据接收速度超过数据消费速度(缓冲队列满了)
WEBSOCKET_REQUEST_CACHE_QUEUE_OVERFLOW = "WEBSOCKET_REQUEST_CACHE_QUEUE_OVERFLOW"
)

View File

@ -41,7 +41,7 @@ func NewDataTransferLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Data
}
var (
//临时缓存对象
//临时对象缓存池
buffPool = sync.Pool{
New: func() interface{} {
return bytes.Buffer{}
@ -66,6 +66,14 @@ var (
}
//websocket连接存储
mapConnPool = sync.Map{}
//每个websocket连接入口缓冲队列长度
websocketInChanLen = 1000
//每个websocket连接出口缓冲队列长度
websocketOutChanLen = 1000
//渲染任务调度(添加任务/删除任务/修改任务属性缓冲队列长度该队列用于避免map并发读写冲突
renderImageTaskCtlChanLen = 500
//渲染任务缓冲队列长度
renderChanLen = 500
)
// 每个连接的连接基本属性
@ -152,14 +160,14 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
logic: l,
uniqueId: uniqueId,
closeChan: make(chan struct{}, 1),
inChan: make(chan []byte, 1000),
outChan: make(chan []byte, 1000),
inChan: make(chan []byte, websocketInChanLen),
outChan: make(chan []byte, websocketOutChanLen),
userId: userInfo.UserId,
guestId: userInfo.GuestId,
extendRenderProperty: extendRenderProperty{
renderImageTask: make(map[string]*renderTask),
renderImageTaskCtlChan: make(chan renderImageControlChanItem, 500),
renderChan: make(chan []byte, 500),
renderImageTaskCtlChan: make(chan renderImageControlChanItem, renderImageTaskCtlChanLen),
renderChan: make(chan []byte, renderChanLen),
},
}
//保存连接
@ -352,7 +360,8 @@ func (w *wsConnectItem) sendToInChan(data []byte) {
return
case w.inChan <- data:
return
case <-time.After(time.Second * 3): //3秒超时丢弃
case <-time.After(time.Millisecond * 200): //200豪秒超时丢弃说明超过消费速度了
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_CACHE_QUEUE_OVERFLOW, "send message is too frequent,Please reduce the sending speed!!!"))
return
}
}