From a1060823642a696d258360def9421d186e658289 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 7 Aug 2023 10:48:14 +0800 Subject: [PATCH 1/3] fix --- .../internal/logic/datatransferlogic.go | 8 +- .../internal/logic/rendernotifylogic.go | 13 ++-- .../internal/logic/ws_render_image_logic.go | 73 +++++++++---------- server/websocket/internal/types/types.go | 20 ++--- server_api/websocket.api | 20 ++--- 5 files changed, 55 insertions(+), 79 deletions(-) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index cbd2a0c9..317a93ad 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -3,7 +3,6 @@ package logic import ( "bytes" "encoding/json" - "fmt" "fusenapi/constants" "fusenapi/initalize" "fusenapi/server/websocket/internal/types" @@ -108,7 +107,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp inChan: make(chan []byte, 1000), outChan: make(chan []byte, 1000), renderProperty: renderProperty{ - renderImageTask: make(map[string]struct{}), + renderImageTask: make(map[string]string), renderImageTaskCtlChan: make(chan renderImageControlChanItem, 100), }, } @@ -250,11 +249,6 @@ func (w *wsConnectItem) sendToOutChan(data []byte) { } } -// 获取需要渲染图片的map key -func (w *wsConnectItem) getRenderImageMapKey(productId, templateTagId, logoId int64, algorithmVersion string) string { - return fmt.Sprintf("%d-%d-%d-%s", productId, templateTagId, logoId, algorithmVersion) -} - // 格式化返回数据 func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []byte { d := types.DataTransferData{ diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go index 9e5e0c9e..f499157f 100644 --- a/server/websocket/internal/logic/rendernotifylogic.go +++ b/server/websocket/internal/logic/rendernotifylogic.go @@ -61,21 +61,20 @@ func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq) (resp *basi if ws.isClose { return true } - renderKey := ws.getRenderImageMapKey(req.Info.ProductId, req.Info.TemplateTagId, req.Info.LogoId, req.Info.AlgorithmVersion) //查询有无该渲染任务 - _, ok = ws.renderProperty.renderImageTask[renderKey] + renderId, ok := ws.renderProperty.renderImageTask[req.Info.TaskId] if !ok { return true } b := ws.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, types.RenderImageRspMsg{ - ProductId: req.Info.ProductId, - TemplateTagId: req.Info.TemplateTagId, - Image: req.Info.Image, + RenderId: renderId, + Image: req.Info.Image, }) //删除对应的需要渲染的图片map ws.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{ - Option: 0, //0删除 1添加 - Key: renderKey, + Option: 0, //0删除 1添加 + TaskId: req.Info.TaskId, + RenderId: renderId, } //发送数据到out chan ws.sendToOutChan(b) diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 7e969d40..3ba3fdd7 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -4,38 +4,21 @@ import ( "encoding/json" "fusenapi/constants" "fusenapi/server/websocket/internal/types" + "fusenapi/utils/hash" "github.com/zeromicro/go-zero/core/logx" ) // 云渲染属性 type renderProperty struct { - renderImageTask map[string]struct{} //需要渲染的图片任务 + renderImageTask map[string]string //需要渲染的图片任务 key是taskId val 是renderId renderImageTaskCtlChan chan renderImageControlChanItem //渲染任务新增移除的控制通道 } // 渲染任务新增移除的控制通道的数据 type renderImageControlChanItem struct { - Option int // 0删除 1添加 - Key string //map的key -} - -// 操作连接中渲染任务的增加/删除 -func (w *wsConnectItem) operationRenderTask() { - for { - select { - case <-w.closeChan: - return - case data := <-w.renderProperty.renderImageTaskCtlChan: - switch data.Option { - case 0: //删除任务 - delete(w.renderProperty.renderImageTask, data.Key) - case 1: //新增任务 - w.renderProperty.renderImageTask[data.Key] = struct{}{} - default: - - } - } - } + Option int // 0删除 1添加 + TaskId string //map的key + RenderId string // map的val } // 渲染请求数据处理发送云渲染服务处理 @@ -48,23 +31,39 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) { } logx.Info("收到请求云渲染图片数据:", renderImageData) //把需要渲染的图片任务加进去 - for _, productId := range renderImageData.ProductIds { - select { - case <-w.closeChan: //连接关闭了 + select { + case <-w.closeChan: //连接关闭了 + return + default: + //加入渲染任务 + taskId := hash.JsonHashKey(renderImageData.RenderData) + w.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{ + Option: 1, //0删除 1添加 + TaskId: taskId, + RenderId: renderImageData.RenderId, + } + //发送给对应的流水线组装数据 + if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil { + logx.Error("发送渲染任务数据到MQ失败:", string(data), "err:", err) return - default: - //加入渲染任务 - key := w.getRenderImageMapKey(productId, renderImageData.TemplateTagId, renderImageData.LogoId, renderImageData.AlgorithmVersion) - w.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{ - Option: 1, //0删除 1添加 - Key: key, + } + logx.Info("发送渲染数据到rabbitmq成功:", string(data)) + } +} + +// 操作连接中渲染任务的增加/删除 +func (w *wsConnectItem) operationRenderTask() { + for { + select { + case <-w.closeChan: + return + case data := <-w.renderProperty.renderImageTaskCtlChan: + switch data.Option { + case 0: //删除任务 + delete(w.renderProperty.renderImageTask, data.TaskId) + case 1: //新增任务 + w.renderProperty.renderImageTask[data.TaskId] = data.RenderId } - //发送给对应的流水线组装数据 - if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil { - logx.Error("发送渲染任务数据到MQ失败:", string(data), "err:", err) - continue - } - logx.Info("发送渲染数据到rabbitmq成功:", string(data)) } } } diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go index 901b8123..141d7e5e 100644 --- a/server/websocket/internal/types/types.go +++ b/server/websocket/internal/types/types.go @@ -11,18 +11,13 @@ type DataTransferData struct { } type RenderImageReqMsg struct { - ProductIds []int64 `json:"product_ids"` //产品 id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - LogoId int64 `json:"logo_id"` //logoid - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 + RenderId string `json:"render_id"` //渲染id + RenderData interface{} `json:"render_data"` //参数数据 } type RenderImageRspMsg struct { - ProductId int64 `json:"product_id"` //产品 id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 - LogoId int64 `json:"logo_id"` //logoid - Image string `json:"image"` //渲染后的图片 + RenderId string `json:"render_id"` //渲染id + Image string `json:"image"` //渲染结果图片 } type ThirdPartyLoginRspMsg struct { @@ -36,11 +31,8 @@ type RenderNotifyReq struct { } type NotifyInfo struct { - ProductId int64 `json:"product_id"` //产品id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 - LogoId int64 `json:"logo_id"` //logoid - Image string `json:"image"` + TaskId string `json:"task_id"` //任务id + Image string `json:"image"` } type ThirdPartyLoginNotifyReq struct { diff --git a/server_api/websocket.api b/server_api/websocket.api index e24e2874..254e29a5 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -26,17 +26,12 @@ type DataTransferData { D interface{} `json:"d"` //传递的消息 } type RenderImageReqMsg { //websocket接受要云渲染处理的数据 - ProductIds []int64 `json:"product_ids"` //产品 id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - LogoId int64 `json:"logo_id"` //logoid - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 + RenderId string `json:"render_id"` //渲染id + RenderData interface{} `json:"render_data"` //参数数据 } type RenderImageRspMsg { //websocket发送渲染完的数据 - ProductId int64 `json:"product_id"` //产品 id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 - LogoId int64 `json:"logo_id"` //logoid - Image string `json:"image"` //渲染后的图片 + RenderId string `json:"render_id"` //渲染id + Image string `json:"image"` //渲染结果图片 } type ThirdPartyLoginRspMsg { //websocket三方登录的通知数据 Token string `json:"token"` @@ -48,11 +43,8 @@ type RenderNotifyReq { Info NotifyInfo `json:"info"` } type NotifyInfo { - ProductId int64 `json:"product_id"` //产品id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 - LogoId int64 `json:"logo_id"` //logoid - Image string `json:"image"` + TaskId string `json:"task_id"` //任务id + Image string `json:"image"` } //第三方登录通知接口 type ThirdPartyLoginNotifyReq { From 8e4e1c1cfd18ff0b89c2e8c7005f7e55a5aed591 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 7 Aug 2023 11:15:51 +0800 Subject: [PATCH 2/3] fix --- constants/websocket.go | 2 + .../internal/logic/datatransferlogic.go | 6 +-- .../internal/logic/ws_render_image_logic.go | 42 ++++++++++--------- server/websocket/websocket.go | 4 +- 4 files changed, 30 insertions(+), 24 deletions(-) diff --git a/constants/websocket.go b/constants/websocket.go index 34e28eb7..be28eb32 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -8,6 +8,8 @@ const ( WEBSOCKET_UNAUTH = "WEBSOCKET_UNAUTH" //ws连接成功 WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS" + //渲染前数据组装 + WEBSOCKET_RENDER_IMAGE_ASSEMBLE = "WEBSOCKET_RENDER_IMAGE_ASSEMBLE" //图片渲染 WEBSOCKET_RENDER_IMAGE = "WEBSOCKET_RENDER_IMAGE" //数据格式错误 diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 317a93ad..1865300d 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -270,9 +270,9 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { d, _ := json.Marshal(parseInfo.D) //分消息类型给到不同逻辑处理,可扩展 switch parseInfo.T { - //图片渲染 - case constants.WEBSOCKET_RENDER_IMAGE: - w.SendToCloudRender(d) + //图片渲染数据组装 + case constants.WEBSOCKET_RENDER_IMAGE_ASSEMBLE: + w.assembleRenderData(d) default: } diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 3ba3fdd7..ca93a6c7 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -21,8 +21,13 @@ type renderImageControlChanItem struct { RenderId string // map的val } -// 渲染请求数据处理发送云渲染服务处理 -func (w *wsConnectItem) SendToCloudRender(data []byte) { +// 渲染发送到组装数据组装数据 +type assembleRenderData struct { + TaskId string `json:"task_id"` + RenderData interface{} `json:"render_data"` +} + +func (w *wsConnectItem) assembleRenderData(data []byte) { var renderImageData types.RenderImageReqMsg if err := json.Unmarshal(data, &renderImageData); err != nil { w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data)) @@ -31,24 +36,23 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) { } logx.Info("收到请求云渲染图片数据:", renderImageData) //把需要渲染的图片任务加进去 - select { - case <-w.closeChan: //连接关闭了 - return - default: - //加入渲染任务 - taskId := hash.JsonHashKey(renderImageData.RenderData) - w.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{ - Option: 1, //0删除 1添加 - TaskId: taskId, - RenderId: renderImageData.RenderId, - } - //发送给对应的流水线组装数据 - if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil { - logx.Error("发送渲染任务数据到MQ失败:", string(data), "err:", err) - return - } - logx.Info("发送渲染数据到rabbitmq成功:", string(data)) + taskId := hash.JsonHashKey(renderImageData.RenderData) + w.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{ + Option: 1, //0删除 1添加 + TaskId: taskId, + RenderId: renderImageData.RenderId, } + tmpData := assembleRenderData{ + TaskId: taskId, + RenderData: renderImageData.RenderData, + } + d, _ := json.Marshal(tmpData) + //发送给对应的流水线组装数据 + if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil { + logx.Error("发送渲染任务数据到MQ失败:", string(data), "err:", err) + return + } + logx.Info("发送渲染数据到rabbitmq成功:", string(data)) } // 操作连接中渲染任务的增加/删除 diff --git a/server/websocket/websocket.go b/server/websocket/websocket.go index c62fc5ba..01a9efb8 100644 --- a/server/websocket/websocket.go +++ b/server/websocket/websocket.go @@ -30,11 +30,11 @@ func main() { defer server.Stop() ctx := svc.NewServiceContext(c) - //消费组装队列 + //消费渲染结果队列 ctx1 := context.Background() ctx2, cancel := context.WithCancel(ctx1) defer cancel() - go ctx.RabbitMq.Consume(ctx2, constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, &consumer.MqConsumerRenderResult{}) + go ctx.RabbitMq.Consume(ctx2, constants.RABBIT_MQ_RENDER_RESULT_DATA, &consumer.MqConsumerRenderResult{}) handler.RegisterHandlers(server, ctx) fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start() From 36efad44dfc25c8330a3763dda8744cf2472f57a Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 7 Aug 2023 11:25:06 +0800 Subject: [PATCH 3/3] fix --- .../internal/logic/datatransferlogic.go | 6 +++--- .../internal/logic/rendernotifylogic.go | 3 ++- .../logic/thirdpartyloginnotifylogic.go | 3 ++- .../internal/logic/ws_render_image_logic.go | 4 ++-- server/websocket/internal/types/types.go | 19 ----------------- server_api/websocket.api | 16 -------------- utils/websocket_data/render_data.go | 21 +++++++++++++++++++ 7 files changed, 30 insertions(+), 42 deletions(-) create mode 100644 utils/websocket_data/render_data.go diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 1865300d..ab215c78 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -5,9 +5,9 @@ import ( "encoding/json" "fusenapi/constants" "fusenapi/initalize" - "fusenapi/server/websocket/internal/types" "fusenapi/utils/auth" "fusenapi/utils/id_generator" + "fusenapi/utils/websocket_data" "github.com/gorilla/websocket" "net/http" "sync" @@ -251,7 +251,7 @@ func (w *wsConnectItem) sendToOutChan(data []byte) { // 格式化返回数据 func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []byte { - d := types.DataTransferData{ + d := websocket_data.DataTransferData{ T: msgType, D: data, } @@ -261,7 +261,7 @@ func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []by // 处理接受到的数据 func (w *wsConnectItem) dealwithReciveData(data []byte) { - var parseInfo types.DataTransferData + var parseInfo websocket_data.DataTransferData if err := json.Unmarshal(data, &parseInfo); err != nil { logx.Error("invalid format of websocket message") w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket message:"+string(data)) diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go index f499157f..27b2002d 100644 --- a/server/websocket/internal/logic/rendernotifylogic.go +++ b/server/websocket/internal/logic/rendernotifylogic.go @@ -3,6 +3,7 @@ package logic import ( "fusenapi/constants" "fusenapi/utils/basic" + "fusenapi/utils/websocket_data" "time" "context" @@ -66,7 +67,7 @@ func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq) (resp *basi if !ok { return true } - b := ws.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, types.RenderImageRspMsg{ + b := ws.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, websocket_data.RenderImageRspMsg{ RenderId: renderId, Image: req.Info.Image, }) diff --git a/server/websocket/internal/logic/thirdpartyloginnotifylogic.go b/server/websocket/internal/logic/thirdpartyloginnotifylogic.go index d363ffd9..d3639a95 100644 --- a/server/websocket/internal/logic/thirdpartyloginnotifylogic.go +++ b/server/websocket/internal/logic/thirdpartyloginnotifylogic.go @@ -4,6 +4,7 @@ import ( "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" + "fusenapi/utils/websocket_data" "time" "context" @@ -66,7 +67,7 @@ func (l *ThirdPartyLoginNotifyLogic) ThirdPartyLoginNotify(req *types.ThirdParty if !ok { return resp.SetStatusWithMessage(basic.CodeServiceErr, "type of websocket connect object is err") } - b := ws.respondDataFormat(constants.WEBSOCKET_THIRD_PARTY_LOGIN_NOTIFY, types.ThirdPartyLoginRspMsg{ + b := ws.respondDataFormat(constants.WEBSOCKET_THIRD_PARTY_LOGIN_NOTIFY, websocket_data.ThirdPartyLoginRspMsg{ Token: req.Info.Token, }) select { diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index ca93a6c7..5be69551 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -3,8 +3,8 @@ package logic import ( "encoding/json" "fusenapi/constants" - "fusenapi/server/websocket/internal/types" "fusenapi/utils/hash" + "fusenapi/utils/websocket_data" "github.com/zeromicro/go-zero/core/logx" ) @@ -28,7 +28,7 @@ type assembleRenderData struct { } func (w *wsConnectItem) assembleRenderData(data []byte) { - var renderImageData types.RenderImageReqMsg + var renderImageData websocket_data.RenderImageReqMsg if err := json.Unmarshal(data, &renderImageData); err != nil { w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data)) logx.Error("invalid format of websocket render image message", err) diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go index 141d7e5e..67d38865 100644 --- a/server/websocket/internal/types/types.go +++ b/server/websocket/internal/types/types.go @@ -5,25 +5,6 @@ import ( "fusenapi/utils/basic" ) -type DataTransferData struct { - T string `json:"t"` //消息类型 - D interface{} `json:"d"` //传递的消息 -} - -type RenderImageReqMsg struct { - RenderId string `json:"render_id"` //渲染id - RenderData interface{} `json:"render_data"` //参数数据 -} - -type RenderImageRspMsg struct { - RenderId string `json:"render_id"` //渲染id - Image string `json:"image"` //渲染结果图片 -} - -type ThirdPartyLoginRspMsg struct { - Token string `json:"token"` -} - type RenderNotifyReq struct { Sign string `json:"sign"` Time int64 `json:"time"` diff --git a/server_api/websocket.api b/server_api/websocket.api index 254e29a5..22104825 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -20,22 +20,6 @@ service websocket { post /api/websocket/third_party_login_notify(ThirdPartyLoginNotifyReq) returns (response); } -//websocket数据交互 -type DataTransferData { - T string `json:"t"` //消息类型 - D interface{} `json:"d"` //传递的消息 -} -type RenderImageReqMsg { //websocket接受要云渲染处理的数据 - RenderId string `json:"render_id"` //渲染id - RenderData interface{} `json:"render_data"` //参数数据 -} -type RenderImageRspMsg { //websocket发送渲染完的数据 - RenderId string `json:"render_id"` //渲染id - Image string `json:"image"` //渲染结果图片 -} -type ThirdPartyLoginRspMsg { //websocket三方登录的通知数据 - Token string `json:"token"` -} //渲染完了通知接口 type RenderNotifyReq { Sign string `json:"sign"` diff --git a/utils/websocket_data/render_data.go b/utils/websocket_data/render_data.go new file mode 100644 index 00000000..2e6c6a9e --- /dev/null +++ b/utils/websocket_data/render_data.go @@ -0,0 +1,21 @@ +package websocket_data + +// websocket数据交互 +type DataTransferData struct { + T string `json:"t"` //消息类型 + D interface{} `json:"d"` //传递的消息 +} +type RenderImageReqMsg struct { + //websocket接受要云渲染处理的数据 + RenderId string `json:"render_id"` //渲染id + RenderData interface{} `json:"render_data"` //参数数据 +} +type RenderImageRspMsg struct { + //websocket发送渲染完的数据 + RenderId string `json:"render_id"` //渲染id + Image string `json:"image"` //渲染结果图片 +} +type ThirdPartyLoginRspMsg struct { + //websocket三方登录的通知数据 + Token string `json:"token"` +}