This commit is contained in:
laodaming 2023-08-25 14:49:24 +08:00
parent a68a060a06
commit 826ba92b8e

View File

@ -5,6 +5,8 @@ import (
"fusenapi/constants"
"fusenapi/utils/auth"
"fusenapi/utils/basic"
"sync"
"time"
"fusenapi/server/websocket/internal/svc"
"fusenapi/server/websocket/internal/types"
@ -26,6 +28,68 @@ func NewCommonNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm
}
}
// 定义公共回调未找到websocket连接时暂存数据缓冲队列
var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 2000)
type commonConnectionNotFoundDataCacheChanItem struct {
retryTimes int //重回队列次数
data types.CommonNotifyReq //数据
}
// 放入缓冲队列
func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCacheChanItem) {
select {
case commonConnectionNotFoundDataCacheChan <- data:
return
case <-time.After(time.Second): //超1秒就丢弃
return
}
}
// 取出元素
func (l *CommonNotifyLogic) popCommonCache() (data commonConnectionNotFoundDataCacheChanItem) {
return <-commonConnectionNotFoundDataCacheChan
}
// 保证处理消息就一个循环在执行
var consumeCommonCacheData sync.Once
// 消费公共通知未处理的消息
func (l *CommonNotifyLogic) consumeCommonCacheData() {
//单例
consumeCommonCacheData.Do(func() {
tick := time.Tick(time.Millisecond * 200)
for {
select {
case <-tick: //200毫秒触发一次
info := l.popCommonCache()
//查询websocket连接
value, ok := mapConnPool.Load(info.data.Wid)
//没有连接
if !ok {
info.retryTimes--
//大于0则放回队列
if info.retryTimes > 0 {
l.pushCommonCache(info)
continue
}
//否则直接丢弃消息
continue
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
continue
}
//发送
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data))
}
}
})
}
// 处理进入前逻辑w,r
// func (l *CommonNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) {
// }
@ -35,10 +99,20 @@ func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *a
if req.Wid == "" {
return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty")
}
//触发消费公共未处理的消息(只有第一次有用)
l.consumeCommonCacheData()
//查询websocket连接
value, ok := mapConnPool.Load(req.Wid)
if !ok {
return resp.SetStatusWithMessage(basic.CodeOK, "success,but connection is not found")
//没找到连接就放到公共缓冲队列
l.pushCommonCache(commonConnectionNotFoundDataCacheChanItem{
retryTimes: 4, //重试4次
data: types.CommonNotifyReq{
Wid: req.Wid,
Data: req.Data,
},
})
return resp.SetStatusWithMessage(basic.CodeOK, "success")
}
//断言连接
ws, ok := value.(wsConnectItem)