diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 103c6758..339dd1a2 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -82,7 +82,7 @@ type wsConnectItem struct { logic *DataTransferLogic //logic(基本属性,用于获取上下文,配置或者操作数据库) closeChan chan struct{} //ws连接关闭chan(基本属性) isClose bool //是否已经关闭(基本属性) - uniqueId string //ws连接唯一标识(基本属性) + wid string //ws连接唯一标识(基本属性) inChan chan []byte //接受消息缓冲队列(基本属性) outChan chan []byte //要发送回客户端的消息缓冲队列(基本属性) mutex sync.Mutex //互斥锁(基本属性) @@ -170,7 +170,7 @@ func (l *DataTransferLogic) DataTransfer(req *types.DataTransferReq, w http.Resp // 设置连接 func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.UserInfo, isFirefoxBrowser bool, userAgent, oldWid string) (wsConnectItem, error) { //生成连接唯一标识(失败重试10次) - uniqueId, err := l.getUniqueId(userInfo, userAgent, 10) + wid, err := l.getUniqueId(userInfo, userAgent, 10) if err != nil { //发送获取唯一标识失败的消息 if isFirefoxBrowser { @@ -208,7 +208,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use break } logx.Info("====复用旧的ws连接成功====") - uniqueId = oldWid + wid = oldWid } } //默认过期时间 @@ -224,7 +224,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use logic: l, closeChan: make(chan struct{}, 1), isClose: false, - uniqueId: uniqueId, + wid: wid, inChan: make(chan []byte, websocketInChanLen), outChan: make(chan []byte, websocketOutChanLen), mutex: sync.Mutex{}, @@ -245,15 +245,15 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use IsAllTemplateTag: 0, }*/ //保存连接 - mapConnPool.Store(uniqueId, ws) + mapConnPool.Store(wid, ws) //累加统计连接数 increaseWebsocketConnectCount(userInfo.UserId, userInfo.GuestId) //非白板用户,需要为这个用户建立map索引便于通过用户查询 - createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, uniqueId) + createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, wid) if isFirefoxBrowser { time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究) } - ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, websocket_data.ConnectSuccessMsg{Wid: uniqueId})) + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, websocket_data.ConnectSuccessMsg{Wid: wid})) return ws, nil } @@ -304,7 +304,7 @@ func (w *wsConnectItem) heartbeat() { case <-tick: //看看token是否过期了 if w.connExpireTime > 0 && w.connExpireTime < time.Now().UTC().Unix() { - logx.Info("token过期,关闭连接:", w.uniqueId) + logx.Info("token过期,关闭连接:", w.wid) w.close() return } @@ -313,7 +313,7 @@ func (w *wsConnectItem) heartbeat() { w.debug = nil } if err := w.conn.WriteMessage(websocket.PongMessage, nil); err != nil { - logx.Error("发送心跳信息异常,关闭连接:", w.uniqueId, err) + logx.Error("发送心跳信息异常,关闭连接:", w.wid, err) w.close() return } @@ -325,20 +325,22 @@ func (w *wsConnectItem) heartbeat() { func (w *wsConnectItem) close() { w.mutex.Lock() defer w.mutex.Unlock() - logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closing....") + logx.Info("###websocket:", w.wid, " uid:", w.userId, " gid:", w.guestId, " is closing....") //发送关闭信息 _ = w.conn.WriteMessage(websocket.CloseMessage, nil) w.conn.Close() - mapConnPool.Delete(w.uniqueId) + mapConnPool.Delete(w.wid) if !w.isClose { w.isClose = true close(w.closeChan) //删除用户级索引 - deleteUserConnPoolElement(w.userId, w.guestId, w.uniqueId) + deleteUserConnPoolElement(w.userId, w.guestId, w.wid) //减少连接数统计 decreaseWebsocketConnectCount(w.userId, w.guestId) + //通知unity取消任务 + sendCancelRenderMsgToUnity(w.wid, time.Now().UTC().Unix()) } - logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed") + logx.Info("###websocket:", w.wid, " uid:", w.userId, " gid:", w.guestId, " is closed") } // 读取出口缓冲队列数据输出返回给浏览器端 diff --git a/server/websocket/internal/logic/ws_render_cancel_notify_unity.go b/server/websocket/internal/logic/ws_render_cancel_notify_unity.go new file mode 100644 index 00000000..d5316920 --- /dev/null +++ b/server/websocket/internal/logic/ws_render_cancel_notify_unity.go @@ -0,0 +1,88 @@ +package logic + +import ( + "context" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "github.com/zeromicro/go-zero/core/logx" + "net" + "time" +) + +var ( + //取消unity僵尸任务控制通道 + cancelUnityCtlChan = make(chan cancelUnityCtlChanItem, 1000) + cancelRenderContextPanicMsg any = "cancel_render_context_panic_msg" +) + +// 控制通道元素 +type cancelUnityCtlChanItem struct { + Wid string `json:"wid"` //ws的唯一id + DeadlineTime int64 `json:"deadline_time"` //截断时间 + Sign string `json:"sign"` //有效签名 +} + +// 取消渲染抛出的异常 +func cancelRenderPanic() { + panic(cancelRenderContextPanicMsg) +} + +// 判断是否是取消渲染的异常 +func isCancelRenderPanic(err any) bool { + return err == cancelRenderContextPanicMsg +} + +// 发送取消上下文消息给unity +func sendCancelRenderMsgToUnity(wid string, deadlineTime int64) { + h := md5.New() + h.Write([]byte(fmt.Sprintf("%s_%d", wid, deadlineTime))) + data := cancelUnityCtlChanItem{ + Wid: wid, + DeadlineTime: deadlineTime, + Sign: hex.EncodeToString(h.Sum(nil)), + } + select { + case cancelUnityCtlChan <- data: + case <-time.After(time.Millisecond * 200): + logx.Error("sendCancelRenderMsgToUnity数据超时丢弃") + } +} + +// 拨号udp +func DialUdp(ctx context.Context) error { + conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP("localhost"), Port: 9501}) + if err != nil { + return err + } + go ConsumeCancelUnityChanMessage(ctx, conn) + return nil +} + +// 消费数据 +func ConsumeCancelUnityChanMessage(ctx context.Context, conn *net.UDPConn) { + defer func() { + if err := recover(); err != nil { + logx.Error("ConsumeCancelUnityChanMessage 异常:", err) + } + }() + go func() { + select { + case <-ctx.Done(): + panic(any("ConsumeCancelUnityChanMessage ctx deadline")) + } + }() + defer conn.Close() + for { + select { + case data := <-cancelUnityCtlChan: + d, _ := json.Marshal(data) + _, err := conn.Write(d) + if err != nil { + logx.Error("发送udp包失败:", err) + continue + } + } + } +} diff --git a/server/websocket/internal/logic/ws_render_image.go b/server/websocket/internal/logic/ws_render_image.go index 5c88d428..15ef0eb3 100644 --- a/server/websocket/internal/logic/ws_render_image.go +++ b/server/websocket/internal/logic/ws_render_image.go @@ -85,6 +85,11 @@ func (w *wsConnectItem) consumeRenderImageData() { defer func() { if err := recover(); err != nil { logx.Error("func consumeRenderImageData panic:", err) + //如果是上下文取消渲染的异常 + if isCancelRenderPanic(err) { + //通知unity取消任务 + sendCancelRenderMsgToUnity(w.wid, time.Now().UTC().Unix()) + } } }() //限制并发 @@ -118,7 +123,8 @@ func (w *wsConnectItem) consumeRenderImageData() { }() select { case <-w.extendRenderProperty.renderCtx.Done(): - panic("检测到模板标签/颜色/logo变化,渲染取消旧的任务") + //抛出取消渲染异常 + cancelRenderPanic() case <-tmpChan: return } @@ -390,7 +396,7 @@ func (w *wsConnectItem) assembleRenderDataToUnity(taskId string, resolution int, w.sendAssembleRenderDataStepResponseMessage(info.RequestId) temId := websocket_data.ToUnityIdStruct{ TaskId: taskId, - Wid: w.uniqueId, + Wid: w.wid, RequestId: info.RequestId, RenderBeginTime: time.Now().UTC().UnixMilli(), TemplateTag: info.RenderData.TemplateTag, diff --git a/server/websocket/internal/logic/ws_statistics.go b/server/websocket/internal/logic/ws_statistics.go index 843debf1..73b0aa4f 100644 --- a/server/websocket/internal/logic/ws_statistics.go +++ b/server/websocket/internal/logic/ws_statistics.go @@ -213,7 +213,7 @@ func ConsumeWebsocketStatData(ctx context.Context) { go func() { select { case <-ctx.Done(): - panic("ConsumeWebsocketStatData ctx deadline") + panic(any("ConsumeWebsocketStatData ctx deadline")) } }() for { diff --git a/server/websocket/internal/logic/ws_user_connect_pool.go b/server/websocket/internal/logic/ws_user_connect_pool.go index 140b731c..d272e72e 100644 --- a/server/websocket/internal/logic/ws_user_connect_pool.go +++ b/server/websocket/internal/logic/ws_user_connect_pool.go @@ -65,7 +65,7 @@ func ConsumeUserConnPoolCtlChanData(ctx context.Context) { go func() { select { case <-ctx.Done(): - panic("ConsumeUserConnPoolCtlChanData ctx deadline") + panic(any("ConsumeUserConnPoolCtlChanData ctx deadline")) } }() var ( diff --git a/server/websocket/websocket.go b/server/websocket/websocket.go index 34673a22..a3d720a5 100644 --- a/server/websocket/websocket.go +++ b/server/websocket/websocket.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "fusenapi/server/websocket/internal/logic" + "github.com/zeromicro/go-zero/core/logx" "net/http" "fusenapi/utils/auth" @@ -39,6 +40,11 @@ func main() { go logic.ConsumeUserConnPoolCtlChanData(ctx1) //消费连接统计信息 go logic.ConsumeWebsocketStatData(ctx1) + //拨号udp消费控制unity取消僵尸任务的消息 + if err := logic.DialUdp(ctx1); err != nil { + logx.Error("dail udp err:", err) + return + } fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start() }