From 8d0a675dd908927121102e54ce7ad76357614589 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 15 Aug 2023 17:36:17 +0800 Subject: [PATCH 1/3] fix --- server/websocket/internal/logic/ws_render_image_logic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 2bb61fe7..0d23615e 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -107,7 +107,7 @@ func (w *wsConnectItem) renderImage(data []byte) { } d, _ := json.Marshal(tmpData) //发送给对应的流水线组装数据 - if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil { + if err = w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil { logx.Error("发送渲染任务数据到MQ失败:", string(d), "err:", err) return } From 6502f241abb8d6b6c0ab023c40cd082f21ec4536 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 15 Aug 2023 17:40:27 +0800 Subject: [PATCH 2/3] fix --- server/websocket/internal/logic/datatransferlogic.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index c4961ac0..5bb6710b 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -284,7 +284,9 @@ func (w *wsConnectItem) sendToOutChan(data []byte) { case <-w.closeChan: return case w.outChan <- data: - logx.Info("notify send render result to out chan") + return + case <-time.After(time.Second * 3): //阻塞超过3秒丢弃 + return } } From d603cacd0b9319701a372fca658a44c6f8f716a4 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 15 Aug 2023 18:14:44 +0800 Subject: [PATCH 3/3] fix --- constants/websocket.go | 2 +- .../internal/logic/datatransferlogic.go | 9 +++-- .../internal/logic/ws_resume_last_connect.go | 27 -------------- .../internal/logic/ws_reuse_last_connect.go | 36 +++++++++++++++++++ 4 files changed, 41 insertions(+), 33 deletions(-) delete mode 100644 server/websocket/internal/logic/ws_resume_last_connect.go create mode 100644 server/websocket/internal/logic/ws_reuse_last_connect.go diff --git a/constants/websocket.go b/constants/websocket.go index 03bd63bb..8a4a065c 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -9,7 +9,7 @@ const ( //ws连接成功 WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS" //请求恢复为上次连接的标识 - WEBSOCKET_REQUEST_RESUME_LAST_CONNECT = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT" + WEBSOCKET_REQUEST_REUSE_LAST_CONNECT = "WEBSOCKET_REQUEST_REUSE_LAST_CONNECT" //请求恢复为上次连接的标识错误 WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR" //渲染前数据组装 diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 5bb6710b..1c1b05dd 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -154,8 +154,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User go func() { //把连接成功消息发回去 time.Sleep(time.Second * 2) //兼容下火狐 - b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) - _ = conn.WriteMessage(websocket.TextMessage, b) + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId)) }() return ws } @@ -304,7 +303,7 @@ func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []by func (w *wsConnectItem) dealwithReciveData(data []byte) { var parseInfo websocket_data.DataTransferData if err := json.Unmarshal(data, &parseInfo); err != nil { - logx.Error("invalid format of websocket message") + logx.Error("invalid format of websocket message:", err) w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket message:"+string(data))) return } @@ -315,8 +314,8 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { case constants.WEBSOCKET_RENDER_IMAGE: w.renderImage(d) //刷新重连请求恢复上次连接的标识 - case constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT: - w.resumeLateConnect(d) + case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT: + w.reuseLastConnect(d) default: } diff --git a/server/websocket/internal/logic/ws_resume_last_connect.go b/server/websocket/internal/logic/ws_resume_last_connect.go deleted file mode 100644 index 79934c8b..00000000 --- a/server/websocket/internal/logic/ws_resume_last_connect.go +++ /dev/null @@ -1,27 +0,0 @@ -package logic - -import "fusenapi/constants" - -// 刷新重连请求恢复上次连接的标识 -func (w *wsConnectItem) resumeLateConnect(data []byte) { - clientId := string(data) - //id长度不对 - if len(clientId) != 50 { - rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "request id is invalid") - w.sendToOutChan(rsp) - return - } - publicMutex.Lock() - defer publicMutex.Unlock() - //存在是不能给他申请重新绑定 - if _, ok := mapConnPool.Load(clientId); ok { - rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ") - w.sendToOutChan(rsp) - return - } - //重新绑定 - w.uniqueId = clientId - rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, clientId) - w.sendToOutChan(rsp) - return -} diff --git a/server/websocket/internal/logic/ws_reuse_last_connect.go b/server/websocket/internal/logic/ws_reuse_last_connect.go new file mode 100644 index 00000000..325bb292 --- /dev/null +++ b/server/websocket/internal/logic/ws_reuse_last_connect.go @@ -0,0 +1,36 @@ +package logic + +import ( + "encoding/json" + "fusenapi/constants" + "github.com/zeromicro/go-zero/core/logx" +) + +// 刷新重连请求恢复上次连接的标识 +func (w *wsConnectItem) reuseLastConnect(data []byte) { + logx.Info("收到请求恢复上次连接标识数据:", string(data)) + var clientId string + if err := json.Unmarshal(data, &clientId); err != nil { + logx.Error(" invalid format of client id :", clientId) + w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "invalid format of client id")) + return + } + //id长度不对 + if len(clientId) > 100 { + w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "length of client id is to long")) + return + } + publicMutex.Lock() + defer publicMutex.Unlock() + //存在是不能给他申请重新绑定 + if _, ok := mapConnPool.Load(clientId); ok { + rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ") + w.sendToOutChan(rsp) + return + } + //重新绑定 + w.uniqueId = clientId + rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, clientId) + w.sendToOutChan(rsp) + return +}