This commit is contained in:
laodaming 2023-07-27 16:27:42 +08:00
parent 4eb37c3820
commit 58c0580cad
3 changed files with 36 additions and 10 deletions

View File

@ -12,6 +12,7 @@ const (
WEBSOCKET_RENDER_IMAGE = "WEBSOCKET_RENDER_IMAGE"
//数据格式错误
WEBSOCKET_ERR_DATA_FORMAT = "WEBSOCKET_ERR_DATA_FORMAT"
//
)
// 云渲染完成通知api需要的签名字符串

View File

@ -7,7 +7,7 @@ import (
"fusenapi/constants"
"fusenapi/server/websocket/internal/types"
"fusenapi/utils/auth"
"github.com/google/uuid"
"fusenapi/utils/id_generator"
"github.com/gorilla/websocket"
"net/http"
"sync"
@ -34,6 +34,8 @@ func NewDataTransferLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Data
}
var (
//全局websocketid生成器
websocketIdGenerator = id_generator.NewWebsocketId(1)
//临时缓存对象池
buffPool = sync.Pool{
New: func() interface{} {
@ -61,7 +63,7 @@ type wsConnectItem struct {
conn *websocket.Conn //websocket的连接
closeChan chan struct{} //ws连接关闭chan
isClose bool //是否已经关闭
flag string //ws连接唯一标识
uniqueId uint64 //ws连接唯一标识
inChan chan []byte //接受消息缓冲通道
outChan chan []byte //发送回客户端的消息
mutex sync.Mutex //互斥锁
@ -91,10 +93,10 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
return
}*/
//生成连接唯一标识
flag := uuid.New().String() + "time=" + time.Now().Format("15-04-05")
uniqueId := websocketIdGenerator.Get()
ws := wsConnectItem{
conn: conn,
flag: flag,
uniqueId: uniqueId,
closeChan: make(chan struct{}, 1),
inChan: make(chan []byte, 100),
outChan: make(chan []byte, 100),
@ -104,12 +106,12 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
},
}
//保存连接
mapConnPool.Store(flag, ws)
mapConnPool.Store(uniqueId, ws)
defer ws.close()
//把连接成功消息发回去
time.Sleep(time.Second) //兼容下火狐
rsp.T = constants.WEBSOCKET_CONNECT_SUCCESS
rsp.D = flag
rsp.D = uniqueId
b, _ := json.Marshal(rsp)
_ = conn.WriteMessage(websocket.TextMessage, b)
//循环读客户端信息
@ -155,7 +157,7 @@ func (w *wsConnectItem) heartbeat() {
case <-tick:
//发送心跳信息
if err := w.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
logx.Error("发送心跳信息异常,关闭连接:", w.flag, err)
logx.Error("发送心跳信息异常,关闭连接:", w.uniqueId, err)
w.close()
return
}
@ -167,16 +169,16 @@ func (w *wsConnectItem) heartbeat() {
func (w *wsConnectItem) close() {
w.mutex.Lock()
defer w.mutex.Unlock()
logx.Info("websocket:", w.flag, " is closing...")
logx.Info("websocket:", w.uniqueId, " is closing...")
//发送关闭信息
_ = w.conn.WriteMessage(websocket.CloseMessage, nil)
w.conn.Close()
mapConnPool.Delete(w.flag)
mapConnPool.Delete(w.uniqueId)
if !w.isClose {
w.isClose = true
close(w.closeChan)
}
logx.Info("websocket:", w.flag, " is closed")
logx.Info("websocket:", w.uniqueId, " is closed")
}
// 读取输出返回给客户端

View File

@ -0,0 +1,23 @@
package id_generator
import "sync"
type WebsocketId struct {
nodeId uint64
count uint64
mu sync.Mutex
}
func (wid *WebsocketId) Get() uint64 {
wid.mu.Lock()
defer wid.mu.Unlock()
wid.count++
return (wid.count << 8) | wid.nodeId
}
func NewWebsocketId(NodeId uint8) *WebsocketId {
return &WebsocketId{
nodeId: uint64(NodeId),
count: 0,
}
}