Merge branch 'develop' of https://gitee.com/fusenpack/fusenapi into develop

This commit is contained in:
eson 2023-08-30 11:04:59 +08:00
commit f52224024e
2 changed files with 19 additions and 7 deletions

View File

@ -24,4 +24,6 @@ const (
WEBSOCKET_ERR_DATA_FORMAT Websocket = "WEBSOCKET_ERR_DATA_FORMAT"
//通用回调通知
WEBSOCKET_COMMON_NOTIFY Websocket = "WEBSOCKET_COMMON_NOTIFY"
//数据接收速度超过数据消费速度(缓冲队列满了)
WEBSOCKET_INCOME_CACHE_QUEUE_OVERFLOW = "WEBSOCKET_INCOME_CACHE_QUEUE_OVERFLOW"
)

View File

@ -41,7 +41,7 @@ func NewDataTransferLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Data
}
var (
//临时缓存对象
//临时对象缓存池
buffPool = sync.Pool{
New: func() interface{} {
return bytes.Buffer{}
@ -66,6 +66,14 @@ var (
}
//websocket连接存储
mapConnPool = sync.Map{}
//每个websocket连接入口缓冲队列长度
websocketInChanLen = 1000
//每个websocket连接出口缓冲队列长度
websocketOutChanLen = 1000
//渲染任务调度(添加任务/删除任务/修改任务属性缓冲队列长度该队列用于避免map并发读写冲突
renderImageTaskCtlChanLen = 500
//渲染任务缓冲队列长度
renderChanLen = 500
)
// 每个连接的连接基本属性
@ -152,14 +160,14 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
logic: l,
uniqueId: uniqueId,
closeChan: make(chan struct{}, 1),
inChan: make(chan []byte, 1000),
outChan: make(chan []byte, 1000),
inChan: make(chan []byte, websocketInChanLen),
outChan: make(chan []byte, websocketOutChanLen),
userId: userInfo.UserId,
guestId: userInfo.GuestId,
extendRenderProperty: extendRenderProperty{
renderImageTask: make(map[string]*renderTask),
renderImageTaskCtlChan: make(chan renderImageControlChanItem, 500),
renderChan: make(chan []byte, 500),
renderImageTaskCtlChan: make(chan renderImageControlChanItem, renderImageTaskCtlChanLen),
renderChan: make(chan []byte, renderChanLen),
},
}
//保存连接
@ -340,7 +348,8 @@ func (w *wsConnectItem) sendToOutChan(data []byte) {
return
case w.outChan <- data:
return
case <-time.After(time.Second * 3): //阻塞超过3秒丢弃
case <-time.After(time.Millisecond * 200): //阻塞超过200ms丢弃
logx.Error("failed to send to out chan,time expired,data:", string(data))
return
}
}
@ -352,7 +361,8 @@ func (w *wsConnectItem) sendToInChan(data []byte) {
return
case w.inChan <- data:
return
case <-time.After(time.Second * 3): //3秒超时丢弃
case <-time.After(time.Millisecond * 200): //200豪秒超时丢弃说明超过消费速度了
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_INCOME_CACHE_QUEUE_OVERFLOW, "send message is too frequent,Please reduce the sending speed ,otherwise, the message you sent may be lost"))
return
}
}