diff --git a/proxyserver/main.go b/proxyserver/main.go index a6e25325..1b0b9f9c 100644 --- a/proxyserver/main.go +++ b/proxyserver/main.go @@ -18,7 +18,6 @@ import ( "time" "github.com/gorilla/websocket" - "github.com/zeromicro/go-zero/core/logx" "gopkg.in/yaml.v2" _ "fusenapi/utils/auth" @@ -105,16 +104,8 @@ func main() { // 对/api开头的请求进行反向代理 proxy := httputil.NewSingleHostReverseProxy(apiURL) - proxy.ErrorHandler = func(res http.ResponseWriter, req *http.Request, err error) { - if err != nil { - - // 在发生错误时进行处理 - logx.Error(err) - logx.Error(res.Header()) - res.WriteHeader(http.StatusNotFound) // 返回404状态码 - } - } proxy.ServeHTTP(w, r) + return } else { @@ -218,22 +209,22 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac target := url.URL{Scheme: "ws", Host: strings.Split(backend.HttpAddress, "//")[1], Path: r.URL.Path} var transfer = func(src, dest *websocket.Conn) { + defer src.Close() + defer dest.Close() + // TODO: 可以做错误处理 for { mType, msg, err := src.ReadMessage() if err != nil { log.Println(err) - break + return } err = dest.WriteMessage(mType, msg) if err != nil { log.Println(err) - break + return } } - - src.Close() - dest.Close() } header := r.Header.Clone() @@ -257,6 +248,7 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } // defer conn.Close() @@ -269,14 +261,14 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac // 解析目标URL,包含了查询参数 targetURL, err := url.Parse(httpAddress + r.URL.String()) if err != nil { - http.Error(w, "Error parsing target URL", http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusInternalServerError) return } // 创建新的请求 proxyReq, err := http.NewRequest(r.Method, targetURL.String(), r.Body) if err != nil { - http.Error(w, "Error creating proxy request", http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -294,7 +286,7 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac // 发送请求 resp, err := backend.Client.Do(proxyReq) if err != nil { - http.Error(w, "Error sending proxy request", http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusInternalServerError) return } defer resp.Body.Close() @@ -310,7 +302,7 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac w.WriteHeader(resp.StatusCode) _, err = io.Copy(w, resp.Body) if err != nil { - http.Error(w, "Error copying proxy response", http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusInternalServerError) return } } diff --git a/run_all_server.sh b/run_all_server.sh index 43723946..6ee27efe 100755 --- a/run_all_server.sh +++ b/run_all_server.sh @@ -4,57 +4,11 @@ single_server_name=$1 go mod tidy go mod vendor - -# 定义一个函数来在每个服务器目录下运行 go run .go -run_server() { - server_name=$1 - echo "Running $server_name" - - - - # 如果之前存在相同名字的 screen 会话,先将其终止 - existing_session=$(screen -ls | grep -w "$server_name") - if [ -n "$existing_session" ]; then - echo "Terminating existing screen session for $server_name" - screen -S "$server_name" -X quit - fi - # 导航到相应的目录 - cd server/$server_name - go build - - [ -f .gitignore ] || echo $server_name > .gitignore - # 使用 screen 运行 go run .go - screen -dmS $server_name -L ./$server_name - - # 返回到上一级目录 - cd - > /dev/null -} - find /tmp/go-build* -mmin +5 -exec rm -rf {} + find /tmp/go-link* -mmin +5 -exec rm -rf {} + -server_dirs=() # 初始化一个空数组 - -if [ -n "$single_server_name" ]; then - server_dirs=("$single_server_name") -else - for dir in server/*/ ; do # 遍历 "server/" 下的所有子目录 - dir=${dir%*/} # 删除末尾的 "/" - dir=${dir##*/} # 删除开头的 "server/" - server_dirs+=("$dir") # 添加到数组 - done -fi - -# 在每个服务器目录下运行相应的 go 程序 -for server_dir in "${server_dirs[@]}"; do - run_server $server_dir -done - - -if [ -n "$single_server_name" ]; then - echo "no proxyserver restart" -else - # 定义目录和screen名称 +run_proxyserver() { + # 定义目录和screen名称 dir_path="./proxyserver" screen_name="proxyserver" @@ -67,6 +21,77 @@ else fi go build # 启动新的screen session并运行go程序 + echo "run $screen_name" screen -dmS $screen_name -L ./$screen_name +} + +# 定义一个函数来在每个服务器目录下运行 go run .go +run_server() { + server_name=$1 + + # 导航到相应的目录 + cd server/$server_name + go build + echo "build $server_name" + + # 如果之前存在相同名字的 screen 会话,先将其终止 + # 首先尝试关闭已存在的screen会话 + existing_session=$(screen -ls | grep -w "$server_name") + if [ -n "$existing_session" ]; then + echo "Terminating existing screen session for $server_name" + screen -S "$server_name" -X quit + while [[ $(screen -ls | grep "\.$server_name\s") ]]; do + sleep 0.1s # 等待0.1秒后再次检查 + echo "wait for $server_name" + done + fi + + # 循环检查screen进程是否存在 + + [ -f .gitignore ] || echo $server_name > .gitignore + # 使用 screen 运行 go run .go + + echo "Running $server_name" + screen -dmS $server_name -L ./$server_name + + # 返回到上一级目录 + cd - > /dev/null +} + +if [ "$single_server_name" = "proxyserver" ]; then + # 重启proxyserver的逻辑 + run_proxyserver +else + + server_dirs=() # 初始化一个空数组 + + if [ -n "$single_server_name" ]; then + server_dirs=("$single_server_name") + else + for dir in server/*/ ; do # 遍历 "server/" 下的所有子目录 + dir=${dir%*/} # 删除末尾的 "/" + dir=${dir##*/} # 删除开头的 "server/" + server_dirs+=("$dir") # 添加到数组 + done + fi + + # 在每个服务器目录下运行相应的 go 程序 + for server_dir in "${server_dirs[@]}"; do + run_server $server_dir + done + + if [ -n "$single_server_name" ]; then + echo "no proxyserver restart" + else + run_proxyserver + fi fi + + + + + + + + diff --git a/server/info/internal/logic/infologic.go b/server/info/internal/logic/infologic.go index 41bc7a91..b77e0bed 100644 --- a/server/info/internal/logic/infologic.go +++ b/server/info/internal/logic/infologic.go @@ -6,6 +6,7 @@ import ( "fusenapi/utils/auth" "fusenapi/utils/basic" "fusenapi/utils/check" + "log" "strings" "context" @@ -82,6 +83,26 @@ func (mquery *ModuleQuery) EncodeEmpty() map[string]any { return qstr } +func QueryDefault(conn *gorm.DB, module string, moduleQuery string, tname string) map[string]any { + + qname := strings.Split(moduleQuery, ".") + queryAsName := qname[len(qname)-1] + sqlstr := fmt.Sprintf( + "select JSON_EXTRACT(metadata,'$.%s') as %s from %s where module = '%s' and user_id = 0 and guest_id = 0 order by ctime DESC limit 1", + moduleQuery, // logo_selected + queryAsName, // logo_selected + tname, // fs_user_info + module, // profile + ) + raw := conn.Raw(sqlstr) + var info map[string]any + err := raw.Scan(&info).Error + if err == gorm.ErrRecordNotFound { + logx.Error(err) + } + return info +} + func (l *InfoLogic) Info(req *types.UserInfoRequest, userinfo *auth.UserInfo) (resp *basic.Response) { // 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data) // userinfo 传入值时, 一定不为null @@ -178,6 +199,29 @@ func (l *InfoLogic) Info(req *types.UserInfoRequest, userinfo *auth.UserInfo) (r } } + // 隐含白板用户逻辑 + if v, ok := metadict["userinfo.profile"]; ok { + + if v == nil { + info := QueryDefault(l.svcCtx.MysqlConn, "profile", "logo_selected", "fs_user_info") + log.Println(info) + metadict["userinfo.profile"] = info + // log.Println(metadict) + } else { + profileDict := v.(map[string]any) + if _, ok := profileDict["logo_selected"]; !ok { + info := QueryDefault(l.svcCtx.MysqlConn, "profile", "logo_selected", "fs_user_info") + profileDict["logo_selected"] = info["logo_selected"] + } + } + + } else if v, ok := metadict["userinfo.profile.logo_selected"]; ok { + if v == nil { + info := QueryDefault(l.svcCtx.MysqlConn, "profile", "logo_selected", "fs_user_info") + metadict["userinfo.profile.logo_selected"] = info + } + } + return resp.SetStatus(basic.CodeOK, metadict) } diff --git a/server/info/internal/logic/infologic_test.go b/server/info/internal/logic/infologic_test.go index c6b32168..ec0e1d05 100644 --- a/server/info/internal/logic/infologic_test.go +++ b/server/info/internal/logic/infologic_test.go @@ -115,9 +115,7 @@ func TestMain(t *testing.T) { if v, ok := metadict["userinfo.profile"]; ok { if v == nil { - info := QueryDefault(conn, "profile", "logo_selected", "fs_user_info") - log.Println(info) metadict["userinfo.profile"] = info // log.Println(metadict) } else { @@ -126,19 +124,15 @@ func TestMain(t *testing.T) { info := QueryDefault(conn, "profile", "logo_selected", "fs_user_info") profileDict["logo_selected"] = info["logo_selected"] } - } } else if v, ok := metadict["userinfo.profile.logo_selected"]; ok { if v == nil { - + info := QueryDefault(conn, "profile", "logo_selected", "fs_user_info") + metadict["userinfo.profile.logo_selected"] = info } } - log.Println(metadict) - - return - } func QueryDefault(conn *gorm.DB, module string, moduleQuery string, tname string) map[string]any { diff --git a/server/shopping-cart/internal/logic/addtocartlogic.go b/server/shopping-cart/internal/logic/addtocartlogic.go index 1d6ef135..b6f4e21c 100644 --- a/server/shopping-cart/internal/logic/addtocartlogic.go +++ b/server/shopping-cart/internal/logic/addtocartlogic.go @@ -10,6 +10,8 @@ import ( "fusenapi/server/shopping-cart/internal/types" "fusenapi/utils/auth" "fusenapi/utils/basic" + "fusenapi/utils/file" + "fusenapi/utils/hash" "gorm.io/gorm" "time" @@ -51,6 +53,31 @@ func (l *AddToCartLogic) AddToCart(req *types.AddToCartReq, userinfo *auth.UserI if cartCount >= 100 { return resp.SetStatusWithMessage(basic.CodeDbSqlErr, "sorry,the count of your carts can`t greater than 100") } + if req.RenderImage != "" { + //上传base64文件 + // 上传文件 + var upload = file.Upload{ + Ctx: l.ctx, + MysqlConn: l.svcCtx.MysqlConn, + AwsSession: l.svcCtx.AwsSession, + } + uploadRes, err := upload.UploadFileByBase64(&file.UploadBaseReq{ + Source: "webGl render image", + FileHash: hash.JsonHashKey(req.RenderImage), + FileData: req.RenderImage, + Metadata: "", + UploadBucket: 1, + ApiType: 2, + UserId: userinfo.UserId, + GuestId: userinfo.GuestId, + FileByte: nil, + }) + if err != nil { + logx.Error(err) + return resp.SetStatusWithMessage(basic.CodeFileUploadErr, "failed to upload webGl render image") + } + req.RenderImage = uploadRes.ResourceUrl + } //获取产品是否存在 productInfo, err := l.svcCtx.AllModels.FsProduct.FindOne(l.ctx, req.ProductId) if err != nil { diff --git a/server/shopping-cart/internal/svc/servicecontext.go b/server/shopping-cart/internal/svc/servicecontext.go index f2b1f557..835b70da 100644 --- a/server/shopping-cart/internal/svc/servicecontext.go +++ b/server/shopping-cart/internal/svc/servicecontext.go @@ -16,6 +16,7 @@ type ServiceContext struct { AllModels *gmodel.AllModelsGen RabbitMq *initalize.RabbitMqHandle Repositories *initalize.Repositories + AwsSession *session.Session } func NewServiceContext(c config.Config) *ServiceContext { @@ -24,10 +25,11 @@ func NewServiceContext(c config.Config) *ServiceContext { Credentials: credentials.NewStaticCredentials(c.AWS.S3.Credentials.AccessKeyID, c.AWS.S3.Credentials.Secret, c.AWS.S3.Credentials.Token), } return &ServiceContext{ - Config: c, - MysqlConn: conn, - AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), - RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil), + Config: c, + MysqlConn: conn, + AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), + RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil), + AwsSession: session.Must(session.NewSession(&config)), Repositories: initalize.NewAllRepositories(&initalize.NewAllRepositorieData{ GormDB: conn, BLMServiceUrl: &c.BLMService.Url, diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 37e15346..7f434499 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -67,9 +67,9 @@ var ( //websocket连接存储 mapConnPool = sync.Map{} //每个websocket连接入口缓冲队列长度默认值 - websocketInChanLen = 500 + websocketInChanLen = 1000 //每个websocket连接出口缓冲队列长度默认值 - websocketOutChanLen = 500 + websocketOutChanLen = 1000 //是否开启debug openDebug = true //允许跨域的origin diff --git a/server/websocket/internal/logic/ws_render_image.go b/server/websocket/internal/logic/ws_render_image.go index a900c8b3..53420a69 100644 --- a/server/websocket/internal/logic/ws_render_image.go +++ b/server/websocket/internal/logic/ws_render_image.go @@ -22,6 +22,8 @@ import ( var ( //每个websocket渲染任务缓冲队列长度默认值 renderChanLen = 500 + //每个websocket渲染并发数 + renderChanConcurrency = 100 ) // 渲染处理器 @@ -30,9 +32,7 @@ type renderProcessor struct { // 云渲染属性 type extendRenderProperty struct { - renderChan chan websocket_data.RenderImageReqMsg //渲染消息入口的缓冲队列 - colorSelectedIndex int //选择颜色索引 - templateTag string //模板标签 + renderChan chan websocket_data.RenderImageReqMsg //渲染消息入口的缓冲队列 } // 处理分发到这里的数据 @@ -48,8 +48,6 @@ func (r *renderProcessor) allocationMessage(w *wsConnectItem, data []byte) { case <-w.closeChan: //已经关闭 return case w.extendRenderProperty.renderChan <- renderImageData: //发入到缓冲队列 - w.extendRenderProperty.colorSelectedIndex = renderImageData.RenderData.TemplateTagColor.SelectedColorIndex - w.extendRenderProperty.templateTag = renderImageData.RenderData.TemplateTag return } } @@ -58,36 +56,35 @@ func (r *renderProcessor) allocationMessage(w *wsConnectItem, data []byte) { func (w *wsConnectItem) consumeRenderImageData() { defer func() { if err := recover(); err != nil { - logx.Error("func renderImage err:", err) + logx.Error("func consumeRenderImageData err:", err) } }() + //限制并发 + limitChan := make(chan struct{}, renderChanConcurrency) + defer close(limitChan) for { select { case <-w.closeChan: //已关闭 return case data := <-w.extendRenderProperty.renderChan: //消费数据 - //属性不同则不发送渲染 - if data.RenderData.TemplateTag != w.extendRenderProperty.templateTag { - continue - } - //属性不同则不发送渲染 - if data.RenderData.TemplateTagColor.SelectedColorIndex != w.extendRenderProperty.colorSelectedIndex { - continue - } - w.renderImage(data) + limitChan <- struct{}{} + go func(d websocket_data.RenderImageReqMsg) { + defer func() { + if err := recover(); err != nil { + logx.Error("func renderImage err:", err) + } + }() + defer func() { + <-limitChan + }() + w.renderImage(d) + }(data) } } } // 执行渲染任务 func (w *wsConnectItem) renderImage(renderImageData websocket_data.RenderImageReqMsg) { - //logx.Info("消费渲染数据:", string(data)) - /*var renderImageData websocket_data.RenderImageReqMsg - if err := json.Unmarshal(data, &renderImageData); err != nil { - w.renderErrResponse(renderImageData.RenderId, renderImageData.RenderData.TemplateTag, "", "数据格式错误", renderImageData.RenderData.ProductId, w.userId, w.guestId, 0, 0, 0, 0) - logx.Error("invalid format of websocket render image message", err) - return - }*/ if renderImageData.RenderData.Logo == "" { w.renderErrResponse(renderImageData.RenderId, renderImageData.RenderData.TemplateTag, "", "请传入logo", renderImageData.RenderData.ProductId, w.userId, w.guestId, 0, 0, 0, 0) return diff --git a/service/repositories/image_handle.go b/service/repositories/image_handle.go index acf1f0c2..3c4f565a 100644 --- a/service/repositories/image_handle.go +++ b/service/repositories/image_handle.go @@ -316,8 +316,9 @@ func (l *defaultImageHandle) LogoCombine(ctx context.Context, in *LogoCombineReq var resultBLM constants.BLMServiceUrlResult err = curl.NewClient(ctx, &curl.Config{ - BaseUrl: *l.BLMServiceUrl, - Url: constants.BLMServiceUrlLogoCombine, + BaseUrl: *l.BLMServiceUrl, + Url: constants.BLMServiceUrlLogoCombine, + RequireTimeout: time.Second * 30, }).PostJson(postMap, &resultBLM) logc.Infof(ctx, "合图--算法请求--合图--结束时间:%v", time.Now().UTC()) diff --git a/utils/curl/client_resty.go b/utils/curl/client_resty.go index 022892b1..abbfd46f 100644 --- a/utils/curl/client_resty.go +++ b/utils/curl/client_resty.go @@ -20,8 +20,9 @@ func NewClient(ctx context.Context, c *Config) Client { client := resty.New().SetBaseURL(c.BaseUrl) // 设置超时时间为 5 分钟 - client.SetTimeout(5 * time.Minute) - + if c.RequireTimeout == 0 { + client.SetTimeout(5 * time.Minute) + } /* 传输链路 */ tracer := otel.GetTracerProvider().Tracer(trace.TraceName) spanCtx, span := tracer.Start( @@ -60,11 +61,12 @@ func NewClient(ctx context.Context, c *Config) Client { type ( Config struct { - BaseUrl string `json:"base_url"` - Url string `json:"url"` - HeaderData map[string]string `json:"header_data"` - RetryCount int64 `json:"retry_count"` - RetryWaitTime int64 `json:"retry_wait_time"` + BaseUrl string `json:"base_url"` + Url string `json:"url"` + HeaderData map[string]string `json:"header_data"` + RetryCount int64 `json:"retry_count"` + RetryWaitTime int64 `json:"retry_wait_time"` + RequireTimeout time.Duration `json:"require_timeout"` } defaultClient struct { c *Config