fusenapi/server/websocket/internal/logic/commonnotifylogic.go

137 lines
3.8 KiB
Go
Raw Normal View History

2023-08-24 07:45:23 +00:00
package logic
import (
"context"
"fusenapi/constants"
"fusenapi/utils/auth"
"fusenapi/utils/basic"
2023-08-25 06:49:24 +00:00
"time"
2023-08-24 07:45:23 +00:00
"fusenapi/server/websocket/internal/svc"
"fusenapi/server/websocket/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type CommonNotifyLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCommonNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommonNotifyLogic {
return &CommonNotifyLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
2023-08-25 06:49:24 +00:00
// 定义公共回调未找到websocket连接时暂存数据缓冲队列
2023-09-21 09:08:04 +00:00
var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 100)
2023-08-25 06:49:24 +00:00
type commonConnectionNotFoundDataCacheChanItem struct {
retryTimes int //重回队列次数
data types.CommonNotifyReq //数据
}
// 放入缓冲队列
2023-09-04 06:26:55 +00:00
func pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) {
2023-08-25 06:49:24 +00:00
select {
case commonConnectionNotFoundDataCacheChan <- data:
return
2023-08-25 07:06:09 +00:00
case <-time.After(time.Millisecond * 50): //超50ms就丢弃
2023-08-25 06:49:24 +00:00
return
}
}
// 取出元素
2023-09-04 06:26:55 +00:00
func popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) {
2023-08-25 06:49:24 +00:00
return <-commonConnectionNotFoundDataCacheChan
}
2023-08-28 02:00:18 +00:00
// 消费公共通知未处理的消息(目前是轮巡方式,待优化)
2023-09-04 06:26:55 +00:00
func ConsumeCommonCacheData(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
logx.Error("consumeCommonCacheData panic :", err)
}
}()
go func() {
select {
case <-ctx.Done():
panic("consumeCommonCacheData ctx deadline")
}
}()
for {
time.Sleep(time.Millisecond * 200)
info := popCommonNotifyCache()
//查询websocket连接
value, ok := mapConnPool.Load(info.data.Wid)
//没有连接
if !ok {
info.retryTimes--
//大于0则放回队列
if info.retryTimes > 0 {
pushCommonNotifyCache(info)
2023-08-28 02:00:18 +00:00
continue
}
2023-09-04 06:26:55 +00:00
//否则直接丢弃消息
continue
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
continue
2023-08-25 06:49:24 +00:00
}
2023-09-04 06:26:55 +00:00
//发送
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data))
}
2023-08-25 06:49:24 +00:00
}
2023-08-24 07:45:23 +00:00
// 处理进入前逻辑w,r
// func (l *CommonNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) {
// }
func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) {
2023-09-04 07:17:30 +00:00
searchConnectType := 1
2023-08-24 07:59:55 +00:00
if req.Wid == "" {
2023-09-04 07:00:11 +00:00
if req.UserId == 0 && req.GuestId == 0 {
return resp.SetStatusWithMessage(basic.CodeOK, "用户信息或者连接标识必须保证至少有其中一个")
2023-09-04 06:26:55 +00:00
}
2023-09-04 07:17:30 +00:00
searchConnectType = 2
2023-08-24 07:45:23 +00:00
}
2023-09-04 06:26:55 +00:00
switch searchConnectType {
2023-09-04 07:17:30 +00:00
case 1: //直接通过唯一标识发消息
2023-09-04 06:26:55 +00:00
//查询websocket连接
value, ok := mapConnPool.Load(req.Wid)
if !ok {
//没找到连接就放到公共缓冲队列
pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{
retryTimes: 20, //重试20次
data: types.CommonNotifyReq{
Wid: req.Wid,
Data: req.Data,
},
})
return resp.SetStatusWithMessage(basic.CodeOK, "success")
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误")
}
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
2023-09-04 07:17:30 +00:00
case 2: //通过用户信息找连接发送
2023-09-04 07:06:07 +00:00
sendToOutChanByUserIndex(req.UserId, req.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
2023-08-24 07:45:23 +00:00
}
return resp.SetStatusWithMessage(basic.CodeOK, "success")
}
// 处理逻辑后 w,r 如:重定向, resp 必须重新处理
// func (l *CommonNotifyLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) {
// // httpx.OkJsonCtx(r.Context(), w, resp)
// }