Merge branch 'develop' of gitee.com:fusenpack/fusenapi into develop

This commit is contained in:
Hiven 2023-08-15 18:35:23 +08:00
commit b4a06dc66f
5 changed files with 45 additions and 35 deletions

View File

@ -9,7 +9,7 @@ const (
//ws连接成功 //ws连接成功
WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS" WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS"
//请求恢复为上次连接的标识 //请求恢复为上次连接的标识
WEBSOCKET_REQUEST_RESUME_LAST_CONNECT = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT" WEBSOCKET_REQUEST_REUSE_LAST_CONNECT = "WEBSOCKET_REQUEST_REUSE_LAST_CONNECT"
//请求恢复为上次连接的标识错误 //请求恢复为上次连接的标识错误
WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR" WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR"
//渲染前数据组装 //渲染前数据组装

View File

@ -154,8 +154,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User
go func() { go func() {
//把连接成功消息发回去 //把连接成功消息发回去
time.Sleep(time.Second * 2) //兼容下火狐 time.Sleep(time.Second * 2) //兼容下火狐
b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId))
_ = conn.WriteMessage(websocket.TextMessage, b)
}() }()
return ws return ws
} }
@ -284,7 +283,9 @@ func (w *wsConnectItem) sendToOutChan(data []byte) {
case <-w.closeChan: case <-w.closeChan:
return return
case w.outChan <- data: case w.outChan <- data:
logx.Info("notify send render result to out chan") return
case <-time.After(time.Second * 3): //阻塞超过3秒丢弃
return
} }
} }
@ -302,7 +303,7 @@ func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []by
func (w *wsConnectItem) dealwithReciveData(data []byte) { func (w *wsConnectItem) dealwithReciveData(data []byte) {
var parseInfo websocket_data.DataTransferData var parseInfo websocket_data.DataTransferData
if err := json.Unmarshal(data, &parseInfo); err != nil { if err := json.Unmarshal(data, &parseInfo); err != nil {
logx.Error("invalid format of websocket message") logx.Error("invalid format of websocket message:", err)
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket message:"+string(data))) w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket message:"+string(data)))
return return
} }
@ -313,8 +314,8 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) {
case constants.WEBSOCKET_RENDER_IMAGE: case constants.WEBSOCKET_RENDER_IMAGE:
w.renderImage(d) w.renderImage(d)
//刷新重连请求恢复上次连接的标识 //刷新重连请求恢复上次连接的标识
case constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT: case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT:
w.resumeLateConnect(d) w.reuseLastConnect(d)
default: default:
} }

View File

@ -107,7 +107,7 @@ func (w *wsConnectItem) renderImage(data []byte) {
} }
d, _ := json.Marshal(tmpData) d, _ := json.Marshal(tmpData)
//发送给对应的流水线组装数据 //发送给对应的流水线组装数据
if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil { if err = w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil {
logx.Error("发送渲染任务数据到MQ失败:", string(d), "err:", err) logx.Error("发送渲染任务数据到MQ失败:", string(d), "err:", err)
return return
} }

View File

@ -1,27 +0,0 @@
package logic
import "fusenapi/constants"
// 刷新重连请求恢复上次连接的标识
func (w *wsConnectItem) resumeLateConnect(data []byte) {
clientId := string(data)
//id长度不对
if len(clientId) != 50 {
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "request id is invalid")
w.sendToOutChan(rsp)
return
}
publicMutex.Lock()
defer publicMutex.Unlock()
//存在是不能给他申请重新绑定
if _, ok := mapConnPool.Load(clientId); ok {
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ")
w.sendToOutChan(rsp)
return
}
//重新绑定
w.uniqueId = clientId
rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, clientId)
w.sendToOutChan(rsp)
return
}

View File

@ -0,0 +1,36 @@
package logic
import (
"encoding/json"
"fusenapi/constants"
"github.com/zeromicro/go-zero/core/logx"
)
// 刷新重连请求恢复上次连接的标识
func (w *wsConnectItem) reuseLastConnect(data []byte) {
logx.Info("收到请求恢复上次连接标识数据:", string(data))
var clientId string
if err := json.Unmarshal(data, &clientId); err != nil {
logx.Error(" invalid format of client id :", clientId)
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "invalid format of client id"))
return
}
//id长度不对
if len(clientId) > 100 {
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "length of client id is to long"))
return
}
publicMutex.Lock()
defer publicMutex.Unlock()
//存在是不能给他申请重新绑定
if _, ok := mapConnPool.Load(clientId); ok {
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ")
w.sendToOutChan(rsp)
return
}
//重新绑定
w.uniqueId = clientId
rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, clientId)
w.sendToOutChan(rsp)
return
}