package logic

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
)

// websocketStatType identifies which per-user counter a stat message updates.
type websocketStatType string

const (
	TYPE_CUR_CONNECT_COUNT       websocketStatType = "TYPE_CUR_CONNECT_COUNT"       // websocket connection count
	TYPE_CUR_COMBINE_IMAGE_COUNT websocketStatType = "TYPE_CUR_COMBINE_IMAGE_COUNT" // image-combine request count
	TYPE_CUR_UNITY_HANDLE_COUNT  websocketStatType = "TYPE_CUR_UNITY_HANDLE_COUNT"  // unity handling count
)

// websocketStatItem is one counter delta sent through the websocketStat
// channel to the consumer.
type websocketStatItem struct {
	UserId           int64             `json:"user_id"`
	GuestId          int64             `json:"guest_id"`
	Type             websocketStatType `json:"type"`                 // which counter to update
	Value            int               `json:"value"`                // delta applied to the counter
	UnityRenderLogId int64             `json:"unity_render_log_id"`  // id of the unity render log
}

// mapUserWsStatItem holds the current counters of one user/guest pair. It is
// stored by value in mapUserWsStat.
type mapUserWsStatItem struct {
	CurCombineCount     int `json:"cur_combine_count"`      // current image-combine request count
	CurWsConnectCount   int `json:"cur_ws_connect_count"`   // current websocket connection count
	CurUnityHandleCount int `json:"cur_unity_handle_count"` // current unity handling count
}

// Statistics state.
var (
	// mapUserWsStat maps "<userId>_<guestId>" to a mapUserWsStatItem.
	mapUserWsStat = sync.Map{}
	// websocketStat carries counter deltas from the producers below to
	// ConsumeWebsocketStatData.
	websocketStat = make(chan websocketStatItem, 1000)
)

// sendWebsocketStat enqueues a counter delta on websocketStat. If the channel
// cannot accept the message within 200ms the message is dropped and an error
// is logged, prefixed with caller so the dropping call site can be identified.
func sendWebsocketStat(caller string, userId, guestId int64, statType websocketStatType, delta int) {
	const sendTimeout = 200 * time.Millisecond

	data := websocketStatItem{
		UserId:  userId,
		GuestId: guestId,
		Type:    statType,
		Value:   delta,
	}
	select {
	case websocketStat <- data:
	case <-time.After(sendTimeout):
		logx.Error(caller + " 输入管道超时,丢弃消息")
	}
}

// increaseWebsocketConnectCount adds one to the websocket connection count of
// the given user/guest.
func increaseWebsocketConnectCount(userId, guestId int64) {
	sendWebsocketStat("increaseWebsocketConnectCount", userId, guestId, TYPE_CUR_CONNECT_COUNT, 1)
}

// decreaseWebsocketConnectCount subtracts one from the websocket connection
// count of the given user/guest.
func decreaseWebsocketConnectCount(userId, guestId int64) {
	sendWebsocketStat("decreaseWebsocketConnectCount", userId, guestId, TYPE_CUR_CONNECT_COUNT, -1)
}

// increaseCombineRequestCount adds one to the image-combine request count of
// the given user/guest.
func increaseCombineRequestCount(userId, guestId int64) {
	sendWebsocketStat("increaseCombineRequestCount", userId, guestId, TYPE_CUR_COMBINE_IMAGE_COUNT, 1)
}

// decreaseCombineRequestCount subtracts one from the image-combine request
// count of the given user/guest.
func decreaseCombineRequestCount(userId, guestId int64) {
	sendWebsocketStat("decreaseCombineRequestCount", userId, guestId, TYPE_CUR_COMBINE_IMAGE_COUNT, -1)
}

// increaseUnityRequestCount adds one to the unity handling count of the given
// user/guest.
func increaseUnityRequestCount(userId, guestId int64) {
	// The timeout log previously named decreaseCombineRequestCount here.
	sendWebsocketStat("increaseUnityRequestCount", userId, guestId, TYPE_CUR_UNITY_HANDLE_COUNT, 1)
}

// decreaseUnityRequestCount subtracts one from the unity handling count of
// the given user/guest.
func decreaseUnityRequestCount(userId, guestId int64) {
	sendWebsocketStat("decreaseUnityRequestCount", userId, guestId, TYPE_CUR_UNITY_HANDLE_COUNT, -1)
}

// ConsumeWebsocketStatData drains websocketStat and applies every delta to
// mapUserWsStat. It blocks until ctx is done, then returns. A panic raised
// while applying a delta is recovered and logged, after which the function
// returns.
func ConsumeWebsocketStatData(ctx context.Context) {
	defer func() {
		if err := recover(); err != nil {
			logx.Error("ConsumeWebsocketStatData panic:", err)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			// Cancellation is observed in this goroutine: a panic raised from
			// a helper goroutine cannot be recovered by the defer above and
			// would terminate the whole process.
			logx.Info("ConsumeWebsocketStatData stopped:", ctx.Err())
			return
		case data := <-websocketStat:
			applyWebsocketStat(data)
		}
	}
}

// applyWebsocketStat applies a single counter delta to mapUserWsStat.
func applyWebsocketStat(data websocketStatItem) {
	key := fmt.Sprintf("%d_%d", data.UserId, data.GuestId)

	statData, ok := mapUserWsStat.Load(key)
	if !ok {
		// Only a connection increment creates an entry. Combine/unity deltas
		// for an unknown key are ignored, and a non-positive connection delta
		// must not create an entry: existing entries are deleted as soon as
		// their connection count reaches zero or below.
		if data.Type == TYPE_CUR_CONNECT_COUNT && data.Value > 0 {
			mapUserWsStat.Store(key, mapUserWsStatItem{
				CurWsConnectCount: data.Value,
			})
		}
		return
	}

	stat, ok := statData.(mapUserWsStatItem)
	if !ok {
		logx.Error("断言mapUserWsStatItem错误")
		return
	}

	switch data.Type {
	case TYPE_CUR_CONNECT_COUNT:
		stat.CurWsConnectCount += data.Value
		// No connections left: drop the whole entry.
		if stat.CurWsConnectCount <= 0 {
			mapUserWsStat.Delete(key)
			return
		}
	case TYPE_CUR_COMBINE_IMAGE_COUNT:
		stat.CurCombineCount += data.Value
		if stat.CurCombineCount < 0 {
			stat.CurCombineCount = 0
		}
	case TYPE_CUR_UNITY_HANDLE_COUNT:
		stat.CurUnityHandleCount += data.Value
		if stat.CurUnityHandleCount < 0 {
			stat.CurUnityHandleCount = 0
		}
	default:
		// Unknown stat type: nothing to update.
		return
	}

	// Persist the updated counters.
	mapUserWsStat.Store(key, stat)
}