Merge branch 'develop' of gitee.com:fusenpack/fusenapi into develop

This commit is contained in:
momo 2023-11-02 15:16:51 +08:00
commit 6dfdd667ad
6 changed files with 14 additions and 44 deletions

View File

@ -49,18 +49,12 @@ func popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) {
} }
// 消费公共通知未处理的消息(目前是轮巡方式,待优化) // 消费公共通知未处理的消息(目前是轮巡方式,待优化)
func ConsumeCommonCacheData(ctx context.Context) { func ConsumeCommonCacheData() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.Error("consumeCommonCacheData panic :", err) logx.Error("consumeCommonCacheData panic :", err)
} }
}() }()
go func() {
select {
case <-ctx.Done():
panic("consumeCommonCacheData ctx deadline")
}
}()
for { for {
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
info := popCommonNotifyCache() info := popCommonNotifyCache()

View File

@ -46,6 +46,7 @@ func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *a
logx.Error("解析taskId错误") logx.Error("解析taskId错误")
return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "failed to parse param taskId !!!!") return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "failed to parse param taskId !!!!")
} }
//发出去的时间必须是大于服务启动的时间才参与统计
if info.RenderBeginTime > serverStartTime { if info.RenderBeginTime > serverStartTime {
//logx.Info("任务时间:", info.RenderBeginTime, "服务器启动时间:", serverStartTime) //logx.Info("任务时间:", info.RenderBeginTime, "服务器启动时间:", serverStartTime)
//统计unity处理数 //统计unity处理数

View File

@ -1,7 +1,6 @@
package logic package logic
import ( import (
"context"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
@ -50,14 +49,14 @@ func sendCancelRenderMsgToUnity(wid string, deadlineTime int64) {
} }
// 拨号udp // 拨号udp
func DialUdp(ctx context.Context, config config.Config) error { func DialUdp(config config.Config) error {
localAddr := &net.UDPAddr{IP: net.ParseIP(config.Unity.Udp.LocalAddr), Port: config.Unity.Udp.LocalPort} localAddr := &net.UDPAddr{IP: net.ParseIP(config.Unity.Udp.LocalAddr), Port: config.Unity.Udp.LocalPort}
remoteAddr := &net.UDPAddr{IP: net.ParseIP(config.Unity.Udp.RemoteAddr), Port: config.Unity.Udp.RemotePort} remoteAddr := &net.UDPAddr{IP: net.ParseIP(config.Unity.Udp.RemoteAddr), Port: config.Unity.Udp.RemotePort}
conn, err := net.DialUDP("udp", localAddr, remoteAddr) conn, err := net.DialUDP("udp", localAddr, remoteAddr)
if err != nil { if err != nil {
return err return err
} }
go ConsumeCancelUnityChanMessage(ctx, conn) go ConsumeCancelUnityChanMessage(conn)
return nil return nil
} }
@ -69,18 +68,12 @@ func signMessage(wid string, deadlineTime int64) string {
} }
// 消费数据 // 消费数据
func ConsumeCancelUnityChanMessage(ctx context.Context, conn *net.UDPConn) { func ConsumeCancelUnityChanMessage(conn *net.UDPConn) {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.Error("ConsumeCancelUnityChanMessage 异常:", err) logx.Error("ConsumeCancelUnityChanMessage 异常:", err)
} }
}() }()
go func() {
select {
case <-ctx.Done():
panic(any("ConsumeCancelUnityChanMessage ctx deadline"))
}
}()
defer conn.Close() defer conn.Close()
for { for {
select { select {

View File

@ -1,7 +1,6 @@
package logic package logic
import ( import (
"context"
"fmt" "fmt"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"sync" "sync"
@ -206,18 +205,12 @@ func increaseUnityErrorCount(userId, guestId int64) {
} }
// 消费数据 // 消费数据
func ConsumeWebsocketStatData(ctx context.Context) { func ConsumeWebsocketStatData() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.Error("ConsumeWebsocketStatData panic:", err) logx.Error("ConsumeWebsocketStatData panic:", err)
} }
}() }()
go func() {
select {
case <-ctx.Done():
panic(any("ConsumeWebsocketStatData ctx deadline"))
}
}()
for { for {
select { select {
case data := <-websocketStat: case data := <-websocketStat:
@ -237,11 +230,11 @@ func ConsumeWebsocketStatData(ctx context.Context) {
continue continue
} }
stat.CurWsConnectCount += data.Num stat.CurWsConnectCount += data.Num
//没有连接就删除 //没有连接就删除(先不删除)
if stat.CurWsConnectCount <= 0 { /*if stat.CurWsConnectCount <= 0 {
mapUserWsStat.Delete(key) mapUserWsStat.Delete(key)
continue continue
} }*/
//保存统计 //保存统计
mapUserWsStat.Store(key, stat) mapUserWsStat.Store(key, stat)
case TYPE_CUR_COMBINE_IMAGE_COUNT: //请求算法合图计数 case TYPE_CUR_COMBINE_IMAGE_COUNT: //请求算法合图计数

View File

@ -1,7 +1,6 @@
package logic package logic
import ( import (
"context"
"fmt" "fmt"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
@ -56,18 +55,12 @@ func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) {
} }
// 消费用户索引创建/删除/发送消息中的任务数据 // 消费用户索引创建/删除/发送消息中的任务数据
func ConsumeUserConnPoolCtlChanData(ctx context.Context) { func ConsumeUserConnPoolCtlChanData() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.Error("ConsumeUserConnPoolCtlChanData panic:", err) logx.Error("ConsumeUserConnPoolCtlChanData panic:", err)
} }
}() }()
go func() {
select {
case <-ctx.Done():
panic(any("ConsumeUserConnPoolCtlChanData ctx deadline"))
}
}()
var ( var (
data userConnPoolCtlChanItem data userConnPoolCtlChanItem
userKey string userKey string

View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"fusenapi/server/websocket/internal/logic" "fusenapi/server/websocket/internal/logic"
@ -30,17 +29,14 @@ func main() {
ctx := svc.NewServiceContext(c) ctx := svc.NewServiceContext(c)
handler.RegisterHandlers(server, ctx) handler.RegisterHandlers(server, ctx)
ctx1 := context.Background()
ctx1, cancel := context.WithCancel(ctx1)
defer cancel()
//消费公共通知队列的数据 //消费公共通知队列的数据
go logic.ConsumeCommonCacheData(ctx1) go logic.ConsumeCommonCacheData()
//消费用户索引创建/删除/发送消息中的任务数据 //消费用户索引创建/删除/发送消息中的任务数据
go logic.ConsumeUserConnPoolCtlChanData(ctx1) go logic.ConsumeUserConnPoolCtlChanData()
//消费连接统计信息 //消费连接统计信息
go logic.ConsumeWebsocketStatData(ctx1) go logic.ConsumeWebsocketStatData()
//拨号udp消费控制unity取消僵尸任务的消息 //拨号udp消费控制unity取消僵尸任务的消息
if err := logic.DialUdp(ctx1, c); err != nil { if err := logic.DialUdp(c); err != nil {
logx.Error("dail udp err:", err) logx.Error("dail udp err:", err)
return return
} }