package logic

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
)

// websocketStatType identifies which counter a websocketStatData event
// applies to.
type websocketStatType string

const (
	TYPE_CUR_CONNECT_COUNT           websocketStatType = "TYPE_CUR_CONNECT_COUNT"           // current websocket connection count
	TYPE_CUR_COMBINE_IMAGE_COUNT     websocketStatType = "TYPE_CUR_COMBINE_IMAGE_COUNT"     // image-combine requests in progress
	TYPE_CUR_COMBINE_IMAGE_ERR_COUNT websocketStatType = "TYPE_CUR_COMBINE_IMAGE_ERR_COUNT" // failed image-combine requests
	TYPE_CUR_UNITY_HANDLE_COUNT      websocketStatType = "TYPE_CUR_UNITY_HANDLE_COUNT"      // unity requests in progress
	TYPE_UNITY_ERR_COUNT             websocketStatType = "TYPE_UNITY_ERR_COUNT"             // failed unity requests
)

// websocketStatData is a single counter delta queued on websocketStat and
// applied by ConsumeWebsocketStatData.
type websocketStatData struct {
	UserId  int64             `json:"user_id"`  // user id
	GuestId int64             `json:"guest_id"` // guest id
	Type    websocketStatType `json:"type"`     // which counter the delta applies to
	Num     int               `json:"num"`      // signed delta
}

// mapUserWsStatItem holds the counters kept per "<userId>_<guestId>" key in
// mapUserWsStat. It is stored in the map by value.
type mapUserWsStatItem struct {
	CurCombineCount     int `json:"cur_combine_count"`      // image-combine requests in progress
	CombineErrCount     int `json:"combine_err_count"`      // failed image-combine requests
	CurWsConnectCount   int `json:"cur_ws_connect_count"`   // open websocket connections
	CurUnityHandleCount int `json:"cur_unity_handle_count"` // unity requests in progress
	UnityErrCount       int `json:"unity_err_count"`        // failed unity requests
}

// Statistics state.
var (
	// mapUserWsStat maps "<userId>_<guestId>" to a mapUserWsStatItem value.
	mapUserWsStat = sync.Map{}
	// websocketStat carries counter deltas from the producers to the consumer.
	websocketStat = make(chan websocketStatData, 1000)
	// curWsTotalCount is the total number of websocket connections.
	curWsTotalCount int
	// combineErrTotalCount is the total number of failed image-combine requests.
	combineErrTotalCount int
	// curUnityHandleTotalCount is the total number of unity requests in progress.
	curUnityHandleTotalCount int
	// unityErrTotalCount is the total number of failed unity requests.
	unityErrTotalCount int
	// curCombineTotalCount is the total number of image-combine requests in progress.
	curCombineTotalCount int
)

// statTotalsMu serializes writes to the package-level total counters above.
// The producers may be called from any goroutine, so an unguarded "+=" on
// those ints is a data race.
var statTotalsMu sync.Mutex

// websocketStatSendTimeout bounds how long a producer waits for room on
// websocketStat before dropping its event.
const websocketStatSendTimeout = 200 * time.Millisecond

// pushWebsocketStat queues data on websocketStat and, only if the send
// succeeded, adds data.Num to *total under statTotalsMu. When clampAtZero is
// set the total is never left negative. If the channel stays full for
// websocketStatSendTimeout the event is dropped, the total is left untouched
// and an error naming caller is logged.
func pushWebsocketStat(data websocketStatData, total *int, clampAtZero bool, caller string) {
	// A stoppable timer is released as soon as the send succeeds instead of
	// lingering until it fires.
	timer := time.NewTimer(websocketStatSendTimeout)
	defer timer.Stop()

	select {
	case websocketStat <- data:
		statTotalsMu.Lock()
		*total += data.Num
		if clampAtZero && *total < 0 {
			*total = 0
		}
		statTotalsMu.Unlock()
	case <-timer.C:
		logx.Error(caller + " 输入管道超时,丢弃消息")
	}
}

// increaseWebsocketConnectCount records one new websocket connection for the
// given user/guest pair.
func increaseWebsocketConnectCount(userId, guestId int64) {
	pushWebsocketStat(websocketStatData{
		UserId:  userId,
		GuestId: guestId,
		Type:    TYPE_CUR_CONNECT_COUNT,
		Num:     1,
	}, &curWsTotalCount, false, "increaseWebsocketConnectCount")
}

// decreaseWebsocketConnectCount records one closed websocket connection for
// the given user/guest pair. The total is clamped at zero.
func decreaseWebsocketConnectCount(userId, guestId int64) {
	pushWebsocketStat(websocketStatData{
		UserId:  userId,
		GuestId: guestId,
		Type:    TYPE_CUR_CONNECT_COUNT,
		Num:     -1,
	}, &curWsTotalCount, true, "decreaseWebsocketConnectCount")
}

// increaseCombineRequestCount records one started image-combine request.
func increaseCombineRequestCount(userId, guestId int64) {
	pushWebsocketStat(websocketStatData{
		UserId:  userId,
		GuestId: guestId,
		Type:    TYPE_CUR_COMBINE_IMAGE_COUNT,
		Num:     1,
	}, &curCombineTotalCount, false, "increaseCombineRequestCount")
}

