diff --git a/constants/websocket.go b/constants/websocket.go index 34e28eb7..be28eb32 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -8,6 +8,8 @@ const ( WEBSOCKET_UNAUTH = "WEBSOCKET_UNAUTH" //ws连接成功 WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS" + //渲染前数据组装 + WEBSOCKET_RENDER_IMAGE_ASSEMBLE = "WEBSOCKET_RENDER_IMAGE_ASSEMBLE" //图片渲染 WEBSOCKET_RENDER_IMAGE = "WEBSOCKET_RENDER_IMAGE" //数据格式错误 diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index cbd2a0c9..ab215c78 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -3,12 +3,11 @@ package logic import ( "bytes" "encoding/json" - "fmt" "fusenapi/constants" "fusenapi/initalize" - "fusenapi/server/websocket/internal/types" "fusenapi/utils/auth" "fusenapi/utils/id_generator" + "fusenapi/utils/websocket_data" "github.com/gorilla/websocket" "net/http" "sync" @@ -108,7 +107,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp inChan: make(chan []byte, 1000), outChan: make(chan []byte, 1000), renderProperty: renderProperty{ - renderImageTask: make(map[string]struct{}), + renderImageTask: make(map[string]string), renderImageTaskCtlChan: make(chan renderImageControlChanItem, 100), }, } @@ -250,14 +249,9 @@ func (w *wsConnectItem) sendToOutChan(data []byte) { } } -// 获取需要渲染图片的map key -func (w *wsConnectItem) getRenderImageMapKey(productId, templateTagId, logoId int64, algorithmVersion string) string { - return fmt.Sprintf("%d-%d-%d-%s", productId, templateTagId, logoId, algorithmVersion) -} - // 格式化返回数据 func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []byte { - d := types.DataTransferData{ + d := websocket_data.DataTransferData{ T: msgType, D: data, } @@ -267,7 +261,7 @@ func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []by // 处理接受到的数据 func (w *wsConnectItem) dealwithReciveData(data []byte) { - var parseInfo types.DataTransferData + var parseInfo websocket_data.DataTransferData if err := json.Unmarshal(data, &parseInfo); err != nil { logx.Error("invalid format of websocket message") w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket message:"+string(data)) @@ -276,9 +270,9 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { d, _ := json.Marshal(parseInfo.D) //分消息类型给到不同逻辑处理,可扩展 switch parseInfo.T { - //图片渲染 - case constants.WEBSOCKET_RENDER_IMAGE: - w.SendToCloudRender(d) + //图片渲染数据组装 + case constants.WEBSOCKET_RENDER_IMAGE_ASSEMBLE: + w.assembleRenderData(d) default: } diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go index 9e5e0c9e..27b2002d 100644 --- a/server/websocket/internal/logic/rendernotifylogic.go +++ b/server/websocket/internal/logic/rendernotifylogic.go @@ -3,6 +3,7 @@ package logic import ( "fusenapi/constants" "fusenapi/utils/basic" + "fusenapi/utils/websocket_data" "time" "context" @@ -61,21 +62,20 @@ func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq) (resp *basi if ws.isClose { return true } - renderKey := ws.getRenderImageMapKey(req.Info.ProductId, req.Info.TemplateTagId, req.Info.LogoId, req.Info.AlgorithmVersion) //查询有无该渲染任务 - _, ok = ws.renderProperty.renderImageTask[renderKey] + renderId, ok := ws.renderProperty.renderImageTask[req.Info.TaskId] if !ok { return true } - b := ws.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, types.RenderImageRspMsg{ - ProductId: req.Info.ProductId, - TemplateTagId: req.Info.TemplateTagId, - Image: req.Info.Image, + b := ws.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, websocket_data.RenderImageRspMsg{ + RenderId: renderId, + Image: req.Info.Image, }) //删除对应的需要渲染的图片map ws.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{ - Option: 0, //0删除 1添加 - Key: renderKey, + Option: 0, //0删除 1添加 + TaskId: req.Info.TaskId, + RenderId: renderId, } //发送数据到out chan ws.sendToOutChan(b) diff --git a/server/websocket/internal/logic/thirdpartyloginnotifylogic.go b/server/websocket/internal/logic/thirdpartyloginnotifylogic.go index d363ffd9..d3639a95 100644 --- a/server/websocket/internal/logic/thirdpartyloginnotifylogic.go +++ b/server/websocket/internal/logic/thirdpartyloginnotifylogic.go @@ -4,6 +4,7 @@ import ( "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" + "fusenapi/utils/websocket_data" "time" "context" @@ -66,7 +67,7 @@ func (l *ThirdPartyLoginNotifyLogic) ThirdPartyLoginNotify(req *types.ThirdParty if !ok { return resp.SetStatusWithMessage(basic.CodeServiceErr, "type of websocket connect object is err") } - b := ws.respondDataFormat(constants.WEBSOCKET_THIRD_PARTY_LOGIN_NOTIFY, types.ThirdPartyLoginRspMsg{ + b := ws.respondDataFormat(constants.WEBSOCKET_THIRD_PARTY_LOGIN_NOTIFY, websocket_data.ThirdPartyLoginRspMsg{ Token: req.Info.Token, }) select { diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 7e969d40..5be69551 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -3,20 +3,56 @@ package logic import ( "encoding/json" "fusenapi/constants" - "fusenapi/server/websocket/internal/types" + "fusenapi/utils/hash" + "fusenapi/utils/websocket_data" "github.com/zeromicro/go-zero/core/logx" ) // 云渲染属性 type renderProperty struct { - renderImageTask map[string]struct{} //需要渲染的图片任务 + renderImageTask map[string]string //需要渲染的图片任务 key是taskId val 是renderId renderImageTaskCtlChan chan renderImageControlChanItem //渲染任务新增移除的控制通道 } // 渲染任务新增移除的控制通道的数据 type renderImageControlChanItem struct { - Option int // 0删除 1添加 - Key string //map的key + Option int // 0删除 1添加 + TaskId string //map的key + RenderId string // map的val +} + +// 渲染发送到组装数据组装数据 +type assembleRenderData struct { + TaskId string `json:"task_id"` + RenderData interface{} `json:"render_data"` +} + +func (w *wsConnectItem) assembleRenderData(data []byte) { + var renderImageData websocket_data.RenderImageReqMsg + if err := json.Unmarshal(data, &renderImageData); err != nil { + w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data)) + logx.Error("invalid format of websocket render image message", err) + return + } + logx.Info("收到请求云渲染图片数据:", renderImageData) + //把需要渲染的图片任务加进去 + taskId := hash.JsonHashKey(renderImageData.RenderData) + w.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{ + Option: 1, //0删除 1添加 + TaskId: taskId, + RenderId: renderImageData.RenderId, + } + tmpData := assembleRenderData{ + TaskId: taskId, + RenderData: renderImageData.RenderData, + } + d, _ := json.Marshal(tmpData) + //发送给对应的流水线组装数据 + if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil { + logx.Error("发送渲染任务数据到MQ失败:", string(data), "err:", err) + return + } + logx.Info("发送渲染数据到rabbitmq成功:", string(data)) } // 操作连接中渲染任务的增加/删除 @@ -28,43 +64,10 @@ func (w *wsConnectItem) operationRenderTask() { case data := <-w.renderProperty.renderImageTaskCtlChan: switch data.Option { case 0: //删除任务 - delete(w.renderProperty.renderImageTask, data.Key) + delete(w.renderProperty.renderImageTask, data.TaskId) case 1: //新增任务 - w.renderProperty.renderImageTask[data.Key] = struct{}{} - default: - + w.renderProperty.renderImageTask[data.TaskId] = data.RenderId } } } } - -// 渲染请求数据处理发送云渲染服务处理 -func (w *wsConnectItem) SendToCloudRender(data []byte) { - var renderImageData types.RenderImageReqMsg - if err := json.Unmarshal(data, &renderImageData); err != nil { - w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data)) - logx.Error("invalid format of websocket render image message", err) - return - } - logx.Info("收到请求云渲染图片数据:", renderImageData) - //把需要渲染的图片任务加进去 - for _, productId := range renderImageData.ProductIds { - select { - case <-w.closeChan: //连接关闭了 - return - default: - //加入渲染任务 - key := w.getRenderImageMapKey(productId, renderImageData.TemplateTagId, renderImageData.LogoId, renderImageData.AlgorithmVersion) - w.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{ - Option: 1, //0删除 1添加 - Key: key, - } - //发送给对应的流水线组装数据 - if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil { - logx.Error("发送渲染任务数据到MQ失败:", string(data), "err:", err) - continue - } - logx.Info("发送渲染数据到rabbitmq成功:", string(data)) - } - } -} diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go index 901b8123..67d38865 100644 --- a/server/websocket/internal/types/types.go +++ b/server/websocket/internal/types/types.go @@ -5,30 +5,6 @@ import ( "fusenapi/utils/basic" ) -type DataTransferData struct { - T string `json:"t"` //消息类型 - D interface{} `json:"d"` //传递的消息 -} - -type RenderImageReqMsg struct { - ProductIds []int64 `json:"product_ids"` //产品 id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - LogoId int64 `json:"logo_id"` //logoid - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 -} - -type RenderImageRspMsg struct { - ProductId int64 `json:"product_id"` //产品 id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 - LogoId int64 `json:"logo_id"` //logoid - Image string `json:"image"` //渲染后的图片 -} - -type ThirdPartyLoginRspMsg struct { - Token string `json:"token"` -} - type RenderNotifyReq struct { Sign string `json:"sign"` Time int64 `json:"time"` @@ -36,11 +12,8 @@ type RenderNotifyReq struct { } type NotifyInfo struct { - ProductId int64 `json:"product_id"` //产品id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 - LogoId int64 `json:"logo_id"` //logoid - Image string `json:"image"` + TaskId string `json:"task_id"` //任务id + Image string `json:"image"` } type ThirdPartyLoginNotifyReq struct { diff --git a/server/websocket/websocket.go b/server/websocket/websocket.go index c62fc5ba..01a9efb8 100644 --- a/server/websocket/websocket.go +++ b/server/websocket/websocket.go @@ -30,11 +30,11 @@ func main() { defer server.Stop() ctx := svc.NewServiceContext(c) - //消费组装队列 + //消费渲染结果队列 ctx1 := context.Background() ctx2, cancel := context.WithCancel(ctx1) defer cancel() - go ctx.RabbitMq.Consume(ctx2, constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, &consumer.MqConsumerRenderResult{}) + go ctx.RabbitMq.Consume(ctx2, constants.RABBIT_MQ_RENDER_RESULT_DATA, &consumer.MqConsumerRenderResult{}) handler.RegisterHandlers(server, ctx) fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start() diff --git a/server_api/websocket.api b/server_api/websocket.api index e24e2874..22104825 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -20,27 +20,6 @@ service websocket { post /api/websocket/third_party_login_notify(ThirdPartyLoginNotifyReq) returns (response); } -//websocket数据交互 -type DataTransferData { - T string `json:"t"` //消息类型 - D interface{} `json:"d"` //传递的消息 -} -type RenderImageReqMsg { //websocket接受要云渲染处理的数据 - ProductIds []int64 `json:"product_ids"` //产品 id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - LogoId int64 `json:"logo_id"` //logoid - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 -} -type RenderImageRspMsg { //websocket发送渲染完的数据 - ProductId int64 `json:"product_id"` //产品 id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 - LogoId int64 `json:"logo_id"` //logoid - Image string `json:"image"` //渲染后的图片 -} -type ThirdPartyLoginRspMsg { //websocket三方登录的通知数据 - Token string `json:"token"` -} //渲染完了通知接口 type RenderNotifyReq { Sign string `json:"sign"` @@ -48,11 +27,8 @@ type RenderNotifyReq { Info NotifyInfo `json:"info"` } type NotifyInfo { - ProductId int64 `json:"product_id"` //产品id - TemplateTagId int64 `json:"template_tag_id"` //模板标签id - AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本 - LogoId int64 `json:"logo_id"` //logoid - Image string `json:"image"` + TaskId string `json:"task_id"` //任务id + Image string `json:"image"` } //第三方登录通知接口 type ThirdPartyLoginNotifyReq { diff --git a/utils/websocket_data/render_data.go b/utils/websocket_data/render_data.go new file mode 100644 index 00000000..2e6c6a9e --- /dev/null +++ b/utils/websocket_data/render_data.go @@ -0,0 +1,21 @@ +package websocket_data + +// websocket数据交互 +type DataTransferData struct { + T string `json:"t"` //消息类型 + D interface{} `json:"d"` //传递的消息 +} +type RenderImageReqMsg struct { + //websocket接受要云渲染处理的数据 + RenderId string `json:"render_id"` //渲染id + RenderData interface{} `json:"render_data"` //参数数据 +} +type RenderImageRspMsg struct { + //websocket发送渲染完的数据 + RenderId string `json:"render_id"` //渲染id + Image string `json:"image"` //渲染结果图片 +} +type ThirdPartyLoginRspMsg struct { + //websocket三方登录的通知数据 + Token string `json:"token"` +}