fusenapi/server/websocket/internal/logic/datatransferlogic.go
laodaming a9d565aff9 fix
2023-08-25 12:21:02 +08:00

393 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package logic
//websocket连接
import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fusenapi/constants"
"fusenapi/utils/auth"
"fusenapi/utils/encryption_decryption"
"fusenapi/utils/websocket_data"
"net/http"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"context"
"fusenapi/server/websocket/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type DataTransferLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewDataTransferLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DataTransferLogic {
return &DataTransferLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
var (
//临时缓存对象池
buffPool = sync.Pool{
New: func() interface{} {
return bytes.Buffer{}
},
}
//升级websocket
upgrader = websocket.Upgrader{
//最大可读取大小 1M
ReadBufferSize: 1024,
//最大可写大小 1M
WriteBufferSize: 1024,
//握手超时时间15s
HandshakeTimeout: time.Second * 15,
//允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
//写的缓存池
WriteBufferPool: &buffPool,
//是否支持压缩
EnableCompression: false,
}
//websocket连接存储
mapConnPool = sync.Map{}
)
// 每个连接的连接基本属性
type wsConnectItem struct {
conn *websocket.Conn //websocket的连接(基本属性)
logic *DataTransferLogic //logic(基本属性,用于获取上下文,配置或者操作数据库)
closeChan chan struct{} //ws连接关闭chan(基本属性)
isClose bool //是否已经关闭(基本属性)
uniqueId string //ws连接唯一标识(基本属性)
inChan chan []byte //接受消息缓冲池(基本属性)
outChan chan []byte //要发送回客户端的消息缓冲池(基本属性)
mutex sync.Mutex //互斥锁(基本属性)
userId int64 //用户id(基本属性)
guestId int64 //游客id(基本属性)
extendRenderProperty extendRenderProperty //扩展云渲染属性(扩展属性)
}
// 请求建立连接升级websocket协议
func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request) {
//把子协议携带的token设置到标准token头信息中
token := r.Header.Get("Sec-Websocket-Protocol")
//有token是正常用户无则是白板用户也可以连接
if token != "" {
r.Header.Set("Authorization", "Bearer "+token)
//设置Sec-Websocket-Protocol
upgrader.Subprotocols = []string{token}
}
//判断下是否火狐浏览器(获取浏览器第一条消息返回有收不到的bug需要延迟1秒)
userAgent := r.Header.Get("User-Agent")
//是否火狐浏览器
isFirefoxBrowser := false
if strings.Contains(userAgent, "Firefox") {
isFirefoxBrowser = true
}
//升级websocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logx.Error("http upgrade websocket err:", err)
return
}
//鉴权不成功后断开
var (
userInfo *auth.UserInfo
isAuth bool
)
isAuth, userInfo = l.checkAuth(r)
if !isAuth {
//未授权响应消息
l.unAuthResponse(conn, isFirefoxBrowser)
conn.Close()
return
}
//设置连接
ws, err := l.setConnPool(conn, *userInfo, isFirefoxBrowser)
if err != nil {
conn.Close()
return
}
//循环读客户端信息
go ws.readLoop()
//循环把数据发送给客户端
go ws.writeLoop()
//推消息到云渲染
go ws.sendLoop()
//操作连接中渲染任务的增加/删除
go ws.operationRenderTask()
//消费渲染缓冲队列
go ws.renderImage()
//心跳
ws.heartbeat()
}
// 设置连接
func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.UserInfo, isFirefoxBrowser bool) (wsConnectItem, error) {
//生成连接唯一标识失败重试10次
uniqueId, err := l.getUniqueId(userInfo, 10)
if err != nil {
//发送获取唯一标识失败的消息
l.sendGetUniqueIdErrResponse(conn)
return wsConnectItem{}, err
}
ws := wsConnectItem{
conn: conn,
logic: l,
uniqueId: uniqueId,
closeChan: make(chan struct{}, 1),
inChan: make(chan []byte, 1000),
outChan: make(chan []byte, 1000),
userId: userInfo.UserId,
guestId: userInfo.GuestId,
extendRenderProperty: extendRenderProperty{
renderImageTask: make(map[string]*renderTask),
renderImageTaskCtlChan: make(chan renderImageControlChanItem, 500),
renderChan: make(chan []byte, 500),
},
}
//保存连接
mapConnPool.Store(uniqueId, ws)
if isFirefoxBrowser {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
}
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId))
return ws, nil
}
// 获取唯一id
func (l *DataTransferLogic) getUniqueId(userInfo auth.UserInfo, retryTimes int) (uniqueId string, err error) {
if retryTimes < 0 {
return "", errors.New("failed to get unique id")
}
//后面拼接上用户id
uniqueId = hex.EncodeToString([]byte(uuid.New().String())) + getUserJoinPart(userInfo.UserId, userInfo.GuestId)
//存在则从新获取
if _, ok := mapConnPool.Load(uniqueId); ok {
uniqueId, err = l.getUniqueId(userInfo, retryTimes-1)
if err != nil {
return "", err
}
}
//加密
uniqueId, err = encryption_decryption.CBCEncrypt(uniqueId)
if err != nil {
return "", err
}
return uniqueId, nil
}
// 鉴权
func (l *DataTransferLogic) checkAuth(r *http.Request) (isAuth bool, userInfo *auth.UserInfo) {
// 解析JWT token,并对空用户进行判断
claims, err := l.svcCtx.ParseJwtToken(r)
// 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息
if err != nil {
logx.Error(err)
return false, nil
}
if claims != nil {
// 从token中获取对应的用户信息
userInfo, err = auth.GetUserInfoFormMapClaims(claims)
// 如果获取用户信息出错,则返回未授权的JSON响应并记录错误消息
if err != nil {
logx.Error(err)
return false, nil
}
userInfo.UserId = 39
userInfo.GuestId = 0
return true, userInfo
}
//白板用户
return true, &auth.UserInfo{}
}
// 鉴权失败通知
func (l *DataTransferLogic) unAuthResponse(conn *websocket.Conn, isFirefoxBrowser bool) {
rsp := websocket_data.DataTransferData{
T: constants.WEBSOCKET_UNAUTH,
D: nil,
}
b, _ := json.Marshal(rsp)
if isFirefoxBrowser {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
}
//先发一条正常信息
_ = conn.WriteMessage(websocket.TextMessage, b)
//发送关闭信息
_ = conn.WriteMessage(websocket.CloseMessage, nil)
}
// 获取唯一标识失败通知
func (l *DataTransferLogic) sendGetUniqueIdErrResponse(conn *websocket.Conn) {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
rsp := websocket_data.DataTransferData{
T: constants.WEBSOCKET_GEN_UNIQUE_ID_ERR,
D: "err to gen unique id ",
}
b, _ := json.Marshal(rsp)
//先发一条正常信息
_ = conn.WriteMessage(websocket.TextMessage, b)
//发送关闭信息
_ = conn.WriteMessage(websocket.CloseMessage, nil)
}
// 心跳检测
func (w *wsConnectItem) heartbeat() {
tick := time.Tick(time.Second * 5)
for {
select {
case <-w.closeChan:
return
case <-tick:
//发送心跳信息
if err := w.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
logx.Error("发送心跳信息异常,关闭连接:", w.uniqueId, err)
w.close()
return
}
}
}
}
// 关闭websocket连接
func (w *wsConnectItem) close() {
w.mutex.Lock()
defer w.mutex.Unlock()
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closing....")
//发送关闭信息
_ = w.conn.WriteMessage(websocket.CloseMessage, nil)
w.conn.Close()
mapConnPool.Delete(w.uniqueId)
if !w.isClose {
w.isClose = true
close(w.closeChan)
}
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed")
}
// 读取出口缓冲池数据输出返回给浏览器端
func (w *wsConnectItem) writeLoop() {
defer func() {
if err := recover(); err != nil {
logx.Error("write loop panic:", err)
}
}()
for {
select {
case <-w.closeChan: //如果关闭了
return
case data := <-w.outChan:
if err := w.conn.WriteMessage(websocket.TextMessage, data); err != nil {
logx.Error("websocket write loop err:", err)
w.close()
return
}
}
}
}
// 接受客户端发来的消息并写入入口缓冲池
func (w *wsConnectItem) readLoop() {
defer func() {
if err := recover(); err != nil {
logx.Error("read loop panic:", err)
}
}()
for {
select {
case <-w.closeChan: //如果关闭了
return
default:
msgType, data, err := w.conn.ReadMessage()
if err != nil {
logx.Error("接受信息错误:", err)
//关闭连接
w.close()
return
}
//ping的消息不处理
if msgType != websocket.PingMessage {
//消息传入缓冲通道
w.inChan <- data
}
}
}
}
// 消费websocket入口数据池中的数据
func (w *wsConnectItem) sendLoop() {
defer func() {
if err := recover(); err != nil {
logx.Error("send loop panic:", err)
}
}()
for {
select {
case <-w.closeChan:
return
case data := <-w.inChan:
w.dealwithReciveData(data)
}
}
}
// 把要传递给客户端的数据放入出口缓冲池
func (w *wsConnectItem) sendToOutChan(data []byte) {
select {
case <-w.closeChan:
return
case w.outChan <- data:
return
case <-time.After(time.Second * 3): //阻塞超过3秒丢弃
return
}
}
// 格式化为websocket标准返回格式
func (w *wsConnectItem) respondDataFormat(msgType constants.Websocket, data interface{}) []byte {
d := websocket_data.DataTransferData{
T: msgType,
D: data,
}
b, _ := json.Marshal(d)
return b
}
// 处理入口缓冲池中不同类型的数据(分发处理)
func (w *wsConnectItem) dealwithReciveData(data []byte) {
var parseInfo websocket_data.DataTransferData
if err := json.Unmarshal(data, &parseInfo); err != nil {
w.incomeDataFormatErrResponse("invalid format of income message:" + string(data))
logx.Error("invalid format of websocket message:", err)
return
}
d, _ := json.Marshal(parseInfo.D)
//分消息类型给到不同逻辑处理,可扩展
switch parseInfo.T {
//图片渲染
case constants.WEBSOCKET_RENDER_IMAGE:
w.sendToRenderChan(d)
//刷新重连请求恢复上次连接的标识
case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT:
w.reuseLastConnect(d)
default:
logx.Error("未知消息类型:", parseInfo.T)
}
}