// decreaseCombineRequestCount records one finished image-combine request.
// The total is clamped at zero.
func decreaseCombineRequestCount(userId, guestId int64) {
	pushWebsocketStat(websocketStatData{
		UserId:  userId,
		GuestId: guestId,
		Type:    TYPE_CUR_COMBINE_IMAGE_COUNT,
		Num:     -1,
	}, &curCombineTotalCount, true, "decreaseCombineRequestCount")
}

// increaseCombineRequestErrorCount records one failed image-combine request.
func increaseCombineRequestErrorCount(userId, guestId int64) {
	pushWebsocketStat(websocketStatData{
		UserId:  userId,
		GuestId: guestId,
		Type:    TYPE_CUR_COMBINE_IMAGE_ERR_COUNT,
		Num:     1,
	}, &combineErrTotalCount, false, "increaseCombineRequestErrorCount")
}

// increaseUnityRequestCount records one started unity request.
func increaseUnityRequestCount(userId, guestId int64) {
	pushWebsocketStat(websocketStatData{
		UserId:  userId,
		GuestId: guestId,
		Type:    TYPE_CUR_UNITY_HANDLE_COUNT,
		Num:     1,
	}, &curUnityHandleTotalCount, false, "increaseUnityRequestCount")
}

// decreaseUnityRequestCount records one finished unity request. The total is
// clamped at zero.
func decreaseUnityRequestCount(userId, guestId int64) {
	pushWebsocketStat(websocketStatData{
		UserId:  userId,
		GuestId: guestId,
		Type:    TYPE_CUR_UNITY_HANDLE_COUNT,
		Num:     -1,
	}, &curUnityHandleTotalCount, true, "decreaseUnityRequestCount")
}

// increaseUnityErrorCount records one failed unity request.
func increaseUnityErrorCount(userId, guestId int64) {
	pushWebsocketStat(websocketStatData{
		UserId:  userId,
		GuestId: guestId,
		Type:    TYPE_UNITY_ERR_COUNT,
		Num:     1,
	}, &unityErrTotalCount, false, "increaseUnityErrorCount")
}

// ConsumeWebsocketStatData drains websocketStat and applies every event to
// the per-user counters in mapUserWsStat. It blocks until ctx is done and
// then returns; a panic raised while applying an event is recovered and
// logged, after which the function returns.
//
// Cancellation is handled inside the receive loop. Panicking from a helper
// goroutine on ctx.Done() cannot be caught by this function's recover (recover
// only sees panics of its own goroutine) and would terminate the process.
func ConsumeWebsocketStatData(ctx context.Context) {
	defer func() {
		if err := recover(); err != nil {
			logx.Error("ConsumeWebsocketStatData panic:", err)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			logx.Info("ConsumeWebsocketStatData stopped:", ctx.Err())
			return
		case data := <-websocketStat:
			applyWebsocketStat(data)
		}
	}
}

// applyWebsocketStat folds one event into the mapUserWsStat entry keyed by
// "<userId>_<guestId>".
func applyWebsocketStat(data websocketStatData) {
	key := fmt.Sprintf("%d_%d", data.UserId, data.GuestId)
	loaded, exists := mapUserWsStat.Load(key)

	if !exists {
		// Only a new connection creates an entry; every other event for an
		// unknown key is ignored. A non-positive delta must not create one
		// either: a stored count of -1 would make the next real connection
		// look like "no connections" and delete the entry.
		if data.Type == TYPE_CUR_CONNECT_COUNT && data.Num > 0 {
			mapUserWsStat.Store(key, mapUserWsStatItem{CurWsConnectCount: data.Num})
		}
		return
	}

	stat, ok := loaded.(mapUserWsStatItem)
	if !ok {
		logx.Error("断言mapUserWsStatItem错误")
		return
	}

	switch data.Type {
	case TYPE_CUR_CONNECT_COUNT:
		stat.CurWsConnectCount += data.Num
		// Drop the entry once the last connection is gone.
		if stat.CurWsConnectCount <= 0 {
			mapUserWsStat.Delete(key)
			return
		}
	case TYPE_CUR_COMBINE_IMAGE_COUNT:
		stat.CurCombineCount += data.Num
		if stat.CurCombineCount < 0 {
			stat.CurCombineCount = 0
		}
	case TYPE_CUR_UNITY_HANDLE_COUNT:
		stat.CurUnityHandleCount += data.Num
		if stat.CurUnityHandleCount < 0 {
			stat.CurUnityHandleCount = 0
		}
	case TYPE_CUR_COMBINE_IMAGE_ERR_COUNT:
		stat.CombineErrCount += data.Num
	case TYPE_UNITY_ERR_COUNT:
		stat.UnityErrCount += data.Num
	default:
		// Unknown event types leave the entry untouched.
		return
	}

	// The item is stored by value, so the updated copy has to be written back.
	mapUserWsStat.Store(key, stat)
}