From 417126484bd540c4336c2916af4a3e519d6b4b84 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Wed, 1 Nov 2023 18:23:13 +0800 Subject: [PATCH 1/9] fix --- .../internal/logic/ws_render_image.go | 49 +++++++------------ utils/websocket_data/render_data.go | 5 +- 2 files changed, 22 insertions(+), 32 deletions(-) diff --git a/server/websocket/internal/logic/ws_render_image.go b/server/websocket/internal/logic/ws_render_image.go index fdafa1bf..1800082a 100644 --- a/server/websocket/internal/logic/ws_render_image.go +++ b/server/websocket/internal/logic/ws_render_image.go @@ -48,11 +48,11 @@ func (r *renderProcessor) allocationMessage(w *wsConnectItem, data []byte) { logx.Error("invalid format of websocket render image message", err) return } - //颜色/模板标签/logo变更 - ifCancelOldCtx := false - /*if renderImageData.RenderData.TemplateTag != w.extendRenderProperty.templateTag { + //颜色/模板标签/logo变更(后面再开启) + /*ifCancelOldCtx := false + if renderImageData.RenderData.TemplateTag != w.extendRenderProperty.templateTag { ifCancelOldCtx = true - }*/ + } if renderImageData.RenderData.TemplateTagColor.SelectedColorIndex != w.extendRenderProperty.selectColorIndex { ifCancelOldCtx = true } @@ -68,7 +68,7 @@ func (r *renderProcessor) allocationMessage(w *wsConnectItem, data []byte) { w.extendRenderProperty.renderCtxCancelFunc() //重新赋值 w.extendRenderProperty.renderCtx, w.extendRenderProperty.renderCtxCancelFunc = context.WithCancel(w.logic.ctx) - } + }*/ select { case <-w.closeChan: //已经关闭 return @@ -95,20 +95,6 @@ func (w *wsConnectItem) consumeRenderImageData() { case <-w.closeChan: //已关闭 return case data := <-w.extendRenderProperty.renderChan: //消费数据 - //标签不一样 - /*if data.RenderData.TemplateTag != w.extendRenderProperty.templateTag { - //logx.Info("标签不一致,丢弃消息") - continue - }*/ - //颜色不一致 - if data.RenderData.TemplateTagColor.SelectedColorIndex != w.extendRenderProperty.selectColorIndex { - //logx.Info("颜色不一致,丢弃消息") - continue - } - //logo不一样 - if data.RenderData.Logo != w.extendRenderProperty.Logo { - continue - } limitChan <- struct{}{} go func(d websocket_data.RenderImageReqMsg) { defer func() { @@ -122,19 +108,22 @@ func (w *wsConnectItem) consumeRenderImageData() { defer func() { <-limitChan }() - go func() { - defer func() { - if err := recover(); err != nil { - logx.Error("func renderImage panic:", err) + //如果不是无视上下文切换取消的(后面再开启) + /*if !d.IgnoreContextCancel { + go func() { + defer func() { + if err := recover(); err != nil { + logx.Error("func renderImage panic:", err) + } + }() + select { + case <-w.extendRenderProperty.renderCtx.Done(): + panic("检测到模板标签/颜色/logo变化,渲染取消旧的任务") + case <-tmpChan: + return } }() - select { - case <-w.extendRenderProperty.renderCtx.Done(): - panic("检测到模板标签/颜色/logo变化,渲染取消旧的任务") - case <-tmpChan: - return - } - }() + }*/ w.renderImage(d) }(data) } diff --git a/utils/websocket_data/render_data.go b/utils/websocket_data/render_data.go index c8b857af..508640e2 100644 --- a/utils/websocket_data/render_data.go +++ b/utils/websocket_data/render_data.go @@ -2,8 +2,9 @@ package websocket_data // websocket接受要云渲染处理的数据 type RenderImageReqMsg struct { - RequestId string `json:"request_id"` - RenderData RenderData `json:"render_data"` //渲染主要参数 + RequestId string `json:"request_id"` + IgnoreContextCancel bool `json:"ignore_context_cancel"` //是否无视切换上下文取消正在执行的渲染任务 + RenderData RenderData `json:"render_data"` //渲染主要参数 } type RenderData struct { TemplateTag string `json:"template_tag"` //模板标签(必须) From 496a56be5d5f9268f414cd322571829bba075c2b Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Wed, 1 Nov 2023 18:31:14 +0800 Subject: [PATCH 2/9] fix --- server/websocket/internal/logic/ws_render_image.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/websocket/internal/logic/ws_render_image.go b/server/websocket/internal/logic/ws_render_image.go index 1800082a..914c3172 100644 --- a/server/websocket/internal/logic/ws_render_image.go +++ b/server/websocket/internal/logic/ws_render_image.go @@ -48,8 +48,8 @@ func (r *renderProcessor) allocationMessage(w *wsConnectItem, data []byte) { logx.Error("invalid format of websocket render image message", err) return } - //颜色/模板标签/logo变更(后面再开启) - /*ifCancelOldCtx := false + //颜色/模板标签/logo变更 + ifCancelOldCtx := false if renderImageData.RenderData.TemplateTag != w.extendRenderProperty.templateTag { ifCancelOldCtx = true } @@ -68,7 +68,7 @@ func (r *renderProcessor) allocationMessage(w *wsConnectItem, data []byte) { w.extendRenderProperty.renderCtxCancelFunc() //重新赋值 w.extendRenderProperty.renderCtx, w.extendRenderProperty.renderCtxCancelFunc = context.WithCancel(w.logic.ctx) - }*/ + } select { case <-w.closeChan: //已经关闭 return From 1bd606a89c6ea6b0392931229b121f6caff56048 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 2 Nov 2023 10:10:22 +0800 Subject: [PATCH 3/9] fix --- server/websocket/internal/logic/rendernotifylogic.go | 2 +- server/websocket/internal/logic/ws_ok_response.go | 2 +- server/websocket/internal/logic/ws_render_image.go | 2 +- utils/websocket_data/render_data.go | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go index de5124dc..1d08b889 100644 --- a/server/websocket/internal/logic/rendernotifylogic.go +++ b/server/websocket/internal/logic/rendernotifylogic.go @@ -99,7 +99,7 @@ func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *a ws.sendRenderResultData(websocket_data.RenderImageRspMsg{ RequestId: requestId, Image: uploadRes.ResourceUrl, - RenderProcessTime: websocket_data.RenderProcessTime{ + RenderProcessTime: &websocket_data.RenderProcessTime{ UnityRenderTakesTime: fmt.Sprintf("%dms", unityRenderEndTime-unityRenderBeginTime), UploadUnityRenderImageTakesTime: fmt.Sprintf("%dms", uploadUnityRenderImageTakesTime), }, diff --git a/server/websocket/internal/logic/ws_ok_response.go b/server/websocket/internal/logic/ws_ok_response.go index 5c81991a..bcfd7cfb 100644 --- a/server/websocket/internal/logic/ws_ok_response.go +++ b/server/websocket/internal/logic/ws_ok_response.go @@ -61,7 +61,7 @@ func (w *wsConnectItem) sendRenderDataToUnityStepResponseMessage(requestId strin func (w *wsConnectItem) sendRenderResultData(data websocket_data.RenderImageRspMsg) { //没开启debug if w.debug == nil { - data.RenderProcessTime = websocket_data.RenderProcessTime{} + data.RenderProcessTime = nil } w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, data)) } diff --git a/server/websocket/internal/logic/ws_render_image.go b/server/websocket/internal/logic/ws_render_image.go index 914c3172..5c88d428 100644 --- a/server/websocket/internal/logic/ws_render_image.go +++ b/server/websocket/internal/logic/ws_render_image.go @@ -258,7 +258,7 @@ func (w *wsConnectItem) renderImage(renderImageData websocket_data.RenderImageRe w.sendRenderResultData(websocket_data.RenderImageRspMsg{ RequestId: renderImageData.RequestId, Image: *resource.ResourceUrl, - RenderProcessTime: websocket_data.RenderProcessTime{ + RenderProcessTime: &websocket_data.RenderProcessTime{ UnityRenderTakesTime: "cache", UploadUnityRenderImageTakesTime: "cache", }, diff --git a/utils/websocket_data/render_data.go b/utils/websocket_data/render_data.go index 508640e2..c0047b55 100644 --- a/utils/websocket_data/render_data.go +++ b/utils/websocket_data/render_data.go @@ -28,9 +28,9 @@ type TemplateTagColor struct { // websocket发送渲染完的数据 type RenderImageRspMsg struct { - RequestId string `json:"request_id"` - Image string `json:"image"` //渲染结果图片 - RenderProcessTime RenderProcessTime `json:"render_process_time"` //流程耗时 + RequestId string `json:"request_id"` + Image string `json:"image"` //渲染结果图片 + RenderProcessTime *RenderProcessTime `json:"render_process_time"` //流程耗时 } type RenderProcessTime struct { UnityRenderTakesTime string `json:"unity_render_takes_time"` //unity渲染用时 From 265489956c2d667ec8a232a0692554ff4496749b Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 2 Nov 2023 11:32:04 +0800 Subject: [PATCH 4/9] fix --- .../internal/logic/datatransferlogic.go | 28 +++--- .../logic/ws_render_cancel_notify_unity.go | 88 +++++++++++++++++++ .../internal/logic/ws_render_image.go | 10 ++- .../websocket/internal/logic/ws_statistics.go | 2 +- .../internal/logic/ws_user_connect_pool.go | 2 +- server/websocket/websocket.go | 6 ++ 6 files changed, 119 insertions(+), 17 deletions(-) create mode 100644 server/websocket/internal/logic/ws_render_cancel_notify_unity.go diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 103c6758..339dd1a2 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -82,7 +82,7 @@ type wsConnectItem struct { logic *DataTransferLogic //logic(基本属性,用于获取上下文,配置或者操作数据库) closeChan chan struct{} //ws连接关闭chan(基本属性) isClose bool //是否已经关闭(基本属性) - uniqueId string //ws连接唯一标识(基本属性) + wid string //ws连接唯一标识(基本属性) inChan chan []byte //接受消息缓冲队列(基本属性) outChan chan []byte //要发送回客户端的消息缓冲队列(基本属性) mutex sync.Mutex //互斥锁(基本属性) @@ -170,7 +170,7 @@ func (l *DataTransferLogic) DataTransfer(req *types.DataTransferReq, w http.Resp // 设置连接 func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.UserInfo, isFirefoxBrowser bool, userAgent, oldWid string) (wsConnectItem, error) { //生成连接唯一标识(失败重试10次) - uniqueId, err := l.getUniqueId(userInfo, userAgent, 10) + wid, err := l.getUniqueId(userInfo, userAgent, 10) if err != nil { //发送获取唯一标识失败的消息 if isFirefoxBrowser { @@ -208,7 +208,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use break } logx.Info("====复用旧的ws连接成功====") - uniqueId = oldWid + wid = oldWid } } //默认过期时间 @@ -224,7 +224,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use logic: l, closeChan: make(chan struct{}, 1), isClose: false, - uniqueId: uniqueId, + wid: wid, inChan: make(chan []byte, websocketInChanLen), outChan: make(chan []byte, websocketOutChanLen), mutex: sync.Mutex{}, @@ -245,15 +245,15 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use IsAllTemplateTag: 0, }*/ //保存连接 - mapConnPool.Store(uniqueId, ws) + mapConnPool.Store(wid, ws) //累加统计连接数 increaseWebsocketConnectCount(userInfo.UserId, userInfo.GuestId) //非白板用户,需要为这个用户建立map索引便于通过用户查询 - createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, uniqueId) + createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, wid) if isFirefoxBrowser { time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究) } - ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, websocket_data.ConnectSuccessMsg{Wid: uniqueId})) + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, websocket_data.ConnectSuccessMsg{Wid: wid})) return ws, nil } @@ -304,7 +304,7 @@ func (w *wsConnectItem) heartbeat() { case <-tick: //看看token是否过期了 if w.connExpireTime > 0 && w.connExpireTime < time.Now().UTC().Unix() { - logx.Info("token过期,关闭连接:", w.uniqueId) + logx.Info("token过期,关闭连接:", w.wid) w.close() return } @@ -313,7 +313,7 @@ func (w *wsConnectItem) heartbeat() { w.debug = nil } if err := w.conn.WriteMessage(websocket.PongMessage, nil); err != nil { - logx.Error("发送心跳信息异常,关闭连接:", w.uniqueId, err) + logx.Error("发送心跳信息异常,关闭连接:", w.wid, err) w.close() return } @@ -325,20 +325,22 @@ func (w *wsConnectItem) heartbeat() { func (w *wsConnectItem) close() { w.mutex.Lock() defer w.mutex.Unlock() - logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closing....") + logx.Info("###websocket:", w.wid, " uid:", w.userId, " gid:", w.guestId, " is closing....") //发送关闭信息 _ = w.conn.WriteMessage(websocket.CloseMessage, nil) w.conn.Close() - mapConnPool.Delete(w.uniqueId) + mapConnPool.Delete(w.wid) if !w.isClose { w.isClose = true close(w.closeChan) //删除用户级索引 - deleteUserConnPoolElement(w.userId, w.guestId, w.uniqueId) + deleteUserConnPoolElement(w.userId, w.guestId, w.wid) //减少连接数统计 decreaseWebsocketConnectCount(w.userId, w.guestId) + //通知unity取消任务 + sendCancelRenderMsgToUnity(w.wid, time.Now().UTC().Unix()) } - logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed") + logx.Info("###websocket:", w.wid, " uid:", w.userId, " gid:", w.guestId, " is closed") } // 读取出口缓冲队列数据输出返回给浏览器端 diff --git a/server/websocket/internal/logic/ws_render_cancel_notify_unity.go b/server/websocket/internal/logic/ws_render_cancel_notify_unity.go new file mode 100644 index 00000000..d5316920 --- /dev/null +++ b/server/websocket/internal/logic/ws_render_cancel_notify_unity.go @@ -0,0 +1,88 @@ +package logic + +import ( + "context" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "github.com/zeromicro/go-zero/core/logx" + "net" + "time" +) + +var ( + //取消unity僵尸任务控制通道 + cancelUnityCtlChan = make(chan cancelUnityCtlChanItem, 1000) + cancelRenderContextPanicMsg any = "cancel_render_context_panic_msg" +) + +// 控制通道元素 +type cancelUnityCtlChanItem struct { + Wid string `json:"wid"` //ws的唯一id + DeadlineTime int64 `json:"deadline_time"` //截断时间 + Sign string `json:"sign"` //有效签名 +} + +// 取消渲染抛出的异常 +func cancelRenderPanic() { + panic(cancelRenderContextPanicMsg) +} + +// 判断是否是取消渲染的异常 +func isCancelRenderPanic(err any) bool { + return err == cancelRenderContextPanicMsg +} + +// 发送取消上下文消息给unity +func sendCancelRenderMsgToUnity(wid string, deadlineTime int64) { + h := md5.New() + h.Write([]byte(fmt.Sprintf("%s_%d", wid, deadlineTime))) + data := cancelUnityCtlChanItem{ + Wid: wid, + DeadlineTime: deadlineTime, + Sign: hex.EncodeToString(h.Sum(nil)), + } + select { + case cancelUnityCtlChan <- data: + case <-time.After(time.Millisecond * 200): + logx.Error("sendCancelRenderMsgToUnity数据超时丢弃") + } +} + +// 拨号udp +func DialUdp(ctx context.Context) error { + conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP("localhost"), Port: 9501}) + if err != nil { + return err + } + go ConsumeCancelUnityChanMessage(ctx, conn) + return nil +} + +// 消费数据 +func ConsumeCancelUnityChanMessage(ctx context.Context, conn *net.UDPConn) { + defer func() { + if err := recover(); err != nil { + logx.Error("ConsumeCancelUnityChanMessage 异常:", err) + } + }() + go func() { + select { + case <-ctx.Done(): + panic(any("ConsumeCancelUnityChanMessage ctx deadline")) + } + }() + defer conn.Close() + for { + select { + case data := <-cancelUnityCtlChan: + d, _ := json.Marshal(data) + _, err := conn.Write(d) + if err != nil { + logx.Error("发送udp包失败:", err) + continue + } + } + } +} diff --git a/server/websocket/internal/logic/ws_render_image.go b/server/websocket/internal/logic/ws_render_image.go index 5c88d428..15ef0eb3 100644 --- a/server/websocket/internal/logic/ws_render_image.go +++ b/server/websocket/internal/logic/ws_render_image.go @@ -85,6 +85,11 @@ func (w *wsConnectItem) consumeRenderImageData() { defer func() { if err := recover(); err != nil { logx.Error("func consumeRenderImageData panic:", err) + //如果是上下文取消渲染的异常 + if isCancelRenderPanic(err) { + //通知unity取消任务 + sendCancelRenderMsgToUnity(w.wid, time.Now().UTC().Unix()) + } } }() //限制并发 @@ -118,7 +123,8 @@ func (w *wsConnectItem) consumeRenderImageData() { }() select { case <-w.extendRenderProperty.renderCtx.Done(): - panic("检测到模板标签/颜色/logo变化,渲染取消旧的任务") + //抛出取消渲染异常 + cancelRenderPanic() case <-tmpChan: return } @@ -390,7 +396,7 @@ func (w *wsConnectItem) assembleRenderDataToUnity(taskId string, resolution int, w.sendAssembleRenderDataStepResponseMessage(info.RequestId) temId := websocket_data.ToUnityIdStruct{ TaskId: taskId, - Wid: w.uniqueId, + Wid: w.wid, RequestId: info.RequestId, RenderBeginTime: time.Now().UTC().UnixMilli(), TemplateTag: info.RenderData.TemplateTag, diff --git a/server/websocket/internal/logic/ws_statistics.go b/server/websocket/internal/logic/ws_statistics.go index 843debf1..73b0aa4f 100644 --- a/server/websocket/internal/logic/ws_statistics.go +++ b/server/websocket/internal/logic/ws_statistics.go @@ -213,7 +213,7 @@ func ConsumeWebsocketStatData(ctx context.Context) { go func() { select { case <-ctx.Done(): - panic("ConsumeWebsocketStatData ctx deadline") + panic(any("ConsumeWebsocketStatData ctx deadline")) } }() for { diff --git a/server/websocket/internal/logic/ws_user_connect_pool.go b/server/websocket/internal/logic/ws_user_connect_pool.go index 140b731c..d272e72e 100644 --- a/server/websocket/internal/logic/ws_user_connect_pool.go +++ b/server/websocket/internal/logic/ws_user_connect_pool.go @@ -65,7 +65,7 @@ func ConsumeUserConnPoolCtlChanData(ctx context.Context) { go func() { select { case <-ctx.Done(): - panic("ConsumeUserConnPoolCtlChanData ctx deadline") + panic(any("ConsumeUserConnPoolCtlChanData ctx deadline")) } }() var ( diff --git a/server/websocket/websocket.go b/server/websocket/websocket.go index 34673a22..a3d720a5 100644 --- a/server/websocket/websocket.go +++ b/server/websocket/websocket.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "fusenapi/server/websocket/internal/logic" + "github.com/zeromicro/go-zero/core/logx" "net/http" "fusenapi/utils/auth" @@ -39,6 +40,11 @@ func main() { go logic.ConsumeUserConnPoolCtlChanData(ctx1) //消费连接统计信息 go logic.ConsumeWebsocketStatData(ctx1) + //拨号udp消费控制unity取消僵尸任务的消息 + if err := logic.DialUdp(ctx1); err != nil { + logx.Error("dail udp err:", err) + return + } fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start() } From 7342c0d72a4f671a86f376134623a7a6c78f0f63 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 2 Nov 2023 11:48:15 +0800 Subject: [PATCH 5/9] fix --- server/websocket/etc/websocket.yaml | 7 ++++++- server/websocket/internal/config/config.go | 6 ++++++ .../internal/logic/ws_render_cancel_notify_unity.go | 9 ++++++--- server/websocket/websocket.go | 2 +- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/server/websocket/etc/websocket.yaml b/server/websocket/etc/websocket.yaml index 85f9ed0b..660e5bfc 100644 --- a/server/websocket/etc/websocket.yaml +++ b/server/websocket/etc/websocket.yaml @@ -25,4 +25,9 @@ BLMService: #Url: "http://192.168.1.7:8999/LogoCombine" Url: "http://18.119.109.254:8999/LogoCombine" Unity: - Host: http://api.fusen.3718.cn:4050 \ No newline at end of file + Host: "http://api.fusen.3718.cn:4050" + Udp: + LocalAddr: "127.0.0.1" + LocalPort: 9100 + RemoteAddr: "127.0.0.1" + RemotePort: 9101 \ No newline at end of file diff --git a/server/websocket/internal/config/config.go b/server/websocket/internal/config/config.go index 925b57ae..93ff57db 100644 --- a/server/websocket/internal/config/config.go +++ b/server/websocket/internal/config/config.go @@ -27,5 +27,11 @@ type Config struct { } Unity struct { Host string + Udp struct { + LocalAddr string + LocalPort int + RemoteAddr string + RemotePort int + } } } diff --git a/server/websocket/internal/logic/ws_render_cancel_notify_unity.go b/server/websocket/internal/logic/ws_render_cancel_notify_unity.go index d5316920..e347f047 100644 --- a/server/websocket/internal/logic/ws_render_cancel_notify_unity.go +++ b/server/websocket/internal/logic/ws_render_cancel_notify_unity.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "fusenapi/server/websocket/internal/config" "github.com/zeromicro/go-zero/core/logx" "net" "time" @@ -51,8 +52,10 @@ func sendCancelRenderMsgToUnity(wid string, deadlineTime int64) { } // 拨号udp -func DialUdp(ctx context.Context) error { - conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP("localhost"), Port: 9501}) +func DialUdp(ctx context.Context, config config.Config) error { + localAddr := &net.UDPAddr{IP: net.ParseIP(config.Unity.Udp.LocalAddr), Port: config.Unity.Udp.LocalPort} + remoteAddr := &net.UDPAddr{IP: net.ParseIP(config.Unity.Udp.RemoteAddr), Port: config.Unity.Udp.RemotePort} + conn, err := net.DialUDP("udp", localAddr, remoteAddr) if err != nil { return err } @@ -80,7 +83,7 @@ func ConsumeCancelUnityChanMessage(ctx context.Context, conn *net.UDPConn) { d, _ := json.Marshal(data) _, err := conn.Write(d) if err != nil { - logx.Error("发送udp包失败:", err) + logx.Error("发送udp包通知Unity失败:", err) continue } } diff --git a/server/websocket/websocket.go b/server/websocket/websocket.go index a3d720a5..562e9e43 100644 --- a/server/websocket/websocket.go +++ b/server/websocket/websocket.go @@ -41,7 +41,7 @@ func main() { //消费连接统计信息 go logic.ConsumeWebsocketStatData(ctx1) //拨号udp消费控制unity取消僵尸任务的消息 - if err := logic.DialUdp(ctx1); err != nil { + if err := logic.DialUdp(ctx1, c); err != nil { logx.Error("dail udp err:", err) return } From cd75ee795ff855c6aa721a1a75b06999151e4e83 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 2 Nov 2023 12:01:07 +0800 Subject: [PATCH 6/9] fix --- .../internal/logic/ws_render_cancel_notify_unity.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/server/websocket/internal/logic/ws_render_cancel_notify_unity.go b/server/websocket/internal/logic/ws_render_cancel_notify_unity.go index e347f047..3bcf07b2 100644 --- a/server/websocket/internal/logic/ws_render_cancel_notify_unity.go +++ b/server/websocket/internal/logic/ws_render_cancel_notify_unity.go @@ -37,12 +37,10 @@ func isCancelRenderPanic(err any) bool { // 发送取消上下文消息给unity func sendCancelRenderMsgToUnity(wid string, deadlineTime int64) { - h := md5.New() - h.Write([]byte(fmt.Sprintf("%s_%d", wid, deadlineTime))) data := cancelUnityCtlChanItem{ Wid: wid, DeadlineTime: deadlineTime, - Sign: hex.EncodeToString(h.Sum(nil)), + Sign: signMessage(wid, deadlineTime), } select { case cancelUnityCtlChan <- data: @@ -63,6 +61,13 @@ func DialUdp(ctx context.Context, config config.Config) error { return nil } +// 签名消息 +func signMessage(wid string, deadlineTime int64) string { + h := md5.New() + h.Write([]byte(fmt.Sprintf("%s_fusen_control_unity_%d", wid, deadlineTime))) + return hex.EncodeToString(h.Sum(nil)) +} + // 消费数据 func ConsumeCancelUnityChanMessage(ctx context.Context, conn *net.UDPConn) { defer func() { From afbf183b97ba67f8e22b7f86f996ab3eff0dfb9b Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 2 Nov 2023 12:15:52 +0800 Subject: [PATCH 7/9] fix --- .../internal/logic/rendernotifylogic.go | 21 +++++++------------ .../websocket/internal/logic/ws_statistics.go | 2 ++ server/websocket/websocket.go | 5 ++--- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go index 1d08b889..0db5c748 100644 --- a/server/websocket/internal/logic/rendernotifylogic.go +++ b/server/websocket/internal/logic/rendernotifylogic.go @@ -11,7 +11,6 @@ import ( "fusenapi/utils/basic" "fusenapi/utils/file" "fusenapi/utils/websocket_data" - "strings" "time" "github.com/zeromicro/go-zero/core/logx" @@ -39,19 +38,7 @@ func NewRenderNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Rend // func (l *RenderNotifyLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) { // // httpx.OkJsonCtx(r.Context(), w, resp) // } - func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { - //统计unity处理数 - decreaseUnityRequestCount(req.UserId, req.GuestId) - if req.Code != 0 { - //统计unity失败处理数 - increaseUnityErrorCount(req.UserId, req.GuestId) - } - req.TaskId = strings.Trim(req.TaskId, " ") - if req.TaskId == "" { - logx.Error("渲染回调参数错误:任务标识") - return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "invalid param task_id") - } unityRenderEndTime := time.Now().UTC().UnixMilli() //解析数据 var info websocket_data.ToUnityIdStruct @@ -59,6 +46,14 @@ func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *a logx.Error("解析taskId错误") return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "failed to parse param taskId !!!!") } + if info.RenderBeginTime > serverStartTime { + //统计unity处理数 + decreaseUnityRequestCount(req.UserId, req.GuestId) + if req.Code != 0 { + //统计unity失败处理数 + increaseUnityErrorCount(req.UserId, req.GuestId) + } + } //重新赋值(很重要) wid := info.Wid requestId := info.RequestId diff --git a/server/websocket/internal/logic/ws_statistics.go b/server/websocket/internal/logic/ws_statistics.go index 73b0aa4f..72ce3bb8 100644 --- a/server/websocket/internal/logic/ws_statistics.go +++ b/server/websocket/internal/logic/ws_statistics.go @@ -34,6 +34,8 @@ type mapUserWsStatItem struct { // 统计信息 var ( + // 服务启动时间 + serverStartTime = time.Now().UTC().UnixMilli() //用户连接统计 mapUserWsStat = sync.Map{} //消息控制通道的数据结构 diff --git a/server/websocket/websocket.go b/server/websocket/websocket.go index 562e9e43..969c2c33 100644 --- a/server/websocket/websocket.go +++ b/server/websocket/websocket.go @@ -5,11 +5,10 @@ import ( "flag" "fmt" "fusenapi/server/websocket/internal/logic" - "github.com/zeromicro/go-zero/core/logx" - "net/http" - "fusenapi/utils/auth" "fusenapi/utils/fsconfig" + "github.com/zeromicro/go-zero/core/logx" + "net/http" "fusenapi/server/websocket/internal/config" "fusenapi/server/websocket/internal/handler" From ca24f9db30b00b002ecbbd73daaf681989cc03b1 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 2 Nov 2023 12:21:28 +0800 Subject: [PATCH 8/9] fix --- server/websocket/internal/logic/rendernotifylogic.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go index 0db5c748..b557381d 100644 --- a/server/websocket/internal/logic/rendernotifylogic.go +++ b/server/websocket/internal/logic/rendernotifylogic.go @@ -47,6 +47,7 @@ func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *a return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "failed to parse param taskId !!!!") } if info.RenderBeginTime > serverStartTime { + logx.Info("任务时间:", info.RenderBeginTime, "服务器启动时间:", serverStartTime) //统计unity处理数 decreaseUnityRequestCount(req.UserId, req.GuestId) if req.Code != 0 { From ba9cb470a6ece630062097297708cd205729239e Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 2 Nov 2023 12:25:51 +0800 Subject: [PATCH 9/9] fix --- server/websocket/internal/logic/rendernotifylogic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go index b557381d..4f28923e 100644 --- a/server/websocket/internal/logic/rendernotifylogic.go +++ b/server/websocket/internal/logic/rendernotifylogic.go @@ -47,7 +47,7 @@ func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *a return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "failed to parse param taskId !!!!") } if info.RenderBeginTime > serverStartTime { - logx.Info("任务时间:", info.RenderBeginTime, "服务器启动时间:", serverStartTime) + //logx.Info("任务时间:", info.RenderBeginTime, "服务器启动时间:", serverStartTime) //统计unity处理数 decreaseUnityRequestCount(req.UserId, req.GuestId) if req.Code != 0 {