package logic import ( "context" "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" "time" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" "github.com/zeromicro/go-zero/core/logx" ) type CommonNotifyLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func NewCommonNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommonNotifyLogic { return &CommonNotifyLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, } } // 定义公共回调未找到websocket连接时暂存数据缓冲队列 var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 2000) type commonConnectionNotFoundDataCacheChanItem struct { retryTimes int //重回队列次数 data types.CommonNotifyReq //数据 } // 放入缓冲队列 func pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { select { case commonConnectionNotFoundDataCacheChan <- data: return case <-time.After(time.Millisecond * 50): //超50ms就丢弃 return } } // 取出元素 func popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { return <-commonConnectionNotFoundDataCacheChan } // 消费公共通知未处理的消息(目前是轮巡方式,待优化) func ConsumeCommonCacheData(ctx context.Context) { defer func() { if err := recover(); err != nil { logx.Error("consumeCommonCacheData panic :", err) } }() go func() { select { case <-ctx.Done(): panic("consumeCommonCacheData ctx deadline") } }() for { time.Sleep(time.Millisecond * 200) info := popCommonNotifyCache() //查询websocket连接 value, ok := mapConnPool.Load(info.data.Wid) //没有连接 if !ok { info.retryTimes-- //大于0,则放回队列 if info.retryTimes > 0 { pushCommonNotifyCache(info) continue } //否则直接丢弃消息 continue } //断言连接 ws, ok := value.(wsConnectItem) if !ok { logx.Error("渲染回调断言websocket连接失败") continue } //发送 ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) } } // 处理进入前逻辑w,r // func (l *CommonNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) { // } func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { searchConnectType := "uniqueId" if req.Wid == "" { if req.UserId == 0 && req.GuestId == 0 { return resp.SetStatusWithMessage(basic.CodeOK, "用户信息或者连接标识必须保证至少有其中一个") } searchConnectType = "userInfo" } switch searchConnectType { case "uniqueId": //直接通过唯一标识发消息 //查询websocket连接 value, ok := mapConnPool.Load(req.Wid) if !ok { //没找到连接就放到公共缓冲队列 pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ retryTimes: 20, //重试20次 data: types.CommonNotifyReq{ Wid: req.Wid, Data: req.Data, }, }) return resp.SetStatusWithMessage(basic.CodeOK, "success") } //断言连接 ws, ok := value.(wsConnectItem) if !ok { logx.Error("渲染回调断言websocket连接失败") return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误") } ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) case "userInfo": //通过用户信息找连接发送 sendToOutChanByUserIndex(userinfo.UserId, userinfo.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) } return resp.SetStatusWithMessage(basic.CodeOK, "success") } // 处理逻辑后 w,r 如:重定向, resp 必须重新处理 // func (l *CommonNotifyLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) { // // httpx.OkJsonCtx(r.Context(), w, resp) // }