diff --git a/.gitignore b/.gitignore index 2b19cbb7..ba4fd900 100644 --- a/.gitignore +++ b/.gitignore @@ -52,4 +52,7 @@ server/product-model/product-model server/product-template/product-template server/shopping-cart-confirmation/shopping-cart-confirmation server/upload/upload -server/webset/webset \ No newline at end of file +server/webset/webset + + +shared-state \ No newline at end of file diff --git a/proxyserver/main.go b/proxyserver/main.go index 066a16d1..811f6476 100644 --- a/proxyserver/main.go +++ b/proxyserver/main.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/gorilla/websocket" "gopkg.in/yaml.v2" ) @@ -115,6 +116,7 @@ type Backend struct { HttpAddress string Client *http.Client Handler http.HandlerFunc + Dialer *websocket.Dialer } func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Backend { @@ -142,14 +144,76 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac }, } + dialer := &websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + NetDial: func(network, addr string) (net.Conn, error) { + return net.Dial(network, addr) + }, + } + // 创建后端服务对象,包含地址和客户端 backend := &Backend{ HttpAddress: httpAddress, Client: client, + Dialer: dialer, } // 创建处理请求的函数 handleRequest := func(w http.ResponseWriter, r *http.Request) { + + if websocket.IsWebSocketUpgrade(r) { + //todo: 建立websocket的代理 + + target := url.URL{Scheme: "ws", Host: strings.Split(backend.HttpAddress, "//")[1], Path: r.URL.Path} + + var transfer = func(src, dest *websocket.Conn) { + for { + mType, msg, err := src.ReadMessage() + if err != nil { + log.Println(err) + break + } + + err = dest.WriteMessage(mType, msg) + if err != nil { + log.Println(err) + break + } + } + + src.Close() + dest.Close() + } + + header := r.Header.Clone() + // log.Println(target.String()) + header.Del("Sec-Websocket-Extensions") + header.Del("Upgrade") + header.Del("Sec-Websocket-Key") + header.Del("Sec-Websocket-Version") + header.Del("Connection") + // header.Del("Origin") + proxyConn, _, err := backend.Dialer.Dial(target.String(), header) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + // defer proxyConn.Close() + + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + // defer conn.Close() + + go transfer(proxyConn, conn) + // go transfer(conn, proxyConn) + return + } + // 解析目标URL,包含了查询参数 targetURL, err := url.Parse(httpAddress + r.URL.String()) if err != nil { diff --git a/server/render/consumer/assemble_render_data.go b/server/render/consumer/assemble_render_data.go index 8fb6f82b..3c7f1ffa 100644 --- a/server/render/consumer/assemble_render_data.go +++ b/server/render/consumer/assemble_render_data.go @@ -18,6 +18,7 @@ import ( "gorm.io/gorm" "io/ioutil" "strconv" + "time" ) // 这里请求的py接口返回数据 @@ -40,11 +41,13 @@ func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error { } val := ctx.Value("svcctx") if val == nil { - return errors.New("svcctx is nil") + logx.Error("svcctx is nil") + return nil //不返回错误就删除消息 } svcCtx, ok := val.(*svc.ServiceContext) if !ok { - return errors.New("svcctx is nil!!") + logx.Error("svcctx is nil!!") + return nil //不返回错误就删除消息 } rabbitmq := initalize.RabbitMqHandle{} //获取模板(产品第一个sku的模板) @@ -52,10 +55,10 @@ func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error { if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logx.Error("template info is not found") - return nil + return nil //不返回错误就删除消息 } logx.Error("failed to get template info:", err) - return err + return nil //不返回错误就删除消息 } combineImage := "" //刀版图 combineHash := hash.JsonHashKey(parseInfo) //区别于云渲染的taskid,这个用获取刀版图缓存 @@ -63,13 +66,13 @@ func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error { resource, err := svcCtx.AllModels.FsResource.FindOneById(ctx, combineHash) if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { logx.Error("failed to get resource :", err) - return err + return nil //不返回错误就删除消息 } //如果不存在,则请求生成刀版图 if errors.Is(err, gorm.ErrRecordNotFound) { combineImage, err = getCombineImage(ctx, svcCtx, parseInfo, productTemplate, combineHash) if err != nil { - return err + return nil //不返回错误就删除消息 } } else { combineImage = *resource.ResourceUrl @@ -79,10 +82,10 @@ func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error { if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logx.Error("element info is not found,model_id = ?", *productTemplate.ModelId) - return nil + return nil //不返回错误就删除消息 } logx.Error("failed to get element list,", err) - return err + return nil //不返回错误就删除消息 } //组装数据 refletion := -1 @@ -94,7 +97,7 @@ func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error { if element.Mode != nil && *element.Mode != "" { if err = json.Unmarshal([]byte(*element.Mode), &mode); err != nil { logx.Error("faile to parse element mode json:", err) - return err + return nil //不返回错误就删除消息 } } tempData := make([]map[string]interface{}, 0, 3) @@ -154,7 +157,7 @@ func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error { b, _ := json.Marshal(sendData) if err = rabbitmq.SendMsg(constants.RABBIT_MQ_TO_UNITY, b); err != nil { logx.Error("发送渲染组装数据到rabbitmq失败:", err) - return err + return nil //不返回错误就删除消息 } logx.Info("发送渲染组装数据到unity成功") return nil @@ -180,7 +183,7 @@ func getCombineImage(ctx context.Context, svcCtx *svc.ServiceContext, parseInfo "address": "", "phone": "", "colors": []string{}, - "template_tagid": []string{}, + "template_tagid": []string{"b1a"}, "is_crop": false, "shape": "", "ratio": 0, @@ -222,7 +225,10 @@ func getCombineImage(ctx context.Context, svcCtx *svc.ServiceContext, parseInfo url := svcCtx.Config.PythonApi.CombineImageUrl header := make(map[string]string) header["content-type"] = "application/json" - httpRsp, err := curl.ApiCall(url, "POST", header, bytes.NewReader(postData), 20) + /*f, _ := os.Create("a.txt") + defer f.Close() + f.Write(postData)*/ + httpRsp, err := curl.ApiCall(url, "POST", header, bytes.NewReader(postData), time.Second*20) if err != nil { logx.Error("failed to combine logo:", err) return "", err diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 6e6f74ae..1aadd993 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -99,10 +99,10 @@ func (w *wsConnectItem) renderImage(data []byte) { d, _ := json.Marshal(tmpData) //发送给对应的流水线组装数据 if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil { - logx.Error("发送渲染任务数据到MQ失败:", string(data), "err:", err) + logx.Error("发送渲染任务数据到MQ失败:", string(d), "err:", err) return } - logx.Info("发送渲染数据到rabbitmq成功:", string(data)) + logx.Info("发送渲染数据到rabbitmq成功:", string(d)) } // 操作连接中渲染任务的增加/删除