From 6c368177507f7085653fa7a69ea364f89077a479 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Wed, 30 Aug 2023 17:40:40 +0800 Subject: [PATCH] fix --- .../logic/allocation_processing_factory.go | 33 ++++++++++++++++++ .../internal/logic/datatransferlogic.go | 15 +++----- .../internal/logic/ws_render_image.go | 12 ++++--- .../internal/logic/ws_reuse_last_connect.go | 34 +++++++++++-------- 4 files changed, 64 insertions(+), 30 deletions(-) create mode 100644 server/websocket/internal/logic/allocation_processing_factory.go diff --git a/server/websocket/internal/logic/allocation_processing_factory.go b/server/websocket/internal/logic/allocation_processing_factory.go new file mode 100644 index 00000000..e28de1bf --- /dev/null +++ b/server/websocket/internal/logic/allocation_processing_factory.go @@ -0,0 +1,33 @@ +package logic + +import ( + "fusenapi/constants" +) + +// 消息分发工厂 +type allocationProcessorFactory interface { + allocationMessage(data []byte) +} + +var mapAllocationProcessor = make(map[constants.Websocket]allocationProcessorFactory) + +func (w *wsConnectItem) newAllocationProcessor(msgType constants.Websocket) allocationProcessorFactory { + if obj, ok := mapAllocationProcessor[msgType]; ok { + return obj + } + var obj allocationProcessorFactory + switch msgType { + //图片渲染 + case constants.WEBSOCKET_RENDER_IMAGE: + obj = &renderProcesser{w} + //刷新重连请求恢复上次连接的标识 + case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT: + obj = &reuseConnProcesser{w} + default: + + } + if obj != nil { + mapAllocationProcessor[msgType] = obj + } + return obj +} diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index dea9b2b1..a0ac676f 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -391,15 +391,8 @@ func (w *wsConnectItem) allocationProcessing(data []byte) { return } d, _ := json.Marshal(parseInfo.D) - //分消息类型给到不同逻辑处理,可扩展 - switch parseInfo.T { - //图片渲染 - case constants.WEBSOCKET_RENDER_IMAGE: - w.sendToRenderChan(d) - //刷新重连请求恢复上次连接的标识 - case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT: - w.reuseLastConnect(d) - default: - logx.Error("未知消息类型:uid:", w.userId, "gid:", w.guestId, "data:", string(data)) - } + //获取工厂实例 + processor := w.newAllocationProcessor(parseInfo.T) + //执行工厂方法 + processor.allocationMessage(d) } diff --git a/server/websocket/internal/logic/ws_render_image.go b/server/websocket/internal/logic/ws_render_image.go index 887dc2e7..5700c01a 100644 --- a/server/websocket/internal/logic/ws_render_image.go +++ b/server/websocket/internal/logic/ws_render_image.go @@ -18,6 +18,11 @@ import ( "time" ) +// 渲染处理器 +type renderProcesser struct { + w *wsConnectItem +} + // 云渲染属性 type extendRenderProperty struct { renderImageTask map[string]*renderTask //需要渲染的图片任务 key是taskId val 是renderId @@ -44,12 +49,11 @@ type renderTask struct { uploadUnityRenderImageTakesTime int64 //上传unity渲染结果图时间 } -// 发送到渲染缓冲队列 -func (w *wsConnectItem) sendToRenderChan(data []byte) { +func (r *renderProcesser) allocationMessage(data []byte) { select { - case <-w.closeChan: //已经关闭 + case <-r.w.closeChan: //已经关闭 return - case w.extendRenderProperty.renderChan <- data: //发入到缓冲队列 + case r.w.extendRenderProperty.renderChan <- data: //发入到缓冲队列 return case <-time.After(time.Second * 3): //三秒没进入缓冲队列就丢弃 return diff --git a/server/websocket/internal/logic/ws_reuse_last_connect.go b/server/websocket/internal/logic/ws_reuse_last_connect.go index 7ef64ec7..d8e00d79 100644 --- a/server/websocket/internal/logic/ws_reuse_last_connect.go +++ b/server/websocket/internal/logic/ws_reuse_last_connect.go @@ -9,32 +9,36 @@ import ( "github.com/zeromicro/go-zero/core/logx" ) -// 刷新重连请求恢复上次连接的标识 -func (w *wsConnectItem) reuseLastConnect(data []byte) { +// 复用连接处理器 +type reuseConnProcesser struct { + w *wsConnectItem +} + +func (r *reuseConnProcesser) allocationMessage(data []byte) { logx.Info("收到请求恢复上次连接标识数据:", string(data)) var wid string if err := json.Unmarshal(data, &wid); err != nil { logx.Error(" invalid format of wid :", wid) - w.incomeDataFormatErrResponse("invalid format of wid") + r.w.incomeDataFormatErrResponse("invalid format of wid") return } //解密 decryptionWid, err := encryption_decryption.CBCDecrypt(wid) if err != nil { - w.reuseLastConnErrResponse("invalid wid") + r.w.reuseLastConnErrResponse("invalid wid") return } lendecryptionWid := len(decryptionWid) //合成client后缀,不是同个后缀的不能复用 - userPart := getUserJoinPart(w.userId, w.guestId, w.userAgent) + userPart := getUserJoinPart(r.w.userId, r.w.guestId, r.w.userAgent) lenUserPart := len(userPart) if lendecryptionWid <= lenUserPart { - w.reuseLastConnErrResponse("length of client id is to short") + r.w.reuseLastConnErrResponse("length of client id is to short") return } //尾部不同不能复用 if decryptionWid[lendecryptionWid-lenUserPart:] != userPart { - w.reuseLastConnErrResponse("the client id is not belong to you before") + r.w.reuseLastConnErrResponse("the client id is not belong to you before") return } //存在是不能给他申请重新绑定 @@ -44,20 +48,20 @@ func (w *wsConnectItem) reuseLastConnect(data []byte) { logx.Error("连接断言失败") } //是当前自己占用(无需处理) - if obj.uniqueId == w.uniqueId { - rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) - w.sendToOutChan(rsp) + if obj.uniqueId == r.w.uniqueId { + rsp := r.w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) + r.w.sendToOutChan(rsp) return } else { - w.reuseLastConnErrResponse("the wid is used by other people") + r.w.reuseLastConnErrResponse("the wid is used by other people") return } } //重新绑定 - w.uniqueId = wid - mapConnPool.Store(wid, *w) - rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) - w.sendToOutChan(rsp) + r.w.uniqueId = wid + mapConnPool.Store(wid, *r.w) + rsp := r.w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) + r.w.sendToOutChan(rsp) return }