From d101d241f43bf627b913c5df708cd0108f8b90be Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 10 Aug 2023 19:12:07 +0800 Subject: [PATCH 1/5] fix --- server/render/consumer/assemble_render_data.go | 6 ++++++ server/websocket/internal/logic/mq_consumer.go | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/server/render/consumer/assemble_render_data.go b/server/render/consumer/assemble_render_data.go index 3c7f1ffa..98184dca 100644 --- a/server/render/consumer/assemble_render_data.go +++ b/server/render/consumer/assemble_render_data.go @@ -33,6 +33,11 @@ type MqConsumerRenderAssemble struct { } func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error { + defer func() { + if err := recover(); err != nil { + logx.Error("MqConsumerRenderAssemble panic:", err) + } + }() logx.Info("收到需要组装的消息:", string(data)) var parseInfo websocket_data.AssembleRenderData if err := json.Unmarshal(data, &parseInfo); err != nil { @@ -244,6 +249,7 @@ func getCombineImage(ctx context.Context, svcCtx *svc.ServiceContext, parseInfo logx.Error("failed to parse python api rsp:", err) return "", err } + //fmt.Println("××××××××××××××××××××:", pythonApiInfo) //上传刀版图 var upload = file.Upload{ Ctx: ctx, diff --git a/server/websocket/internal/logic/mq_consumer.go b/server/websocket/internal/logic/mq_consumer.go index 7488e229..410dee3e 100644 --- a/server/websocket/internal/logic/mq_consumer.go +++ b/server/websocket/internal/logic/mq_consumer.go @@ -13,6 +13,11 @@ type MqConsumerRenderResult struct { } func (m *MqConsumerRenderResult) Run(ctx context.Context, data []byte) error { + defer func() { + if err := recover(); err != nil { + logx.Error("MqConsumerRenderResult panic:", err) + } + }() logx.Info("接收到MqConsumerRenderResult数据:", string(data)) var parseInfo websocket_data.RenderImageNotify if err := json.Unmarshal(data, &parseInfo); err != nil { From b085f3dbefd8c84479802468dedaeb4a444504c4 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 10 Aug 2023 19:41:25 +0800 Subject: [PATCH 2/5] fix --- .../internal/logic/datatransferlogic.go | 58 ++++++++++++------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 44dc3e82..cbd1a189 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -9,6 +9,7 @@ import ( "fusenapi/utils/auth" "fusenapi/utils/id_generator" "fusenapi/utils/websocket_data" + "github.com/google/uuid" "github.com/gorilla/websocket" "net/http" "sync" @@ -60,6 +61,8 @@ var ( } //websocket连接存储 mapConnPool = sync.Map{} + //公共互斥锁 + publicMutex sync.Mutex ) // 每个连接的连接基本属性 @@ -70,7 +73,7 @@ type wsConnectItem struct { allModels *gmodel.AllModelsGen closeChan chan struct{} //ws连接关闭chan isClose bool //是否已经关闭 - uniqueId uint64 //ws连接唯一标识 + uniqueId string //ws连接唯一标识 inChan chan []byte //接受消息缓冲通道 outChan chan []byte //发送回客户端的消息 mutex sync.Mutex //互斥锁 @@ -94,7 +97,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp ) isAuth, userInfo = l.checkAuth(svcCtx, r) if !isAuth { - time.Sleep(time.Second * 4) //兼容下火狐 + time.Sleep(time.Second * 2) //兼容下火狐 rsp := websocket_data.DataTransferData{ T: constants.WEBSOCKET_UNAUTH, D: nil, @@ -109,8 +112,32 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp //测试的目前写死 39 var userInfo auth.UserInfo userInfo.UserId = 39 + ws := l.setConnPool(conn, userInfo) + defer ws.close() + go func() { + //把连接成功消息发回去 + time.Sleep(time.Second * 2) //兼容下火狐 + b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) + _ = conn.WriteMessage(websocket.TextMessage, b) + }() + //循环读客户端信息 + go ws.readLoop() + //循环把数据发送给客户端 + go ws.writeLoop() + //推消息到云渲染 + go ws.sendLoop() + //操作连接中渲染任务的增加/删除 + go ws.operationRenderTask() + //心跳 + ws.heartbeat() +} + +// 获取唯一id +func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.UserInfo) wsConnectItem { + publicMutex.Lock() + defer publicMutex.Unlock() //生成连接唯一标识 - uniqueId := websocketIdGenerator.Get() + uniqueId := l.getUniqueId() ws := wsConnectItem{ conn: conn, ctx: l.ctx, @@ -129,23 +156,14 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp } //保存连接 mapConnPool.Store(uniqueId, ws) - defer ws.close() - go func() { - //把连接成功消息发回去 - time.Sleep(time.Second * 4) //兼容下火狐 - b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) - _ = conn.WriteMessage(websocket.TextMessage, b) - }() - //循环读客户端信息 - go ws.readLoop() - //循环把数据发送给客户端 - go ws.writeLoop() - //推消息到云渲染 - go ws.sendLoop() - //操作连接中渲染任务的增加/删除 - go ws.operationRenderTask() - //心跳 - ws.heartbeat() + return ws +} +func (l *DataTransferLogic) getUniqueId() string { + uniqueId := uuid.New().String() + time.Now().Format("20060102150405") + if _, ok := mapConnPool.Load(uniqueId); ok { + uniqueId = l.getUniqueId() + } + return uniqueId } // 鉴权 From df8772812aa1927bb7a180aef0b30dfb9c3c9df1 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 10 Aug 2023 19:42:31 +0800 Subject: [PATCH 3/5] fix --- server/websocket/internal/logic/datatransferlogic.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index cbd1a189..825ce08a 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -114,12 +114,6 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp userInfo.UserId = 39 ws := l.setConnPool(conn, userInfo) defer ws.close() - go func() { - //把连接成功消息发回去 - time.Sleep(time.Second * 2) //兼容下火狐 - b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) - _ = conn.WriteMessage(websocket.TextMessage, b) - }() //循环读客户端信息 go ws.readLoop() //循环把数据发送给客户端 @@ -156,6 +150,12 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User } //保存连接 mapConnPool.Store(uniqueId, ws) + go func() { + //把连接成功消息发回去 + time.Sleep(time.Second * 2) //兼容下火狐 + b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) + _ = conn.WriteMessage(websocket.TextMessage, b) + }() return ws } func (l *DataTransferLogic) getUniqueId() string { From b0e534de8c35f3dbd6725f0c5e19d8356c3c217e Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 11 Aug 2023 10:12:25 +0800 Subject: [PATCH 4/5] fix --- server/render/consumer/assemble_render_data.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/render/consumer/assemble_render_data.go b/server/render/consumer/assemble_render_data.go index 98184dca..5b041711 100644 --- a/server/render/consumer/assemble_render_data.go +++ b/server/render/consumer/assemble_render_data.go @@ -25,7 +25,7 @@ import ( type pythonApiRsp struct { Id string `json:"id"` //物料模板的id LogoUrl string `json:"logo_url"` //logo地址 - result string `json:"result"` //图片base64 + Result string `json:"result"` //图片base64 } // 消费渲染需要组装的数据 @@ -258,7 +258,7 @@ func getCombineImage(ctx context.Context, svcCtx *svc.ServiceContext, parseInfo } uploadRes, err := upload.UploadFileByBase64(&file.UploadBaseReq{ FileHash: combineHash, - FileData: pythonApiInfo.result, + FileData: pythonApiInfo.Result, UploadBucket: 1, ApiType: 2, UserId: parseInfo.RenderData.UserId, From 84749f4d14458ca40f2cae2d91dff9706698a7d9 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 11 Aug 2023 10:49:29 +0800 Subject: [PATCH 5/5] fix --- constants/websocket.go | 6 +++++ .../internal/logic/datatransferlogic.go | 8 +++++- .../internal/logic/ws_resume_last_connect.go | 27 +++++++++++++++++++ 3 files changed, 40 insertions(+), 1 deletion(-) create mode 100644 server/websocket/internal/logic/ws_resume_last_connect.go diff --git a/constants/websocket.go b/constants/websocket.go index be28eb32..ced0c947 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -8,6 +8,12 @@ const ( WEBSOCKET_UNAUTH = "WEBSOCKET_UNAUTH" //ws连接成功 WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS" + //请求恢复为上次连接的标识 + WEBSOCKET_REQUEST_RESUME_LAST_CONNECT = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT" + //请求恢复为上次连接的标识错误 + WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR" + //请求恢复为上次连接的标识成功 + WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS" //渲染前数据组装 WEBSOCKET_RENDER_IMAGE_ASSEMBLE = "WEBSOCKET_RENDER_IMAGE_ASSEMBLE" //图片渲染 diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 825ce08a..16f822aa 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -112,6 +112,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp //测试的目前写死 39 var userInfo auth.UserInfo userInfo.UserId = 39 + //设置连接 ws := l.setConnPool(conn, userInfo) defer ws.close() //循环读客户端信息 @@ -126,7 +127,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp ws.heartbeat() } -// 获取唯一id +// 设置连接 func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.UserInfo) wsConnectItem { publicMutex.Lock() defer publicMutex.Unlock() @@ -158,6 +159,8 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User }() return ws } + +// 获取唯一id func (l *DataTransferLogic) getUniqueId() string { uniqueId := uuid.New().String() + time.Now().Format("20060102150405") if _, ok := mapConnPool.Load(uniqueId); ok { @@ -309,6 +312,9 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { //图片渲染 case constants.WEBSOCKET_RENDER_IMAGE: w.renderImage(d) + //刷新重连请求恢复上次连接的标识 + case constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT: + w.resumeLateConnect(d) default: } diff --git a/server/websocket/internal/logic/ws_resume_last_connect.go b/server/websocket/internal/logic/ws_resume_last_connect.go new file mode 100644 index 00000000..39f29414 --- /dev/null +++ b/server/websocket/internal/logic/ws_resume_last_connect.go @@ -0,0 +1,27 @@ +package logic + +import "fusenapi/constants" + +// 刷新重连请求恢复上次连接的标识 +func (w *wsConnectItem) resumeLateConnect(data []byte) { + clientId := string(data) + //id长度不对 + if len(clientId) != 50 { + rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "request id is invalid") + w.sendToOutChan(rsp) + return + } + publicMutex.Lock() + defer publicMutex.Unlock() + //存在是不能给他申请重新绑定 + if _, ok := mapConnPool.Load(clientId); ok { + rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ") + w.sendToOutChan(rsp) + return + } + //重新绑定 + w.uniqueId = clientId + rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS, clientId) + w.sendToOutChan(rsp) + return +}