From 9c72a8a4c9f3e99f0b7cd054eead36f594b4afbc Mon Sep 17 00:00:00 2001
From: laodaming <11058467+laudamine@user.noreply.gitee.com>
Date: Tue, 25 Jul 2023 17:10:50 +0800
Subject: [PATCH] fix

---
 .../internal/handler/datatransferhandler.go   | 129 +++++++++++++-----
 .../internal/handler/rendernotifyhandler.go   |  62 +++++++++
 server/websocket/internal/handler/routes.go   |   5 +
 .../internal/logic/rendernotifylogic.go       |  43 ++++++
 server/websocket/internal/types/types.go      |  32 ++++-
 server_api/websocket.api                      |  32 ++++-
 6 files changed, 262 insertions(+), 41 deletions(-)
 create mode 100644 server/websocket/internal/handler/rendernotifyhandler.go
 create mode 100644 server/websocket/internal/logic/rendernotifylogic.go

diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go
index 96a65455..e07d3abe 100644
--- a/server/websocket/internal/handler/datatransferhandler.go
+++ b/server/websocket/internal/handler/datatransferhandler.go
@@ -4,9 +4,9 @@ import (
 	"encoding/json"
 	"fmt"
 	"fusenapi/constants"
+	"fusenapi/server/websocket/internal/logic"
 	"fusenapi/server/websocket/internal/svc"
 	"fusenapi/server/websocket/internal/types"
-	"fusenapi/utils/auth"
 	"github.com/google/uuid"
 	"github.com/gorilla/websocket"
 	"github.com/zeromicro/go-zero/core/logx"
@@ -24,21 +24,27 @@ var (
 		},
 	}
 	//连接map池
-	mapConn = sync.Map{}
+	mapConnPool = sync.Map{}
 )
 
 // 每个连接的连接属性
 type wsConnectItem struct {
-	conn      *websocket.Conn //websocket的连接
-	closeChan chan struct{}   //关闭chan
-	isClose   bool            //是否已经关闭
-	flag      string
-	inChan    chan interface{} //接受消息缓冲通道
-	property  interface{}      //属性
+	conn        *websocket.Conn //websocket的连接
+	closeChan   chan struct{}   //关闭chan
+	isClose     bool            //是否已经关闭
+	flag        string
+	inChan      chan []byte //接受消息缓冲通道
+	outChan     chan []byte //发送回客户端的消息
+	mutex       sync.Mutex
+	renderImage map[string]struct{} //需要渲染的图片
 }
 
 func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
