package logic import ( "context" "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" "sync" "time" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" "github.com/zeromicro/go-zero/core/logx" ) type CommonNotifyLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func NewCommonNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommonNotifyLogic { return &CommonNotifyLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, } } // 定义公共回调未找到websocket连接时暂存数据缓冲队列 var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 2000) type commonConnectionNotFoundDataCacheChanItem struct { retryTimes int //重回队列次数 data types.CommonNotifyReq //数据 } // 放入缓冲队列 func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { select { case commonConnectionNotFoundDataCacheChan <- data: return case <-time.After(time.Millisecond * 50): //超50ms就丢弃 return } } // 取出元素 func (l *CommonNotifyLogic) popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { return <-commonConnectionNotFoundDataCacheChan } // 保证处理消息就一个循环在执行 var consumeCommonCacheData sync.Once // 消费公共通知未处理的消息(目前是轮巡方式,待优化) func (l *CommonNotifyLogic) consumeCommonCacheData() { //单例 consumeCommonCacheData.Do(func() { for { time.Sleep(time.Millisecond * 200) info := l.popCommonNotifyCache() //查询websocket连接 value, ok := mapConnPool.Load(info.data.Wid) //没有连接 if !ok { info.retryTimes-- //大于0,则放回队列 if info.retryTimes > 0 { l.pushCommonNotifyCache(info) continue } //否则直接丢弃消息 continue } //断言连接 ws, ok := value.(wsConnectItem) if !ok { logx.Error("渲染回调断言websocket连接失败") continue } //发送 ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) } }) } // 处理进入前逻辑w,r // func (l *CommonNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) { // } func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { //websocket连接id不能为空 if req.Wid == "" { return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty") } //触发消费公共未处理的消息(该方法是单例) go l.consumeCommonCacheData() //查询websocket连接 value, ok := mapConnPool.Load(req.Wid) if !ok { //没找到连接就放到公共缓冲队列 go l.pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ retryTimes: 20, //重试20次 data: types.CommonNotifyReq{ Wid: req.Wid, Data: req.Data, }, }) return resp.SetStatusWithMessage(basic.CodeOK, "success") } //断言连接 ws, ok := value.(wsConnectItem) if !ok { logx.Error("渲染回调断言websocket连接失败") return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误") } ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) return resp.SetStatusWithMessage(basic.CodeOK, "success") } // 处理逻辑后 w,r 如:重定向, resp 必须重新处理 // func (l *CommonNotifyLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) { // // httpx.OkJsonCtx(r.Context(), w, resp) // }