fix
This commit is contained in:
parent
1bd606a89c
commit
265489956c
|
@ -82,7 +82,7 @@ type wsConnectItem struct {
|
|||
logic *DataTransferLogic //logic(基本属性,用于获取上下文,配置或者操作数据库)
|
||||
closeChan chan struct{} //ws连接关闭chan(基本属性)
|
||||
isClose bool //是否已经关闭(基本属性)
|
||||
uniqueId string //ws连接唯一标识(基本属性)
|
||||
wid string //ws连接唯一标识(基本属性)
|
||||
inChan chan []byte //接受消息缓冲队列(基本属性)
|
||||
outChan chan []byte //要发送回客户端的消息缓冲队列(基本属性)
|
||||
mutex sync.Mutex //互斥锁(基本属性)
|
||||
|
@ -170,7 +170,7 @@ func (l *DataTransferLogic) DataTransfer(req *types.DataTransferReq, w http.Resp
|
|||
// 设置连接
|
||||
func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.UserInfo, isFirefoxBrowser bool, userAgent, oldWid string) (wsConnectItem, error) {
|
||||
//生成连接唯一标识(失败重试10次)
|
||||
uniqueId, err := l.getUniqueId(userInfo, userAgent, 10)
|
||||
wid, err := l.getUniqueId(userInfo, userAgent, 10)
|
||||
if err != nil {
|
||||
//发送获取唯一标识失败的消息
|
||||
if isFirefoxBrowser {
|
||||
|
@ -208,7 +208,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
|
|||
break
|
||||
}
|
||||
logx.Info("====复用旧的ws连接成功====")
|
||||
uniqueId = oldWid
|
||||
wid = oldWid
|
||||
}
|
||||
}
|
||||
//默认过期时间
|
||||
|
@ -224,7 +224,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
|
|||
logic: l,
|
||||
closeChan: make(chan struct{}, 1),
|
||||
isClose: false,
|
||||
uniqueId: uniqueId,
|
||||
wid: wid,
|
||||
inChan: make(chan []byte, websocketInChanLen),
|
||||
outChan: make(chan []byte, websocketOutChanLen),
|
||||
mutex: sync.Mutex{},
|
||||
|
@ -245,15 +245,15 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
|
|||
IsAllTemplateTag: 0,
|
||||
}*/
|
||||
//保存连接
|
||||
mapConnPool.Store(uniqueId, ws)
|
||||
mapConnPool.Store(wid, ws)
|
||||
//累加统计连接数
|
||||
increaseWebsocketConnectCount(userInfo.UserId, userInfo.GuestId)
|
||||
//非白板用户,需要为这个用户建立map索引便于通过用户查询
|
||||
createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, uniqueId)
|
||||
createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, wid)
|
||||
if isFirefoxBrowser {
|
||||
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
|
||||
}
|
||||
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, websocket_data.ConnectSuccessMsg{Wid: uniqueId}))
|
||||
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, websocket_data.ConnectSuccessMsg{Wid: wid}))
|
||||
return ws, nil
|
||||
}
|
||||
|
||||
|
@ -304,7 +304,7 @@ func (w *wsConnectItem) heartbeat() {
|
|||
case <-tick:
|
||||
//看看token是否过期了
|
||||
if w.connExpireTime > 0 && w.connExpireTime < time.Now().UTC().Unix() {
|
||||
logx.Info("token过期,关闭连接:", w.uniqueId)
|
||||
logx.Info("token过期,关闭连接:", w.wid)
|
||||
w.close()
|
||||
return
|
||||
}
|
||||
|
@ -313,7 +313,7 @@ func (w *wsConnectItem) heartbeat() {
|
|||
w.debug = nil
|
||||
}
|
||||
if err := w.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
|
||||
logx.Error("发送心跳信息异常,关闭连接:", w.uniqueId, err)
|
||||
logx.Error("发送心跳信息异常,关闭连接:", w.wid, err)
|
||||
w.close()
|
||||
return
|
||||
}
|
||||
|
@ -325,20 +325,22 @@ func (w *wsConnectItem) heartbeat() {
|
|||
func (w *wsConnectItem) close() {
|
||||
w.mutex.Lock()
|
||||
defer w.mutex.Unlock()
|
||||
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closing....")
|
||||
logx.Info("###websocket:", w.wid, " uid:", w.userId, " gid:", w.guestId, " is closing....")
|
||||
//发送关闭信息
|
||||
_ = w.conn.WriteMessage(websocket.CloseMessage, nil)
|
||||
w.conn.Close()
|
||||
mapConnPool.Delete(w.uniqueId)
|
||||
mapConnPool.Delete(w.wid)
|
||||
if !w.isClose {
|
||||
w.isClose = true
|
||||
close(w.closeChan)
|
||||
//删除用户级索引
|
||||
deleteUserConnPoolElement(w.userId, w.guestId, w.uniqueId)
|
||||
deleteUserConnPoolElement(w.userId, w.guestId, w.wid)
|
||||
//减少连接数统计
|
||||
decreaseWebsocketConnectCount(w.userId, w.guestId)
|
||||
//通知unity取消任务
|
||||
sendCancelRenderMsgToUnity(w.wid, time.Now().UTC().Unix())
|
||||
}
|
||||
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed")
|
||||
logx.Info("###websocket:", w.wid, " uid:", w.userId, " gid:", w.guestId, " is closed")
|
||||
}
|
||||
|
||||
// 读取出口缓冲队列数据输出返回给浏览器端
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
package logic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
//取消unity僵尸任务控制通道
|
||||
cancelUnityCtlChan = make(chan cancelUnityCtlChanItem, 1000)
|
||||
cancelRenderContextPanicMsg any = "cancel_render_context_panic_msg"
|
||||
)
|
||||
|
||||
// 控制通道元素
|
||||
type cancelUnityCtlChanItem struct {
|
||||
Wid string `json:"wid"` //ws的唯一id
|
||||
DeadlineTime int64 `json:"deadline_time"` //截断时间
|
||||
Sign string `json:"sign"` //有效签名
|
||||
}
|
||||
|
||||
// 取消渲染抛出的异常
|
||||
func cancelRenderPanic() {
|
||||
panic(cancelRenderContextPanicMsg)
|
||||
}
|
||||
|
||||
// 判断是否是取消渲染的异常
|
||||
func isCancelRenderPanic(err any) bool {
|
||||
return err == cancelRenderContextPanicMsg
|
||||
}
|
||||
|
||||
// 发送取消上下文消息给unity
|
||||
func sendCancelRenderMsgToUnity(wid string, deadlineTime int64) {
|
||||
h := md5.New()
|
||||
h.Write([]byte(fmt.Sprintf("%s_%d", wid, deadlineTime)))
|
||||
data := cancelUnityCtlChanItem{
|
||||
Wid: wid,
|
||||
DeadlineTime: deadlineTime,
|
||||
Sign: hex.EncodeToString(h.Sum(nil)),
|
||||
}
|
||||
select {
|
||||
case cancelUnityCtlChan <- data:
|
||||
case <-time.After(time.Millisecond * 200):
|
||||
logx.Error("sendCancelRenderMsgToUnity数据超时丢弃")
|
||||
}
|
||||
}
|
||||
|
||||
// 拨号udp
|
||||
func DialUdp(ctx context.Context) error {
|
||||
conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP("localhost"), Port: 9501})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go ConsumeCancelUnityChanMessage(ctx, conn)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 消费数据
|
||||
func ConsumeCancelUnityChanMessage(ctx context.Context, conn *net.UDPConn) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logx.Error("ConsumeCancelUnityChanMessage 异常:", err)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
panic(any("ConsumeCancelUnityChanMessage ctx deadline"))
|
||||
}
|
||||
}()
|
||||
defer conn.Close()
|
||||
for {
|
||||
select {
|
||||
case data := <-cancelUnityCtlChan:
|
||||
d, _ := json.Marshal(data)
|
||||
_, err := conn.Write(d)
|
||||
if err != nil {
|
||||
logx.Error("发送udp包失败:", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -85,6 +85,11 @@ func (w *wsConnectItem) consumeRenderImageData() {
|
|||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logx.Error("func consumeRenderImageData panic:", err)
|
||||
//如果是上下文取消渲染的异常
|
||||
if isCancelRenderPanic(err) {
|
||||
//通知unity取消任务
|
||||
sendCancelRenderMsgToUnity(w.wid, time.Now().UTC().Unix())
|
||||
}
|
||||
}
|
||||
}()
|
||||
//限制并发
|
||||
|
@ -118,7 +123,8 @@ func (w *wsConnectItem) consumeRenderImageData() {
|
|||
}()
|
||||
select {
|
||||
case <-w.extendRenderProperty.renderCtx.Done():
|
||||
panic("检测到模板标签/颜色/logo变化,渲染取消旧的任务")
|
||||
//抛出取消渲染异常
|
||||
cancelRenderPanic()
|
||||
case <-tmpChan:
|
||||
return
|
||||
}
|
||||
|
@ -390,7 +396,7 @@ func (w *wsConnectItem) assembleRenderDataToUnity(taskId string, resolution int,
|
|||
w.sendAssembleRenderDataStepResponseMessage(info.RequestId)
|
||||
temId := websocket_data.ToUnityIdStruct{
|
||||
TaskId: taskId,
|
||||
Wid: w.uniqueId,
|
||||
Wid: w.wid,
|
||||
RequestId: info.RequestId,
|
||||
RenderBeginTime: time.Now().UTC().UnixMilli(),
|
||||
TemplateTag: info.RenderData.TemplateTag,
|
||||
|
|
|
@ -213,7 +213,7 @@ func ConsumeWebsocketStatData(ctx context.Context) {
|
|||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
panic("ConsumeWebsocketStatData ctx deadline")
|
||||
panic(any("ConsumeWebsocketStatData ctx deadline"))
|
||||
}
|
||||
}()
|
||||
for {
|
||||
|
|
|
@ -65,7 +65,7 @@ func ConsumeUserConnPoolCtlChanData(ctx context.Context) {
|
|||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
panic("ConsumeUserConnPoolCtlChanData ctx deadline")
|
||||
panic(any("ConsumeUserConnPoolCtlChanData ctx deadline"))
|
||||
}
|
||||
}()
|
||||
var (
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"fusenapi/server/websocket/internal/logic"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"net/http"
|
||||
|
||||
"fusenapi/utils/auth"
|
||||
|
@ -39,6 +40,11 @@ func main() {
|
|||
go logic.ConsumeUserConnPoolCtlChanData(ctx1)
|
||||
//消费连接统计信息
|
||||
go logic.ConsumeWebsocketStatData(ctx1)
|
||||
//拨号udp消费控制unity取消僵尸任务的消息
|
||||
if err := logic.DialUdp(ctx1); err != nil {
|
||||
logx.Error("dail udp err:", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
|
||||
server.Start()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user