+		// 创建一个业务逻辑层实例
+		var req types.DataTransferReq
+		l := logic.NewDataTransferLogic(r.Context(), svcCtx)
+		l.DataTransfer(&req, nil)
 		//升级websocket
 		conn, err := upgrade.Upgrade(w, r, nil)
 		if err != nil {
@@ -48,7 +54,7 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
 		defer conn.Close()
 		rsp := types.DataTransferRsp{}
 		// 解析JWT token,并对空用户进行判断
-		claims, err := svcCtx.ParseJwtToken(r)
+		/*claims, err := svcCtx.ParseJwtToken(r)
 		// 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息
 		if err != nil {
 			rsp.MsgType = constants.WEBSOCKET_UNAUTH
@@ -75,18 +81,20 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
 			b, _ := json.Marshal(rsp)
 			_ = conn.WriteMessage(websocket.TextMessage, b)
 			return
-		}
+		}*/
 		//生成连接唯一标识
 		flag := uuid.New().String() + time.Now().Format("20060102150405")
 		ws := wsConnectItem{
-			conn:      conn,
-			flag:      flag,
-			closeChan: make(chan struct{}, 1),
-			inChan:    make(chan interface{}, 1000),
-			isClose:   false,
+			conn:        conn,
+			flag:        flag,
+			closeChan:   make(chan struct{}, 1),
+			inChan:      make(chan []byte, 100),
+			outChan:     make(chan []byte, 100),
+			renderImage: make(map[string]struct{}),
+			isClose:     false,
 		}
 		//保存连接
-		mapConn.Store(flag, ws)
+		mapConnPool.Store(flag, ws)
 		defer ws.close()
 		//把连接成功消息发回去
 		rsp.MsgType = constants.WEBSOCKET_CONNECT_SUCCESS
@@ -95,43 +103,66 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
 		conn.WriteMessage(websocket.TextMessage, b)
 		//循环读客户端信息
 		go ws.readLoop()
+		//循环把数据发送给客户端
+		go ws.writeLoop()
 		//推消息到云渲染
 		go ws.sendLoop()
 		//心跳
-		for {
-			time.Sleep(time.Second * 3)
-			select {
-			case <-ws.closeChan:
+		ws.heartbeat()
+
+	}
+}
+
+// 心跳
+func (w *wsConnectItem) heartbeat() {
+	rsp := types.DataTransferRsp{
+		MsgType: constants.WEBSOCKET_HEARTBEAT,
+		Message: "",
+	}
+	for {
+		time.Sleep(time.Second * 10)
+		select {
+		case <-w.closeChan:
+			return
+		default:
+			//发送心跳信息
+			b, _ := json.Marshal(rsp)
+			if err := w.conn.WriteMessage(websocket.TextMessage, b); err != nil {
+				logx.Error("发送心跳信息异常,关闭连接:", w.flag, err)
+				w.close()
 				return
-			default:
-				//发送心跳信息
-				rsp.MsgType = constants.WEBSOCKET_HEARTBEAT
-				rsp.Message = "heartbeat"
-				b, _ = json.Marshal(rsp)
-				if err = conn.WriteMessage(websocket.TextMessage, b); err != nil {
-					logx.Error("发送心跳信息异常,关闭连接:", flag, err)
-					ws.close()
-					return
-				}
 			}
 		}
-
 	}
 }
 
 // 关闭连接
 func (w *wsConnectItem) close() {
+	w.mutex.Lock()
+	defer w.mutex.Unlock()
 	logx.Info("websocket:", w.flag, " is closing...")
 	w.conn.Close()
-	mapConn.Delete(w.flag)
+	mapConnPool.Delete(w.flag)
 	if !w.isClose {
 		w.isClose = true
 		close(w.closeChan)
+		close(w.outChan)
 		close(w.inChan)
 	}
 	logx.Info("websocket:", w.flag, " is closed")
 }
 
+func (w *wsConnectItem) writeLoop() {
+	for {
+		select {
+		case <-w.closeChan: //如果关闭了
+			return
+		case data := <-w.outChan:
+			w.conn.WriteMessage(websocket.TextMessage, data)
+		}
+	}
+}
+
 // 接受客户端发来的消息
 func (w *wsConnectItem) readLoop() {
 	for {
@@ -159,7 +190,39 @@ func (w *wsConnectItem) sendLoop() {
 		case <-w.closeChan:
 			return
 		case data := <-w.inChan:
-			fmt.Println(data)
+			w.dealwithReciveData(data)
 		}
 	}
 }
+
+// 处理接受到的数据
+func (w *wsConnectItem) dealwithReciveData(data []byte) {
+	var parseInfo types.DataTransferReq
+	if err := json.Unmarshal(data, &parseInfo); err != nil {
+		logx.Error("invalid format of websocket message")
+		return
+	}
+	switch parseInfo.MsgType {
+	case constants.WEBSOCKET_RENDER_IMAGE: //图片渲染
+		var renderImageData []types.RenderImageReqMsg
+		if err := json.Unmarshal([]byte(parseInfo.Message), &renderImageData); err != nil {
+			logx.Error("invalid format of websocket render image message", err)
+			return
+		}
+		logx.Info("收到请求云渲染图片数据:", renderImageData)
+		w.mutex.Lock()
+		defer w.mutex.Unlock()
+		//把需要渲染的图片加进去
+		for _, v := range renderImageData {
+			key := getRenderImageMapKey(v.ProductId, v.SizeId, v.TemplateId)
+			w.renderImage[key] = struct{}{}
+		}
+	default:
+
+	}
+}
+
+// 获取需要渲染图片的map key
+func getRenderImageMapKey(productId, sizeId, templateId int64) string {
+	return fmt.Sprintf("%d-%d-%d", productId, sizeId, templateId)
+}
diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go
new file mode 100644
index 00000000..9c2eb28d
--- /dev/null
+++ b/server/websocket/internal/handler/rendernotifyhandler.go
@@ -0,0 +1,62 @@
+package handler
+
+import (
+	"encoding/json"
+	"fusenapi/utils/basic"
+	"github.com/zeromicro/go-zero/core/logx"
+	"github.com/zeromicro/go-zero/rest/httpx"
+	"net/http"
+
+	"fusenapi/server/websocket/internal/svc"
+	"fusenapi/server/websocket/internal/types"
+)
+
+func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		var req types.RenderNotifyReq
+		_, err := basic.RequestParse(w, r, svcCtx, &req)
+		if err != nil {
+			return
+		}
+		mapConnPool.Range(func(key, value any) bool {
+			ws, ok := value.(wsConnectItem)
+			if !ok {
+				return false
+			}
+			setOutRenderImage(req, ws)
+			return true
+		})
+		httpx.OkJsonCtx(r.Context(), w, basic.Response{
+			Code:    200,
+			Message: "success",
+			Data:    nil,
+		})
+	}
+}
+
+// 把渲染好的数据放入outchan
+func setOutRenderImage(req types.RenderNotifyReq, ws wsConnectItem) {
+	ws.mutex.Lock()
+	defer ws.mutex.Unlock()
+	for _, notifyItem := range req.NotifyList {
+		renderKey := getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId)
+		//加载并删除
+		_, ok := ws.renderImage[renderKey]
+		if !ok {
+			continue
+		}
+		responseData := types.RenderImageRspMsg{
+			ProductId:  notifyItem.ProductId,
+			SizeId:     notifyItem.SizeId,
+			TemplateId: notifyItem.TemplateId,
+			Source:     "我是渲染资源",
+		}
+		b, _ := json.Marshal(responseData)
+		select {
+		case <-ws.closeChan:
+			return
+		case ws.outChan <- b:
+			logx.Info("notify send render result to out chan")
+		}
+	}
+}
diff --git a/server/websocket/internal/handler/routes.go b/server/websocket/internal/handler/routes.go
index fd3200e4..859ecc5b 100644
--- a/server/websocket/internal/handler/routes.go
+++ b/server/websocket/internal/handler/routes.go
@@ -17,6 +17,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
 				Path:    "/api/websocket/data_transfer",
 				Handler: DataTransferHandler(serverCtx),
 			},
