fusenapi/server/websocket/internal/logic/datatransferlogic.go
laodaming b4bc4ddcf7 fix
2023-09-06 17:18:52 +08:00

576 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package logic
//websocket连接
import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"fusenapi/constants"
"fusenapi/utils/auth"
"fusenapi/utils/basic"
"fusenapi/utils/encryption_decryption"
"fusenapi/utils/websocket_data"
"net/http"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"context"
"fusenapi/server/websocket/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type DataTransferLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewDataTransferLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DataTransferLogic {
return &DataTransferLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
var (
//临时对象缓存池
buffPool = sync.Pool{
New: func() interface{} {
return bytes.Buffer{}
},
}
//升级websocket
upgrader = websocket.Upgrader{
//最大可读取大小 1M
ReadBufferSize: 1024,
//最大可写大小 1M
WriteBufferSize: 1024,
//握手超时时间15s
HandshakeTimeout: time.Second * 15,
//允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
//写的缓冲队列
WriteBufferPool: &buffPool,
//是否支持压缩
EnableCompression: true,
}
//websocket连接存储
mapConnPool = sync.Map{}
//用户标识的连接(白板用户不存)
mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id val是个普通map存储这个用户的所有连接标识
//用户标识的连接增删操作队列
mapUserConnPoolCtlChan = make(chan userConnPoolCtlChanItem, 2000)
//每个websocket连接入口缓冲队列长度
websocketInChanLen = 1000
//每个websocket连接出口缓冲队列长度
websocketOutChanLen = 1000
//渲染任务调度(添加任务/删除任务/修改任务属性缓冲队列长度该队列用于避免map并发读写冲突
renderImageTaskCtlChanLen = 500
//渲染任务缓冲队列长度
renderChanLen = 500
//是否开启debug
openDebug = true
//允许跨域的origin
mapAllowOrigin = map[string]struct{}{
"https://www.fusen.3718.cn": struct{}{},
"http://www.fusen.3718.cn": struct{}{},
}
)
// 用户标识的连接增删操作队列传输的值的结构
type userConnPoolCtlChanItem struct {
userId int64 //必须两个用户id任意一个不为0
guestId int64 //必须两个用户id任意一个不为0
uniqueId string //主连接池唯一标识(添加/删除时候必须)
message []byte //消息(发送消息传的,格式是经过标准输出序列化后的数据)
messageType constants.Websocket //消息类型(发送消息传的)
option int64 //操作 2发消息 1增加 0删除
}
// 每个连接的连接基本属性
type wsConnectItem struct {
conn *websocket.Conn //websocket的连接(基本属性)
userAgent string //用户代理头信息(基本属性,用于重连标识验证因素之一)
logic *DataTransferLogic //logic(基本属性,用于获取上下文,配置或者操作数据库)
closeChan chan struct{} //ws连接关闭chan(基本属性)
isClose bool //是否已经关闭(基本属性)
uniqueId string //ws连接唯一标识(基本属性)
inChan chan []byte //接受消息缓冲队列(基本属性)
outChan chan []byte //要发送回客户端的消息缓冲队列(基本属性)
mutex sync.Mutex //互斥锁(基本属性)
userId int64 //用户id(基本属性)
guestId int64 //游客id(基本属性)
extendRenderProperty extendRenderProperty //扩展云渲染属性(扩展属性)
}
// 请求建立连接升级websocket协议
func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request) {
origin := r.Header.Get("Origin")
//判断是不是允许的跨域
if !openDebug {
upgrader.CheckOrigin = func(r *http.Request) bool {
if _, ok := mapAllowOrigin[origin]; !ok {
return false
}
return true
}
}
//把子协议携带的token设置到标准token头信息中
token := r.Header.Get("Sec-Websocket-Protocol")
//有token是正常用户无则是白板用户也可以连接
if token != "" {
r.Header.Set("Authorization", "Bearer "+token)
//设置Sec-Websocket-Protocol
upgrader.Subprotocols = []string{token}
}
//判断下是否火狐浏览器(获取浏览器第一条消息返回有收不到的bug需要延迟1秒)
userAgent := r.Header.Get("User-Agent")
//是否火狐浏览器
isFirefoxBrowser := false
if strings.Contains(userAgent, "Firefox") {
isFirefoxBrowser = true
}
//升级websocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logx.Error("http upgrade websocket err:", err)
return
}
//鉴权不成功后断开
var (
userInfo *auth.UserInfo
isAuth bool
)
isAuth, userInfo = l.checkAuth(r)
if !isAuth {
//未授权响应消息
l.unAuthResponse(conn, isFirefoxBrowser)
conn.Close()
return
}
//设置连接
ws, err := l.setConnPool(conn, userInfo, isFirefoxBrowser, userAgent)
if err != nil {
conn.Close()
return
}
//循环读客户端信息
go ws.reciveBrowserMessage()
//消费出口数据并发送浏览器端
go ws.consumeOutChanData()
//消费入口数据
go ws.consumeInChanData()
//操作连接中渲染任务的增加/删除
go ws.operationRenderTask()
//消费渲染缓冲队列
go ws.consumeRenderImageData()
//心跳
ws.heartbeat()
}
// 设置连接
func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.UserInfo, isFirefoxBrowser bool, userAgent string) (wsConnectItem, error) {
//生成连接唯一标识失败重试10次
uniqueId, err := l.getUniqueId(userInfo, userAgent, 10)
if err != nil {
//发送获取唯一标识失败的消息
l.sendGetUniqueIdErrResponse(conn)
return wsConnectItem{}, err
}
ws := wsConnectItem{
conn: conn,
userAgent: userAgent,
logic: l,
closeChan: make(chan struct{}, 1),
isClose: false,
uniqueId: uniqueId,
inChan: make(chan []byte, websocketInChanLen),
outChan: make(chan []byte, websocketOutChanLen),
mutex: sync.Mutex{},
userId: userInfo.UserId,
guestId: userInfo.GuestId,
extendRenderProperty: extendRenderProperty{
renderImageTask: make(map[string]*renderTask),
renderImageTaskCtlChan: make(chan renderImageControlChanItem, renderImageTaskCtlChanLen),
renderChan: make(chan []byte, renderChanLen),
},
}
//保存连接
mapConnPool.Store(uniqueId, ws)
//非白板用户需要为这个用户建立map索引便于通过用户查询
if userInfo.IsUser() || userInfo.IsGuest() {
createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, uniqueId)
}
if isFirefoxBrowser {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
}
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId))
return ws, nil
}
// 添加用户索引池ws连接
func createUserConnPoolElement(userId, guestId int64, uniqueId string) {
data := userConnPoolCtlChanItem{
userId: userId,
guestId: guestId,
uniqueId: uniqueId,
message: nil,
messageType: "",
option: 1,
}
select {
case mapUserConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
// 从用户索引池删除ws连接
func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) {
data := userConnPoolCtlChanItem{
userId: userId,
guestId: guestId,
uniqueId: uniqueId,
message: nil,
messageType: "",
option: 0,
}
select {
case mapUserConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
// 根据用户索引发现链接并发送消息到出口队列
func sendToOutChanByUserIndex(userId, guestId int64, message []byte) {
data := userConnPoolCtlChanItem{
userId: userId,
guestId: guestId,
uniqueId: "",
message: message,
option: 2,
}
select {
case mapUserConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
// 消费用户索引创建/删除/发送消息中的任务数据
func ConsumeUserPoolData(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
logx.Error("consumeUserPoolData panic:", err)
}
}()
go func() {
select {
case <-ctx.Done():
panic("ConsumeUserPoolData ctx deadline")
}
}()
for {
select {
case data := <-mapUserConnPoolCtlChan:
key := getmapUserConnPoolUniqueId(data.userId, data.guestId)
switch data.option {
case 2: //发送消息
logx.Info("通过用户id索引发送消息")
mapUserUniqueId, ok := mapUserConnPool[key]
if !ok {
logx.Info("通过用户id索引发送消息,连接不存在用户索引key:", key)
continue
}
for uniqueId, _ := range mapUserUniqueId {
//根据uniqueId查询原始池中连接
mapConnPoolVal, ok := mapConnPool.Load(uniqueId)
if !ok {
logx.Info("通过用户id索引发送消息,连接不存在用户索引key:", key, " 原始uniqueId:", uniqueId)
continue
}
originConn, ok := mapConnPoolVal.(wsConnectItem)
if !ok {
logx.Error("通过用户id索引发送消息,断言原始连接失败用户索引key:", key, " 原始uniqueId:", uniqueId)
continue
}
originConn.sendToOutChan(data.message)
}
case 1: //添加
logx.Info("添加用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
mapUserUniqueId[data.uniqueId] = struct{}{}
} else {
mapUserConnPool[key] = make(map[string]struct{})
mapUserConnPool[key][data.uniqueId] = struct{}{}
}
case 0: //删除
logx.Info("删除用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
delete(mapUserUniqueId, data.uniqueId)
}
default:
}
}
}
}
// 获取mapUserConnPool唯一id
func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) {
if userId > 0 {
guestId = 0
}
return fmt.Sprintf("%d_%d", userId, guestId)
}
// 获取websocket发送到前端使用的数据传输类型debug开启是文本否则是二进制
func getWebsocketBaseTransferDataFormat() int {
if openDebug {
return websocket.TextMessage
}
return websocket.BinaryMessage
}
// 获取唯一id
func (l *DataTransferLogic) getUniqueId(userInfo *auth.UserInfo, userAgent string, retryTimes int) (uniqueId string, err error) {
if retryTimes < 0 {
return "", errors.New("failed to get unique id")
}
//后面拼接上用户id
uniqueId = hex.EncodeToString([]byte(uuid.New().String())) + getUserJoinPart(userInfo.UserId, userInfo.GuestId, userAgent)
//存在则从新获取
if _, ok := mapConnPool.Load(uniqueId); ok {
uniqueId, err = l.getUniqueId(userInfo, userAgent, retryTimes-1)
if err != nil {
return "", err
}
}
//加密
uniqueId, err = encryption_decryption.CBCEncrypt(uniqueId)
if err != nil {
return "", err
}
return uniqueId, nil
}
// 鉴权
func (l *DataTransferLogic) checkAuth(r *http.Request) (isAuth bool, userInfo *auth.UserInfo) {
// 解析JWT token,并对空用户进行判断
userInfo, err := basic.ParseJwtToken(r, l.svcCtx)
if err != nil {
return false, nil
}
if userInfo.UserId > 0 {
userInfo.GuestId = 0
}
//白板用户
return true, userInfo
}
// 鉴权失败通知
func (l *DataTransferLogic) unAuthResponse(conn *websocket.Conn, isFirefoxBrowser bool) {
rsp := websocket_data.DataTransferData{
T: constants.WEBSOCKET_UNAUTH,
D: nil,
}
b, _ := json.Marshal(rsp)
if isFirefoxBrowser {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
}
//先发一条正常信息
_ = conn.WriteMessage(getWebsocketBaseTransferDataFormat(), b)
//发送关闭信息
_ = conn.WriteMessage(websocket.CloseMessage, nil)
}
// 获取唯一标识失败通知
func (l *DataTransferLogic) sendGetUniqueIdErrResponse(conn *websocket.Conn) {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
rsp := websocket_data.DataTransferData{
T: constants.WEBSOCKET_CONNECT_ERR,
D: "err to gen unique id ",
}
b, _ := json.Marshal(rsp)
//先发一条正常信息
_ = conn.WriteMessage(getWebsocketBaseTransferDataFormat(), b)
//发送关闭信息
_ = conn.WriteMessage(websocket.CloseMessage, nil)
}
// 心跳检测
func (w *wsConnectItem) heartbeat() {
tick := time.Tick(time.Second * 5)
for {
select {
case <-w.closeChan:
return
case <-tick:
//发送心跳信息
if err := w.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
logx.Error("发送心跳信息异常,关闭连接:", w.uniqueId, err)
w.close()
return
}
}
}
}
// 关闭websocket连接
func (w *wsConnectItem) close() {
w.mutex.Lock()
defer w.mutex.Unlock()
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closing....")
//发送关闭信息
_ = w.conn.WriteMessage(websocket.CloseMessage, nil)
w.conn.Close()
mapConnPool.Delete(w.uniqueId)
if !w.isClose {
w.isClose = true
close(w.closeChan)
//删除用户级索引
deleteUserConnPoolElement(w.userId, w.guestId, w.uniqueId)
}
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed")
}
// 读取出口缓冲队列数据输出返回给浏览器端
func (w *wsConnectItem) consumeOutChanData() {
defer func() {
if err := recover(); err != nil {
logx.Error("consumeOutChanData panic:", err)
}
}()
for {
select {
case <-w.closeChan: //如果关闭了
return
case data := <-w.outChan:
if err := w.conn.WriteMessage(websocket.TextMessage, data); err != nil {
logx.Error("websocket write loop err:", err)
w.close()
return
}
}
}
}
// 消费websocket入口数据池中的数据
func (w *wsConnectItem) consumeInChanData() {
defer func() {
if err := recover(); err != nil {
logx.Error("consumeInChanData:", err)
}
}()
for {
select {
case <-w.closeChan:
return
case data := <-w.inChan:
//对不同消息类型分发处理
w.allocationProcessing(data)
}
}
}
// 接受浏览器端发来的消息并写入入口缓冲队列
func (w *wsConnectItem) reciveBrowserMessage() {
defer func() {
if err := recover(); err != nil {
logx.Error("acceptBrowserMessage panic:", err)
}
}()
for {
select {
case <-w.closeChan: //如果关闭了
return
default: //收取消息
msgType, data, err := w.conn.ReadMessage()
if err != nil {
logx.Error("接受信息错误:", err)
//关闭连接
w.close()
return
}
switch msgType {
case websocket.PingMessage, websocket.PongMessage: //心跳消息(过滤不处理)
continue
case websocket.BinaryMessage, websocket.TextMessage: //主要消息
w.sendToInChan(data)
case websocket.CloseMessage: //客户端主动关闭消息
w.close()
}
}
}
}
// 把要传递给客户端的数据放入出口缓冲队列
func (w *wsConnectItem) sendToOutChan(data []byte) {
select {
case <-w.closeChan:
return
case w.outChan <- data:
return
case <-time.After(time.Millisecond * 200): //阻塞超过200ms丢弃
logx.Error("failed to send to out chan,time expired,data:", string(data))
return
}
}
// 发送接受到的消息到入口缓冲队列中
func (w *wsConnectItem) sendToInChan(data []byte) {
select {
case <-w.closeChan: //关闭了
return
case w.inChan <- data:
return
case <-time.After(time.Millisecond * 200): //200豪秒超时丢弃说明超过消费速度了
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_INCOME_CACHE_QUEUE_OVERFLOW, "send message is too frequent,the message is ignore by system:"+string(data)))
return
}
}
// 格式化为websocket标准返回格式
func (w *wsConnectItem) respondDataFormat(msgType constants.Websocket, data interface{}) []byte {
d := websocket_data.DataTransferData{
T: msgType,
D: data,
}
b, _ := json.Marshal(d)
return b
}
// 处理入口缓冲队列中不同类型的数据(分发处理)
func (w *wsConnectItem) allocationProcessing(data []byte) {
var parseInfo websocket_data.DataTransferData
if err := json.Unmarshal(data, &parseInfo); err != nil {
w.incomeDataFormatErrResponse("invalid format of income message:" + string(data))
logx.Error("invalid format of websocket message:", err)
return
}
d, _ := json.Marshal(parseInfo.D)
//获取工厂实例
processor := w.newAllocationProcessor(parseInfo.T)
if processor == nil {
logx.Error("未知消息类型:", string(data))
return
}
//执行工厂方法
processor.allocationMessage(w, d)
}