diff --git a/1.json b/1.json new file mode 100644 index 0000000..a6c45bc --- /dev/null +++ b/1.json @@ -0,0 +1,117 @@ +{ + "id": "c82cff55f2cc5cee3f71300fd8c5d953", + "order_id": 0, + "user_id": null, + "sku_ids": [ + 16, + 19, + 25, + 13 + ], + "tids": [ + "FSN-PPBX-1", + "FSN-PLUP-4", + "FSN-PPCCT-2" + ], + "data": [ + { + "light": 2, + "refletion": -1, + "scale": "22", + "sku_id": 25, + "tid": "FSN-PPBX-1", + "rotation": "5.4,-41,-4.7", + "filePath": "", + "data": [ + { + "name": "model", + "data": "0,https:\/\/fusenh5.kayue.cn:8011\/storage\/test\/final_JntYgQf85p_temp.png,FSN-PPBX-1_MaskMap,FSN-PPBX-1_Normal,FSN-PPBX-1_BaseMap", + "type": "other", + "layer": "0", + "is_update": 1, + "mode": "Opaque" + }, + { + "name": "shadow", + "data": "0,FSN-PPBX-1,0,0,0", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Fade" + } + ] + }, + { + "light": 8, + "refletion": -1, + "scale": "12.5", + "sku_id": 19, + "tid": "FSN-PPCCT-2", + "rotation": "-2,25.5,-1", + "filePath": "", + "data": [ + { + "name": "model", + "data": "0,https:\/\/fusenh5.kayue.cn:8011\/storage\/test\/final_xNf7MC38vc_temp.png,FSN-PPCCT-2_MaskMap,FSN-PPCCT-2_Normal,FSN-PPCCT-2_BaseMap", + "type": "other", + "layer": "0", + "is_update": 1, + "mode": "Opaque" + }, + { + "name": "shadow", + "data": "0,FSN-PPCCT-2,0,0,0", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Fade" + }, + { + "name": "model_P", + "data": "0,FSN-PPCCT-2-P,FSN-PPCCT-2-P_MaskMap,FSN-PPCCT-2-P_Normal,FSN-PPCCT-2-P_BaseMap", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Fade" + } + ] + }, + { + "light": 7, + "refletion": 7, + "scale": "14", + "sku_id": 16, + "tid": "FSN-PLUP-4", + "rotation": "0,0,0", + "filePath": "", + "data": [ + { + "name": "model", + "data": "0,https:\/\/fusenh5.kayue.cn:8011\/storage\/test\/final_ImDCgYaw9o_temp.png,FSN-PLUP-4_MaskMap,FSN-PLUP-4_Normal,FSN-PLUP-4_BaseMap", + "type": "other", + "layer": "0", + "is_update": 1, + "mode": "Opaque" + }, + { + "name": "shadow", + "data": "0,FSN-PLUP-4,0,0,0", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Fade" + }, + { + "name": "model_P", + "data": "0,FSN-PLUP-4-P,FSN-PLUP-4-P_MaskMap,FSN-PLUP-4-P_Normal,FSN-PLUP-4-P_BaseMap", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Opaque" + } + ] + } + ], + "is_thousand_face": 0, + "folder": "" +} \ No newline at end of file diff --git a/2.json b/2.json new file mode 100644 index 0000000..4950fe0 --- /dev/null +++ b/2.json @@ -0,0 +1,52 @@ +{ + "id": 6, + "name": "\u9910\u5177\u5957\u88c5\u56db\u4ef6\u5957\u914d\u4ef6\u6a21\u677f\uff08\u9ed1\u767d\uff09", + "cover": "", + "material": "\/fbx\/b5b831449ac0aa1dc527a08fe11f7a23.png", + "materialList": [ + { + "id": "c16c15bd-3bbd-47a1-87eb-2739a95d8063", + "tag": "other", + "title": "\u8d34\u56fe1", + "type": "image", + "text": "", + "fill": "#000000", + "fontSize": 20, + "fontFamily": "Aqum2SmallCaps3", + "ifBr": false, + "ifShow": true, + "ifGroup": false, + "maxNum": 50, + "rotation": 0, + "align": "center", + "verticalAlign": "middle", + "material": "\/image\/3j2i3q7k.png", + "width": 1024, + "height": 1024, + "x": 0, + "y": 0, + "opacity": 1, + "optionalColor": [ + { + "color": "#000000", + "name": "Black", + "default": true + } + ], + "zIndex": 0, + "svgPath": "", + "follow": { + "fill": "", + "ifShow": "", + "content": "" + }, + "group": [], + "cameraStand": { + "x": 0, + "y": 0, + "z": 0 + } + } + ], + "isPublic": false +} \ No newline at end of file diff --git a/command.go b/command.go index 0b27457..e8f6c7d 100644 --- a/command.go +++ b/command.go @@ -38,6 +38,8 @@ type Command struct { type QueueItem struct { // 队列组名 Group string `json:"group"` // 组名 + + Wid uint64 // 唯一标识id // 优先级 Priority uint32 `json:"priority"` // 处理的优先级 // 创建时间 diff --git a/go.mod b/go.mod index 9abd2a8..c2bbb83 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/getsentry/sentry-go v0.12.0 // indirect github.com/google/btree v1.0.0 // indirect github.com/google/go-cmp v0.5.9 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-immutable-radix v1.0.0 // indirect github.com/hashicorp/go-msgpack v0.5.3 // indirect diff --git a/go.sum b/go.sum index 2b64316..493e021 100644 --- a/go.sum +++ b/go.sum @@ -140,6 +140,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= diff --git a/main.go b/main.go index 6357830..dd36e64 100644 --- a/main.go +++ b/main.go @@ -48,24 +48,22 @@ var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) if len(result.Data) == 0 { // log.Println("wait 10 second") var m runtime.MemStats - runtime.ReadMemStats(&m) - allocMB := float64(m.Alloc) / 1024 / 1024 totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 sysMB := float64(m.Sys) / 1024 / 1024 - fmt.Printf("dequeue count %d, Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", i, allocMB, totalAllocMB, sysMB) - time.Sleep(time.Second * 5) break + } else { var item QueueItem err := item.Decode(result.Data) if err != nil { log.Println(err) } + log.Println(item) + PopChannel <- result.Data // log.Println(item) - } } } diff --git a/start_test.go b/start_test.go index 33b0602..6bfb2bd 100644 --- a/start_test.go +++ b/start_test.go @@ -39,6 +39,7 @@ func TestStartNodeA(t *testing.T) { item := &fusenrender.QueueItem{ Group: "test", + Wid: fusenrender.UidCreater.Get(), Priority: uint32(2), CreateAt: time.Now(), Data: uuid.New().String(), diff --git a/unique_id.go b/unique_id.go index b1dab30..51d7b34 100644 --- a/unique_id.go +++ b/unique_id.go @@ -15,7 +15,7 @@ func (uid *UniqueId) Get() uint64 { return (uid.count << 8) | uid.nodeId } -func NewUniqueIdd(NodeId uint8) *UniqueId { +func NewUniqueId(NodeId uint8) *UniqueId { return &UniqueId{ nodeId: uint64(NodeId), count: 0, diff --git a/websocket.go b/websocket.go new file mode 100644 index 0000000..219622c --- /dev/null +++ b/websocket.go @@ -0,0 +1,120 @@ +package fusenrender + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "time" + + "github.com/gorilla/websocket" + "github.com/lni/dragonboat/v4" +) + +var UidCreater = NewUniqueId(1) +var PopChannel chan []byte = make(chan []byte, 3) // chan *QueueItem = make(chan *QueueItem, 1) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +func queueHandler(w http.ResponseWriter, r *http.Request) { + conn, _ := upgrader.Upgrade(w, r, nil) + var err error + for { + + itemdata := <-PopChannel + + // 写回消息 + err = conn.WriteMessage(websocket.BinaryMessage, itemdata) + if err != nil { + return + } + // 打印消息 + fmt.Printf("%s len=%d sent: \n%s\n", conn.RemoteAddr(), len(itemdata), string(itemdata)) + + // 读取消息 + // msgType, msg, err := conn.ReadMessage() + // if err != nil { + // return + // } + } +} + +func callbackHandler(w http.ResponseWriter, r *http.Request) { + conn, _ := upgrader.Upgrade(w, r, nil) + + for { + // 读取消息 + msgType, msg, err := conn.ReadMessage() + if err != nil { + return + } + + // 打印消息 + fmt.Printf("%s sent: %s\n", conn.RemoteAddr(), string(msg)) + + // 写回消息 + err = conn.WriteMessage(msgType, msg) + if err != nil { + return + } + } +} + +func queuePushHandler(w http.ResponseWriter, r *http.Request) { + // 1. 读取Body内容 + body, err := io.ReadAll(r.Body) + if err != nil { + panic(err) + } + + // 2. 定义结构体 + + item := &QueueItem{ + Group: "test", + CreateAt: time.Now(), + Data: "saddas", + } + + // 3. 解析JSON到结构体 + err = json.Unmarshal(body, item) + if err != nil { + panic(err) + } +} + +var wns *dragonboat.NodeHost + +func PushItem(nh *dragonboat.NodeHost, item *QueueItem) { + cs := nh.GetNoOPSession(128) + cmd := &CmdEnqueue{Command: Command{Group: "test"}} + cmd.Item = item + data, err := FsPasser.PackToBytes(cmd) + + if err != nil { + log.Println(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + _, err = nh.SyncPropose(ctx, cs, data) + if err != nil { + log.Println(err) + } + + // log.Println("enqueue", len(result.Data)) + cancel() +} + +func HttpStart(ns *dragonboat.NodeHost, port int) { + wns = ns + + http.HandleFunc("/api/queue/push", queuePushHandler) + http.HandleFunc("/ws/pop/queue", queueHandler) + http.HandleFunc("/ws/callback", callbackHandler) + log.Println(fmt.Sprintf(":%d", port)) + http.ListenAndServe(fmt.Sprintf(":%d", port), nil) +} diff --git a/websocket_test.go b/websocket_test.go new file mode 100644 index 0000000..75ec059 --- /dev/null +++ b/websocket_test.go @@ -0,0 +1,61 @@ +package fusenrender_test + +import ( + "fmt" + "fusenrender" + "log" + "net/url" + "testing" + + "github.com/gorilla/websocket" +) + +func TestWebsocketA(t *testing.T) { + svc, err := fusenrender.LoadConfig("etc/etc_a.yaml") + if err != nil { + panic(err) + } + + fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + fusenrender.HttpStart(svc.Port - 1000) +} + +func TestWebsocketB(t *testing.T) { + svc, err := fusenrender.LoadConfig("etc/etc_b.yaml") + if err != nil { + panic(err) + } + + fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + fusenrender.HttpStart(svc.Port - 1000) +} + +func TestWebsocketC(t *testing.T) { + svc, err := fusenrender.LoadConfig("etc/etc_c.yaml") + if err != nil { + panic(err) + } + + fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + fusenrender.HttpStart(svc.Port - 1000) +} + +var addr = "localhost:4051" + +func TestWebsocketCient(t *testing.T) { + u := url.URL{Scheme: "ws", Host: addr, Path: "/ws"} + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + log.Println(err) + defer conn.Close() + + for { + err = conn.WriteMessage(websocket.BinaryMessage, []byte("123")) + if err != nil { + return + } + + mt, msg, err := conn.ReadMessage() + fmt.Sprintln(mt, err) + fmt.Printf("Received: %s\n", string(msg)) + } +}