+			{
+				Method:  http.MethodPost,
+				Path:    "/api/websocket/render_notify",
+				Handler: RenderNotifyHandler(serverCtx),
+			},
 		},
 	)
 }
diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go
new file mode 100644
index 00000000..49803d29
--- /dev/null
+++ b/server/websocket/internal/logic/rendernotifylogic.go
@@ -0,0 +1,43 @@
+package logic
+
+import (
+	"fusenapi/utils/auth"
+	"fusenapi/utils/basic"
+
+	"context"
+
+	"fusenapi/server/websocket/internal/svc"
+	"fusenapi/server/websocket/internal/types"
+
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type RenderNotifyLogic struct {
+	logx.Logger
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewRenderNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *RenderNotifyLogic {
+	return &RenderNotifyLogic{
+		Logger: logx.WithContext(ctx),
+		ctx:    ctx,
+		svcCtx: svcCtx,
+	}
+}
+
+// 处理进入前逻辑w,r
+// func (l *RenderNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) {
+// }
+
+// 处理逻辑后 w,r 如:重定向, resp 必须重新处理
+// func (l *RenderNotifyLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) {
+// // httpx.OkJsonCtx(r.Context(), w, resp)
+// }
+
+func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) {
+	// 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data)
+	// userinfo 传入值时, 一定不为null
+
+	return resp.SetStatus(basic.CodeOK)
+}
diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go
index 06201d39..35df171d 100644
--- a/server/websocket/internal/types/types.go
+++ b/server/websocket/internal/types/types.go
@@ -6,13 +6,37 @@ import (
 )
 
 type DataTransferReq struct {
-	MsgType string      `json:"msg_type"` //消息类型
-	Message interface{} `json:"message"`  //传递的消息
+	MsgType string `json:"msg_type"` //消息类型
+	Message string `json:"message"`  //传递的消息
 }
 
 type DataTransferRsp struct {
-	MsgType string      `json:"msg_type"` //消息类型
-	Message interface{} `json:"message"`  //传递的消息
+	MsgType string `json:"msg_type"` //消息类型
+	Message string `json:"message"`  //传递的消息
+}
+
+type RenderImageReqMsg struct {
+	ProductId  int64 `json:"product_id"`
+	SizeId     int64 `json:"size_id"`
+	TemplateId int64 `json:"template_id"`
+}
+
+type RenderImageRspMsg struct {
+	ProductId  int64  `json:"product_id"`
+	SizeId     int64  `json:"size_id"`
+	TemplateId int64  `json:"template_id"`
+	Source     string `json:"source"`
+}
+
+type RenderNotifyReq struct {
+	NotifyList []NotifyItem `json:"notify_list"`
+}
+
+type NotifyItem struct {
+	ProductId  int64  `json:"product_id"`
+	SizeId     int64  `json:"size_id"`
+	TemplateId int64  `json:"template_id"`
+	Source     string `json:"source"`
 }
 
 type Request struct {
diff --git a/server_api/websocket.api b/server_api/websocket.api
index 786e67e7..ee90cdcc 100644
--- a/server_api/websocket.api
+++ b/server_api/websocket.api
@@ -12,14 +12,38 @@ service websocket {
 	//websocket数据交互
 	@handler DataTransferHandler
 	get /api/websocket/data_transfer(DataTransferReq) returns (response);
+	//渲染完了通知接口
+	@handler RenderNotifyHandler
+	post /api/websocket/render_notify(RenderNotifyReq) returns (response);
 }
 
 //websocket数据交互
 type DataTransferReq {
-	MsgType string      `json:"msg_type"` //消息类型
-	Message interface{} `json:"message"`  //传递的消息
+	MsgType string `json:"msg_type"` //消息类型
+	Message string `json:"message"`  //传递的消息
 }
 type DataTransferRsp {
-	MsgType string      `json:"msg_type"` //消息类型
-	Message interface{} `json:"message"`  //传递的消息
+	MsgType string `json:"msg_type"` //消息类型
+	Message string `json:"message"`  //传递的消息
+}
+type RenderImageReqMsg { //websocket接受需要云渲染的图片
+	ProductId  int64 `json:"product_id"`
+	SizeId     int64 `json:"size_id"`
+	TemplateId int64 `json:"template_id"`
+}
+type RenderImageRspMsg { //websocket发送渲染完的数据
+	ProductId  int64  `json:"product_id"`
+	SizeId     int64  `json:"size_id"`
+	TemplateId int64  `json:"template_id"`
+	Source     string `json:"source"`
+}
+//渲染完了通知接口
+type RenderNotifyReq {
+	NotifyList []NotifyItem `json:"notify_list"`
+}
+type NotifyItem {
+	ProductId  int64  `json:"product_id"`
+	SizeId     int64  `json:"size_id"`
+	TemplateId int64  `json:"template_id"`
+	Source     string `json:"source"`
 }
\ No newline at end of file