Merge branch 'develop' of gitee.com:fusenpack/fusenapi into develop
This commit is contained in:
commit
869873deda
|
@ -8,6 +8,8 @@ const (
|
|||
WEBSOCKET_UNAUTH = "WEBSOCKET_UNAUTH"
|
||||
//ws连接成功
|
||||
WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS"
|
||||
//渲染前数据组装
|
||||
WEBSOCKET_RENDER_IMAGE_ASSEMBLE = "WEBSOCKET_RENDER_IMAGE_ASSEMBLE"
|
||||
//图片渲染
|
||||
WEBSOCKET_RENDER_IMAGE = "WEBSOCKET_RENDER_IMAGE"
|
||||
//数据格式错误
|
||||
|
|
|
@ -3,12 +3,11 @@ package logic
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"fusenapi/constants"
|
||||
"fusenapi/initalize"
|
||||
"fusenapi/server/websocket/internal/types"
|
||||
"fusenapi/utils/auth"
|
||||
"fusenapi/utils/id_generator"
|
||||
"fusenapi/utils/websocket_data"
|
||||
"github.com/gorilla/websocket"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
@ -108,7 +107,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
|
|||
inChan: make(chan []byte, 1000),
|
||||
outChan: make(chan []byte, 1000),
|
||||
renderProperty: renderProperty{
|
||||
renderImageTask: make(map[string]struct{}),
|
||||
renderImageTask: make(map[string]string),
|
||||
renderImageTaskCtlChan: make(chan renderImageControlChanItem, 100),
|
||||
},
|
||||
}
|
||||
|
@ -250,14 +249,9 @@ func (w *wsConnectItem) sendToOutChan(data []byte) {
|
|||
}
|
||||
}
|
||||
|
||||
// 获取需要渲染图片的map key
|
||||
func (w *wsConnectItem) getRenderImageMapKey(productId, templateTagId, logoId int64, algorithmVersion string) string {
|
||||
return fmt.Sprintf("%d-%d-%d-%s", productId, templateTagId, logoId, algorithmVersion)
|
||||
}
|
||||
|
||||
// 格式化返回数据
|
||||
func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []byte {
|
||||
d := types.DataTransferData{
|
||||
d := websocket_data.DataTransferData{
|
||||
T: msgType,
|
||||
D: data,
|
||||
}
|
||||
|
@ -267,7 +261,7 @@ func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []by
|
|||
|
||||
// 处理接受到的数据
|
||||
func (w *wsConnectItem) dealwithReciveData(data []byte) {
|
||||
var parseInfo types.DataTransferData
|
||||
var parseInfo websocket_data.DataTransferData
|
||||
if err := json.Unmarshal(data, &parseInfo); err != nil {
|
||||
logx.Error("invalid format of websocket message")
|
||||
w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket message:"+string(data))
|
||||
|
@ -276,9 +270,9 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) {
|
|||
d, _ := json.Marshal(parseInfo.D)
|
||||
//分消息类型给到不同逻辑处理,可扩展
|
||||
switch parseInfo.T {
|
||||
//图片渲染
|
||||
case constants.WEBSOCKET_RENDER_IMAGE:
|
||||
w.SendToCloudRender(d)
|
||||
//图片渲染数据组装
|
||||
case constants.WEBSOCKET_RENDER_IMAGE_ASSEMBLE:
|
||||
w.assembleRenderData(d)
|
||||
default:
|
||||
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package logic
|
|||
import (
|
||||
"fusenapi/constants"
|
||||
"fusenapi/utils/basic"
|
||||
"fusenapi/utils/websocket_data"
|
||||
"time"
|
||||
|
||||
"context"
|
||||
|
@ -61,21 +62,20 @@ func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq) (resp *basi
|
|||
if ws.isClose {
|
||||
return true
|
||||
}
|
||||
renderKey := ws.getRenderImageMapKey(req.Info.ProductId, req.Info.TemplateTagId, req.Info.LogoId, req.Info.AlgorithmVersion)
|
||||
//查询有无该渲染任务
|
||||
_, ok = ws.renderProperty.renderImageTask[renderKey]
|
||||
renderId, ok := ws.renderProperty.renderImageTask[req.Info.TaskId]
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
b := ws.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, types.RenderImageRspMsg{
|
||||
ProductId: req.Info.ProductId,
|
||||
TemplateTagId: req.Info.TemplateTagId,
|
||||
Image: req.Info.Image,
|
||||
b := ws.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, websocket_data.RenderImageRspMsg{
|
||||
RenderId: renderId,
|
||||
Image: req.Info.Image,
|
||||
})
|
||||
//删除对应的需要渲染的图片map
|
||||
ws.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{
|
||||
Option: 0, //0删除 1添加
|
||||
Key: renderKey,
|
||||
Option: 0, //0删除 1添加
|
||||
TaskId: req.Info.TaskId,
|
||||
RenderId: renderId,
|
||||
}
|
||||
//发送数据到out chan
|
||||
ws.sendToOutChan(b)
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fusenapi/constants"
|
||||
"fusenapi/utils/auth"
|
||||
"fusenapi/utils/basic"
|
||||
"fusenapi/utils/websocket_data"
|
||||
"time"
|
||||
|
||||
"context"
|
||||
|
@ -66,7 +67,7 @@ func (l *ThirdPartyLoginNotifyLogic) ThirdPartyLoginNotify(req *types.ThirdParty
|
|||
if !ok {
|
||||
return resp.SetStatusWithMessage(basic.CodeServiceErr, "type of websocket connect object is err")
|
||||
}
|
||||
b := ws.respondDataFormat(constants.WEBSOCKET_THIRD_PARTY_LOGIN_NOTIFY, types.ThirdPartyLoginRspMsg{
|
||||
b := ws.respondDataFormat(constants.WEBSOCKET_THIRD_PARTY_LOGIN_NOTIFY, websocket_data.ThirdPartyLoginRspMsg{
|
||||
Token: req.Info.Token,
|
||||
})
|
||||
select {
|
||||
|
|
|
@ -3,20 +3,56 @@ package logic
|
|||
import (
|
||||
"encoding/json"
|
||||
"fusenapi/constants"
|
||||
"fusenapi/server/websocket/internal/types"
|
||||
"fusenapi/utils/hash"
|
||||
"fusenapi/utils/websocket_data"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
// 云渲染属性
|
||||
type renderProperty struct {
|
||||
renderImageTask map[string]struct{} //需要渲染的图片任务
|
||||
renderImageTask map[string]string //需要渲染的图片任务 key是taskId val 是renderId
|
||||
renderImageTaskCtlChan chan renderImageControlChanItem //渲染任务新增移除的控制通道
|
||||
}
|
||||
|
||||
// 渲染任务新增移除的控制通道的数据
|
||||
type renderImageControlChanItem struct {
|
||||
Option int // 0删除 1添加
|
||||
Key string //map的key
|
||||
Option int // 0删除 1添加
|
||||
TaskId string //map的key
|
||||
RenderId string // map的val
|
||||
}
|
||||
|
||||
// 渲染发送到组装数据组装数据
|
||||
type assembleRenderData struct {
|
||||
TaskId string `json:"task_id"`
|
||||
RenderData interface{} `json:"render_data"`
|
||||
}
|
||||
|
||||
func (w *wsConnectItem) assembleRenderData(data []byte) {
|
||||
var renderImageData websocket_data.RenderImageReqMsg
|
||||
if err := json.Unmarshal(data, &renderImageData); err != nil {
|
||||
w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data))
|
||||
logx.Error("invalid format of websocket render image message", err)
|
||||
return
|
||||
}
|
||||
logx.Info("收到请求云渲染图片数据:", renderImageData)
|
||||
//把需要渲染的图片任务加进去
|
||||
taskId := hash.JsonHashKey(renderImageData.RenderData)
|
||||
w.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{
|
||||
Option: 1, //0删除 1添加
|
||||
TaskId: taskId,
|
||||
RenderId: renderImageData.RenderId,
|
||||
}
|
||||
tmpData := assembleRenderData{
|
||||
TaskId: taskId,
|
||||
RenderData: renderImageData.RenderData,
|
||||
}
|
||||
d, _ := json.Marshal(tmpData)
|
||||
//发送给对应的流水线组装数据
|
||||
if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil {
|
||||
logx.Error("发送渲染任务数据到MQ失败:", string(data), "err:", err)
|
||||
return
|
||||
}
|
||||
logx.Info("发送渲染数据到rabbitmq成功:", string(data))
|
||||
}
|
||||
|
||||
// 操作连接中渲染任务的增加/删除
|
||||
|
@ -28,43 +64,10 @@ func (w *wsConnectItem) operationRenderTask() {
|
|||
case data := <-w.renderProperty.renderImageTaskCtlChan:
|
||||
switch data.Option {
|
||||
case 0: //删除任务
|
||||
delete(w.renderProperty.renderImageTask, data.Key)
|
||||
delete(w.renderProperty.renderImageTask, data.TaskId)
|
||||
case 1: //新增任务
|
||||
w.renderProperty.renderImageTask[data.Key] = struct{}{}
|
||||
default:
|
||||
|
||||
w.renderProperty.renderImageTask[data.TaskId] = data.RenderId
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 渲染请求数据处理发送云渲染服务处理
|
||||
func (w *wsConnectItem) SendToCloudRender(data []byte) {
|
||||
var renderImageData types.RenderImageReqMsg
|
||||
if err := json.Unmarshal(data, &renderImageData); err != nil {
|
||||
w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data))
|
||||
logx.Error("invalid format of websocket render image message", err)
|
||||
return
|
||||
}
|
||||
logx.Info("收到请求云渲染图片数据:", renderImageData)
|
||||
//把需要渲染的图片任务加进去
|
||||
for _, productId := range renderImageData.ProductIds {
|
||||
select {
|
||||
case <-w.closeChan: //连接关闭了
|
||||
return
|
||||
default:
|
||||
//加入渲染任务
|
||||
key := w.getRenderImageMapKey(productId, renderImageData.TemplateTagId, renderImageData.LogoId, renderImageData.AlgorithmVersion)
|
||||
w.renderProperty.renderImageTaskCtlChan <- renderImageControlChanItem{
|
||||
Option: 1, //0删除 1添加
|
||||
Key: key,
|
||||
}
|
||||
//发送给对应的流水线组装数据
|
||||
if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil {
|
||||
logx.Error("发送渲染任务数据到MQ失败:", string(data), "err:", err)
|
||||
continue
|
||||
}
|
||||
logx.Info("发送渲染数据到rabbitmq成功:", string(data))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,30 +5,6 @@ import (
|
|||
"fusenapi/utils/basic"
|
||||
)
|
||||
|
||||
type DataTransferData struct {
|
||||
T string `json:"t"` //消息类型
|
||||
D interface{} `json:"d"` //传递的消息
|
||||
}
|
||||
|
||||
type RenderImageReqMsg struct {
|
||||
ProductIds []int64 `json:"product_ids"` //产品 id
|
||||
TemplateTagId int64 `json:"template_tag_id"` //模板标签id
|
||||
LogoId int64 `json:"logo_id"` //logoid
|
||||
AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本
|
||||
}
|
||||
|
||||
type RenderImageRspMsg struct {
|
||||
ProductId int64 `json:"product_id"` //产品 id
|
||||
TemplateTagId int64 `json:"template_tag_id"` //模板标签id
|
||||
AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本
|
||||
LogoId int64 `json:"logo_id"` //logoid
|
||||
Image string `json:"image"` //渲染后的图片
|
||||
}
|
||||
|
||||
type ThirdPartyLoginRspMsg struct {
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
type RenderNotifyReq struct {
|
||||
Sign string `json:"sign"`
|
||||
Time int64 `json:"time"`
|
||||
|
@ -36,11 +12,8 @@ type RenderNotifyReq struct {
|
|||
}
|
||||
|
||||
type NotifyInfo struct {
|
||||
ProductId int64 `json:"product_id"` //产品id
|
||||
TemplateTagId int64 `json:"template_tag_id"` //模板标签id
|
||||
AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本
|
||||
LogoId int64 `json:"logo_id"` //logoid
|
||||
Image string `json:"image"`
|
||||
TaskId string `json:"task_id"` //任务id
|
||||
Image string `json:"image"`
|
||||
}
|
||||
|
||||
type ThirdPartyLoginNotifyReq struct {
|
||||
|
|
|
@ -30,11 +30,11 @@ func main() {
|
|||
defer server.Stop()
|
||||
|
||||
ctx := svc.NewServiceContext(c)
|
||||
//消费组装队列
|
||||
//消费渲染结果队列
|
||||
ctx1 := context.Background()
|
||||
ctx2, cancel := context.WithCancel(ctx1)
|
||||
defer cancel()
|
||||
go ctx.RabbitMq.Consume(ctx2, constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, &consumer.MqConsumerRenderResult{})
|
||||
go ctx.RabbitMq.Consume(ctx2, constants.RABBIT_MQ_RENDER_RESULT_DATA, &consumer.MqConsumerRenderResult{})
|
||||
handler.RegisterHandlers(server, ctx)
|
||||
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
|
||||
server.Start()
|
||||
|
|
|
@ -20,27 +20,6 @@ service websocket {
|
|||
post /api/websocket/third_party_login_notify(ThirdPartyLoginNotifyReq) returns (response);
|
||||
}
|
||||
|
||||
//websocket数据交互
|
||||
type DataTransferData {
|
||||
T string `json:"t"` //消息类型
|
||||
D interface{} `json:"d"` //传递的消息
|
||||
}
|
||||
type RenderImageReqMsg { //websocket接受要云渲染处理的数据
|
||||
ProductIds []int64 `json:"product_ids"` //产品 id
|
||||
TemplateTagId int64 `json:"template_tag_id"` //模板标签id
|
||||
LogoId int64 `json:"logo_id"` //logoid
|
||||
AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本
|
||||
}
|
||||
type RenderImageRspMsg { //websocket发送渲染完的数据
|
||||
ProductId int64 `json:"product_id"` //产品 id
|
||||
TemplateTagId int64 `json:"template_tag_id"` //模板标签id
|
||||
AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本
|
||||
LogoId int64 `json:"logo_id"` //logoid
|
||||
Image string `json:"image"` //渲染后的图片
|
||||
}
|
||||
type ThirdPartyLoginRspMsg { //websocket三方登录的通知数据
|
||||
Token string `json:"token"`
|
||||
}
|
||||
//渲染完了通知接口
|
||||
type RenderNotifyReq {
|
||||
Sign string `json:"sign"`
|
||||
|
@ -48,11 +27,8 @@ type RenderNotifyReq {
|
|||
Info NotifyInfo `json:"info"`
|
||||
}
|
||||
type NotifyInfo {
|
||||
ProductId int64 `json:"product_id"` //产品id
|
||||
TemplateTagId int64 `json:"template_tag_id"` //模板标签id
|
||||
AlgorithmVersion string `json:"algorithm_version,optional"` //算法版本
|
||||
LogoId int64 `json:"logo_id"` //logoid
|
||||
Image string `json:"image"`
|
||||
TaskId string `json:"task_id"` //任务id
|
||||
Image string `json:"image"`
|
||||
}
|
||||
//第三方登录通知接口
|
||||
type ThirdPartyLoginNotifyReq {
|
||||
|
|
21
utils/websocket_data/render_data.go
Normal file
21
utils/websocket_data/render_data.go
Normal file
|
@ -0,0 +1,21 @@
|
|||
package websocket_data
|
||||
|
||||
// websocket数据交互
|
||||
type DataTransferData struct {
|
||||
T string `json:"t"` //消息类型
|
||||
D interface{} `json:"d"` //传递的消息
|
||||
}
|
||||
type RenderImageReqMsg struct {
|
||||
//websocket接受要云渲染处理的数据
|
||||
RenderId string `json:"render_id"` //渲染id
|
||||
RenderData interface{} `json:"render_data"` //参数数据
|
||||
}
|
||||
type RenderImageRspMsg struct {
|
||||
//websocket发送渲染完的数据
|
||||
RenderId string `json:"render_id"` //渲染id
|
||||
Image string `json:"image"` //渲染结果图片
|
||||
}
|
||||
type ThirdPartyLoginRspMsg struct {
|
||||
//websocket三方登录的通知数据
|
||||
Token string `json:"token"`
|
||||
}
|
Loading…
Reference in New Issue
Block a user