fusenapi/server/websocket/internal/logic/datatransferlogic.go
laodaming 90277394fa fix
2023-10-11 17:58:38 +08:00

499 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package logic
//websocket连接
import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"fusenapi/constants"
"fusenapi/utils/auth"
"fusenapi/utils/basic"
"fusenapi/utils/encryption_decryption"
"fusenapi/utils/websocket_data"
"net/http"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"context"
"fusenapi/server/websocket/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type DataTransferLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewDataTransferLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DataTransferLogic {
return &DataTransferLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
var (
//临时对象缓存池
buffPool = sync.Pool{
New: func() interface{} {
return bytes.Buffer{}
},
}
//升级websocket
upgrader = websocket.Upgrader{
//最大可读取大小 1M
ReadBufferSize: 1024,
//最大可写大小 1M
WriteBufferSize: 1024,
//握手超时时间15s
HandshakeTimeout: time.Second * 15,
//允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
//写的缓冲队列
WriteBufferPool: &buffPool,
//是否支持压缩
EnableCompression: true,
}
//websocket连接存储
mapConnPool = sync.Map{}
//每个websocket连接入口缓冲队列长度默认值
websocketInChanLen = 2000
//每个websocket连接出口缓冲队列长度默认值
websocketOutChanLen = 2000
//是否开启debug
openDebug = true
//允许跨域的origin
mapAllowOrigin = map[string]struct{}{
"https://www.fusen.3718.cn": struct{}{},
"http://www.fusen.3718.cn": struct{}{},
}
)
// 用户标识的连接增删操作队列传输的值的结构
type userConnPoolCtlChanItem struct {
userId int64 //必须两个用户id任意一个不为0
guestId int64 //必须两个用户id任意一个不为0
uniqueId string //主连接池唯一标识(添加/删除时候必须)
message []byte //消息(发送消息传的,格式是经过标准输出序列化后的数据)
//messageType constants.Websocket //消息类型(发送消息传的)
option int64 //操作 2发消息 1增加 0删除
}
// 每个连接的连接基本属性
type wsConnectItem struct {
conn *websocket.Conn //websocket的连接(基本属性)
userAgent string //用户代理头信息(基本属性,用于重连标识验证因素之一)
logic *DataTransferLogic //logic(基本属性,用于获取上下文,配置或者操作数据库)
closeChan chan struct{} //ws连接关闭chan(基本属性)
isClose bool //是否已经关闭(基本属性)
uniqueId string //ws连接唯一标识(基本属性)
inChan chan []byte //接受消息缓冲队列(基本属性)
outChan chan []byte //要发送回客户端的消息缓冲队列(基本属性)
mutex sync.Mutex //互斥锁(基本属性)
userId int64 //用户id(基本属性)
guestId int64 //游客id(基本属性)
extendRenderProperty extendRenderProperty //扩展云渲染属性(扩展属性)
}
// 请求建立连接升级websocket协议
func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request) {
origin := r.Header.Get("Origin")
//判断是不是允许的跨域
if !openDebug {
upgrader.CheckOrigin = func(r *http.Request) bool {
if _, ok := mapAllowOrigin[origin]; !ok {
return false
}
return true
}
}
//把子协议携带的token设置到标准token头信息中
secWebsocketProtocol := r.Header.Get("Sec-Websocket-Protocol")
oldWid := ""
//有token是正常用户无则是白板用户也可以连接
if secWebsocketProtocol != "" {
s := strings.Split(secWebsocketProtocol, ",")
if len(s) != 2 {
w.Write([]byte("invalid secWebsocketProtocol param"))
return
}
//有效token
if s[0] != "empty_token" {
r.Header.Set("Authorization", "Bearer "+s[0])
}
//有效wid
if s[1] != "empty_wid" {
oldWid = s[1]
}
//设置Sec-Websocket-Protocol
upgrader.Subprotocols = []string{secWebsocketProtocol}
}
//判断下是否火狐浏览器(获取浏览器第一条消息返回有收不到的bug需要延迟1秒)
userAgent := r.Header.Get("User-Agent")
//是否火狐浏览器
isFirefoxBrowser := false
if strings.Contains(userAgent, "Firefox") {
isFirefoxBrowser = true
}
//升级websocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logx.Error("http upgrade websocket err:", err)
return
}
//支持写压缩
conn.EnableWriteCompression(true)
//鉴权不成功后断开
var (
userInfo *auth.UserInfo
isAuth bool
)
isAuth, userInfo = l.checkAuth(r)
if !isAuth {
//未授权响应消息
l.unAuthResponse(conn, isFirefoxBrowser)
conn.Close()
return
}
//设置连接
ws, err := l.setConnPool(conn, userInfo, isFirefoxBrowser, userAgent, oldWid)
if err != nil {
conn.Close()
return
}
//循环读客户端信息
go ws.reciveBrowserMessage()
//消费出口数据并发送浏览器端
go ws.consumeOutChanData()
//消费入口数据
go ws.consumeInChanData()
//消费渲染缓冲队列
go ws.consumeRenderImageData()
//心跳
ws.heartbeat()
}
// 设置连接
func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.UserInfo, isFirefoxBrowser bool, userAgent, oldWid string) (wsConnectItem, error) {
//生成连接唯一标识失败重试10次
uniqueId, err := l.getUniqueId(userInfo, userAgent, 10)
if err != nil {
//发送获取唯一标识失败的消息
l.sendGetUniqueIdErrResponse(conn)
return wsConnectItem{}, err
}
if oldWid != "" {
//解析传入的wid是不是属于自己的用户的
decryptionWid, err := encryption_decryption.CBCDecrypt(oldWid)
if err != nil {
logx.Error(err)
return wsConnectItem{}, errors.New("解码wid失败")
}
lendecryptionWid := len(decryptionWid)
//合成client后缀,不是同个后缀的不能复用
userPart := getUserJoinPart(userInfo.UserId, userInfo.GuestId, userAgent)
lenUserPart := len(userPart)
canUseOldWid := true
//长度太短
if lendecryptionWid <= lenUserPart {
logx.Info("复用的连接标识太短,不符合重用条件")
canUseOldWid = false
}
//尾部不同不能复用
if decryptionWid[lendecryptionWid-lenUserPart:] != userPart {
logx.Info("尾部用户信息不同,不符合重用条件")
canUseOldWid = false
}
//存在是不能给他申请重新绑定
if _, ok := mapConnPool.Load(oldWid); ok {
logx.Info("复用的连接标识已被其他客户端使用,不符合重用条件")
canUseOldWid = false
}
//检测通过可以用旧的
if canUseOldWid {
logx.Info("====复用旧的ws连接成功====")
uniqueId = oldWid
}
}
renderCtx, renderCtxCancelFunc := context.WithCancel(l.ctx)
ws := wsConnectItem{
conn: conn,
userAgent: userAgent,
logic: l,
closeChan: make(chan struct{}, 1),
isClose: false,
uniqueId: uniqueId,
inChan: make(chan []byte, websocketInChanLen),
outChan: make(chan []byte, websocketOutChanLen),
mutex: sync.Mutex{},
userId: userInfo.UserId,
guestId: userInfo.GuestId,
extendRenderProperty: extendRenderProperty{
renderChan: make(chan websocket_data.RenderImageReqMsg, renderChanLen),
renderCtx: renderCtx,
renderCtxCancelFunc: renderCtxCancelFunc,
},
}
//保存连接
mapConnPool.Store(uniqueId, ws)
//非白板用户需要为这个用户建立map索引便于通过用户查询
createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, uniqueId)
if isFirefoxBrowser {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
}
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId))
//发送累加统计连接书
increaseWebsocketConnectCount()
return ws, nil
}
// 获取websocket发送到前端使用的数据传输类型debug开启是文本否则是二进制
func getWebsocketBaseTransferDataFormat() int {
if openDebug {
return websocket.TextMessage
}
return websocket.BinaryMessage
}
// 获取唯一id
func (l *DataTransferLogic) getUniqueId(userInfo *auth.UserInfo, userAgent string, retryTimes int) (uniqueId string, err error) {
if retryTimes < 0 {
return "", errors.New("failed to get unique id")
}
//后面拼接上用户id
uniqueId = hex.EncodeToString([]byte(uuid.New().String())) + getUserJoinPart(userInfo.UserId, userInfo.GuestId, userAgent)
//存在则从新获取
if _, ok := mapConnPool.Load(uniqueId); ok {
uniqueId, err = l.getUniqueId(userInfo, userAgent, retryTimes-1)
if err != nil {
return "", err
}
}
//加密
uniqueId, err = encryption_decryption.CBCEncrypt(uniqueId)
if err != nil {
return "", err
}
return uniqueId, nil
}
// 鉴权
func (l *DataTransferLogic) checkAuth(r *http.Request) (isAuth bool, userInfo *auth.UserInfo) {
// 解析JWT token,并对空用户进行判断
userInfo, err := basic.ParseJwtToken(r, l.svcCtx)
if err != nil {
return false, nil
}
if userInfo.UserId > 0 {
userInfo.GuestId = 0
}
//白板用户
return true, userInfo
}
// 鉴权失败通知
func (l *DataTransferLogic) unAuthResponse(conn *websocket.Conn, isFirefoxBrowser bool) {
rsp := websocket_data.DataTransferData{
T: constants.WEBSOCKET_UNAUTH,
D: nil,
}
b, _ := json.Marshal(rsp)
if isFirefoxBrowser {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
}
//先发一条正常信息
_ = conn.WriteMessage(getWebsocketBaseTransferDataFormat(), b)
//发送关闭信息
_ = conn.WriteMessage(websocket.CloseMessage, nil)
}
// 获取唯一标识失败通知
func (l *DataTransferLogic) sendGetUniqueIdErrResponse(conn *websocket.Conn) {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
rsp := websocket_data.DataTransferData{
T: constants.WEBSOCKET_CONNECT_ERR,
D: "err to gen unique id ",
}
b, _ := json.Marshal(rsp)
//先发一条正常信息
_ = conn.WriteMessage(getWebsocketBaseTransferDataFormat(), b)
//发送关闭信息
_ = conn.WriteMessage(websocket.CloseMessage, nil)
}
// 心跳检测
func (w *wsConnectItem) heartbeat() {
tick := time.Tick(time.Second * 5)
for {
select {
case <-w.closeChan:
return
case <-tick:
//发送心跳信息
if err := w.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
logx.Error("发送心跳信息异常,关闭连接:", w.uniqueId, err)
w.close()
return
}
}
}
}
// 关闭websocket连接
func (w *wsConnectItem) close() {
w.mutex.Lock()
defer w.mutex.Unlock()
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closing....")
//发送关闭信息
_ = w.conn.WriteMessage(websocket.CloseMessage, nil)
w.conn.Close()
mapConnPool.Delete(w.uniqueId)
if !w.isClose {
w.isClose = true
close(w.closeChan)
//删除用户级索引
deleteUserConnPoolElement(w.userId, w.guestId, w.uniqueId)
//减少连接数统计
decreaseWebsocketConnectCount()
}
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed")
}
// 读取出口缓冲队列数据输出返回给浏览器端
func (w *wsConnectItem) consumeOutChanData() {
defer func() {
if err := recover(); err != nil {
logx.Error("consumeOutChanData panic:", err)
}
}()
for {
select {
case <-w.closeChan: //如果关闭了
return
case data := <-w.outChan:
if err := w.conn.WriteMessage(websocket.TextMessage, data); err != nil {
logx.Error("websocket write loop err:", err)
w.close()
return
}
}
}
}
// 消费websocket入口数据池中的数据
func (w *wsConnectItem) consumeInChanData() {
defer func() {
if err := recover(); err != nil {
logx.Error("consumeInChanData:", err)
}
}()
for {
select {
case <-w.closeChan:
return
case data := <-w.inChan:
//对不同消息类型分发处理
w.allocationProcessing(data)
}
}
}
// 接受浏览器端发来的消息并写入入口缓冲队列
func (w *wsConnectItem) reciveBrowserMessage() {
defer func() {
if err := recover(); err != nil {
logx.Error("acceptBrowserMessage panic:", err)
}
}()
for {
select {
case <-w.closeChan: //如果关闭了
return
default: //收取消息
msgType, data, err := w.conn.ReadMessage()
if err != nil {
logx.Error("接受信息错误:", err)
//关闭连接
w.close()
return
}
switch msgType {
case websocket.PingMessage, websocket.PongMessage: //心跳消息(过滤不处理)
continue
case websocket.BinaryMessage, websocket.TextMessage: //主要消息
w.sendToInChan(data)
case websocket.CloseMessage: //客户端主动关闭消息
w.close()
}
}
}
}
// 把要传递给客户端的数据放入出口缓冲队列
func (w *wsConnectItem) sendToOutChan(data []byte) {
select {
case <-w.closeChan:
return
case w.outChan <- data:
return
}
}
// 发送接受到的消息到入口缓冲队列中
func (w *wsConnectItem) sendToInChan(data []byte) {
select {
case <-w.closeChan: //关闭了
return
case w.inChan <- data:
return
}
}
// 格式化为websocket标准返回格式
func (w *wsConnectItem) respondDataFormat(msgType constants.Websocket, data interface{}) []byte {
d := websocket_data.DataTransferData{
T: msgType,
D: data,
}
b, _ := json.Marshal(d)
return b
}
// 获取用户拼接部分(复用标识用到)
func getUserJoinPart(userId, guestId int64, userAgent string) string {
if userId > 0 {
guestId = 0
}
return fmt.Sprintf("|_%d_%d_|_%s_|", userId, guestId, userAgent)
}
// 处理入口缓冲队列中不同类型的数据(分发处理)
func (w *wsConnectItem) allocationProcessing(data []byte) {
var parseInfo websocket_data.DataTransferData
if err := json.Unmarshal(data, &parseInfo); err != nil {
w.incomeDataFormatErrResponse("invalid format of income message:" + string(data))
logx.Error("invalid format of websocket message:", err)
return
}
d, _ := json.Marshal(parseInfo.D)
//获取工厂实例
processor := w.newAllocationProcessor(parseInfo.T)
if processor == nil {
logx.Error("未知消息类型:", string(data))
return
}
//执行工厂方法
processor.allocationMessage(w, d)
}