diff --git a/constants/websocket.go b/constants/websocket.go index 03bd63bb..8a4a065c 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -9,7 +9,7 @@ const ( //ws连接成功 WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS" //请求恢复为上次连接的标识 - WEBSOCKET_REQUEST_RESUME_LAST_CONNECT = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT" + WEBSOCKET_REQUEST_REUSE_LAST_CONNECT = "WEBSOCKET_REQUEST_REUSE_LAST_CONNECT" //请求恢复为上次连接的标识错误 WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR" //渲染前数据组装 diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index c4961ac0..1c1b05dd 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -154,8 +154,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User go func() { //把连接成功消息发回去 time.Sleep(time.Second * 2) //兼容下火狐 - b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) - _ = conn.WriteMessage(websocket.TextMessage, b) + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId)) }() return ws } @@ -284,7 +283,9 @@ func (w *wsConnectItem) sendToOutChan(data []byte) { case <-w.closeChan: return case w.outChan <- data: - logx.Info("notify send render result to out chan") + return + case <-time.After(time.Second * 3): //阻塞超过3秒丢弃 + return } } @@ -302,7 +303,7 @@ func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []by func (w *wsConnectItem) dealwithReciveData(data []byte) { var parseInfo websocket_data.DataTransferData if err := json.Unmarshal(data, &parseInfo); err != nil { - logx.Error("invalid format of websocket message") + logx.Error("invalid format of websocket message:", err) w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket message:"+string(data))) return } @@ -313,8 +314,8 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { case constants.WEBSOCKET_RENDER_IMAGE: w.renderImage(d) //刷新重连请求恢复上次连接的标识 - case constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT: - w.resumeLateConnect(d) + case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT: + w.reuseLastConnect(d) default: } diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 2bb61fe7..0d23615e 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -107,7 +107,7 @@ func (w *wsConnectItem) renderImage(data []byte) { } d, _ := json.Marshal(tmpData) //发送给对应的流水线组装数据 - if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil { + if err = w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil { logx.Error("发送渲染任务数据到MQ失败:", string(d), "err:", err) return } diff --git a/server/websocket/internal/logic/ws_resume_last_connect.go b/server/websocket/internal/logic/ws_resume_last_connect.go deleted file mode 100644 index 79934c8b..00000000 --- a/server/websocket/internal/logic/ws_resume_last_connect.go +++ /dev/null @@ -1,27 +0,0 @@ -package logic - -import "fusenapi/constants" - -// 刷新重连请求恢复上次连接的标识 -func (w *wsConnectItem) resumeLateConnect(data []byte) { - clientId := string(data) - //id长度不对 - if len(clientId) != 50 { - rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "request id is invalid") - w.sendToOutChan(rsp) - return - } - publicMutex.Lock() - defer publicMutex.Unlock() - //存在是不能给他申请重新绑定 - if _, ok := mapConnPool.Load(clientId); ok { - rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ") - w.sendToOutChan(rsp) - return - } - //重新绑定 - w.uniqueId = clientId - rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, clientId) - w.sendToOutChan(rsp) - return -} diff --git a/server/websocket/internal/logic/ws_reuse_last_connect.go b/server/websocket/internal/logic/ws_reuse_last_connect.go new file mode 100644 index 00000000..325bb292 --- /dev/null +++ b/server/websocket/internal/logic/ws_reuse_last_connect.go @@ -0,0 +1,36 @@ +package logic + +import ( + "encoding/json" + "fusenapi/constants" + "github.com/zeromicro/go-zero/core/logx" +) + +// 刷新重连请求恢复上次连接的标识 +func (w *wsConnectItem) reuseLastConnect(data []byte) { + logx.Info("收到请求恢复上次连接标识数据:", string(data)) + var clientId string + if err := json.Unmarshal(data, &clientId); err != nil { + logx.Error(" invalid format of client id :", clientId) + w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "invalid format of client id")) + return + } + //id长度不对 + if len(clientId) > 100 { + w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "length of client id is to long")) + return + } + publicMutex.Lock() + defer publicMutex.Unlock() + //存在是不能给他申请重新绑定 + if _, ok := mapConnPool.Load(clientId); ok { + rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ") + w.sendToOutChan(rsp) + return + } + //重新绑定 + w.uniqueId = clientId + rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, clientId) + w.sendToOutChan(rsp) + return +}