This commit is contained in:
laodaming 2023-08-30 17:40:40 +08:00
parent 2ce9776255
commit 6c36817750
4 changed files with 64 additions and 30 deletions

View File

@ -0,0 +1,33 @@
package logic
import (
"fusenapi/constants"
)
// 消息分发工厂
type allocationProcessorFactory interface {
allocationMessage(data []byte)
}
var mapAllocationProcessor = make(map[constants.Websocket]allocationProcessorFactory)
func (w *wsConnectItem) newAllocationProcessor(msgType constants.Websocket) allocationProcessorFactory {
if obj, ok := mapAllocationProcessor[msgType]; ok {
return obj
}
var obj allocationProcessorFactory
switch msgType {
//图片渲染
case constants.WEBSOCKET_RENDER_IMAGE:
obj = &renderProcesser{w}
//刷新重连请求恢复上次连接的标识
case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT:
obj = &reuseConnProcesser{w}
default:
}
if obj != nil {
mapAllocationProcessor[msgType] = obj
}
return obj
}

View File

@ -391,15 +391,8 @@ func (w *wsConnectItem) allocationProcessing(data []byte) {
return
}
d, _ := json.Marshal(parseInfo.D)
//分消息类型给到不同逻辑处理,可扩展
switch parseInfo.T {
//图片渲染
case constants.WEBSOCKET_RENDER_IMAGE:
w.sendToRenderChan(d)
//刷新重连请求恢复上次连接的标识
case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT:
w.reuseLastConnect(d)
default:
logx.Error("未知消息类型uid:", w.userId, "gid:", w.guestId, "data:", string(data))
}
//获取工厂实例
processor := w.newAllocationProcessor(parseInfo.T)
//执行工厂方法
processor.allocationMessage(d)
}

View File

@ -18,6 +18,11 @@ import (
"time"
)
// 渲染处理器
type renderProcesser struct {
w *wsConnectItem
}
// 云渲染属性
type extendRenderProperty struct {
renderImageTask map[string]*renderTask //需要渲染的图片任务 key是taskId val 是renderId
@ -44,12 +49,11 @@ type renderTask struct {
uploadUnityRenderImageTakesTime int64 //上传unity渲染结果图时间
}
// 发送到渲染缓冲队列
func (w *wsConnectItem) sendToRenderChan(data []byte) {
func (r *renderProcesser) allocationMessage(data []byte) {
select {
case <-w.closeChan: //已经关闭
case <-r.w.closeChan: //已经关闭
return
case w.extendRenderProperty.renderChan <- data: //发入到缓冲队列
case r.w.extendRenderProperty.renderChan <- data: //发入到缓冲队列
return
case <-time.After(time.Second * 3): //三秒没进入缓冲队列就丢弃
return

View File

@ -9,32 +9,36 @@ import (
"github.com/zeromicro/go-zero/core/logx"
)
// 刷新重连请求恢复上次连接的标识
func (w *wsConnectItem) reuseLastConnect(data []byte) {
// 复用连接处理器
type reuseConnProcesser struct {
w *wsConnectItem
}
func (r *reuseConnProcesser) allocationMessage(data []byte) {
logx.Info("收到请求恢复上次连接标识数据:", string(data))
var wid string
if err := json.Unmarshal(data, &wid); err != nil {
logx.Error(" invalid format of wid :", wid)
w.incomeDataFormatErrResponse("invalid format of wid")
r.w.incomeDataFormatErrResponse("invalid format of wid")
return
}
//解密
decryptionWid, err := encryption_decryption.CBCDecrypt(wid)
if err != nil {
w.reuseLastConnErrResponse("invalid wid")
r.w.reuseLastConnErrResponse("invalid wid")
return
}
lendecryptionWid := len(decryptionWid)
//合成client后缀,不是同个后缀的不能复用
userPart := getUserJoinPart(w.userId, w.guestId, w.userAgent)
userPart := getUserJoinPart(r.w.userId, r.w.guestId, r.w.userAgent)
lenUserPart := len(userPart)
if lendecryptionWid <= lenUserPart {
w.reuseLastConnErrResponse("length of client id is to short")
r.w.reuseLastConnErrResponse("length of client id is to short")
return
}
//尾部不同不能复用
if decryptionWid[lendecryptionWid-lenUserPart:] != userPart {
w.reuseLastConnErrResponse("the client id is not belong to you before")
r.w.reuseLastConnErrResponse("the client id is not belong to you before")
return
}
//存在是不能给他申请重新绑定
@ -44,20 +48,20 @@ func (w *wsConnectItem) reuseLastConnect(data []byte) {
logx.Error("连接断言失败")
}
//是当前自己占用(无需处理)
if obj.uniqueId == w.uniqueId {
rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid)
w.sendToOutChan(rsp)
if obj.uniqueId == r.w.uniqueId {
rsp := r.w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid)
r.w.sendToOutChan(rsp)
return
} else {
w.reuseLastConnErrResponse("the wid is used by other people")
r.w.reuseLastConnErrResponse("the wid is used by other people")
return
}
}
//重新绑定
w.uniqueId = wid
mapConnPool.Store(wid, *w)
rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid)
w.sendToOutChan(rsp)
r.w.uniqueId = wid
mapConnPool.Store(wid, *r.w)
rsp := r.w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid)
r.w.sendToOutChan(rsp)
return
}