From 354824bc68e93a66c876e8e2de9f1e9b3ebcdd03 Mon Sep 17 00:00:00 2001 From: eson <474420502@qq.com> Date: Sun, 6 Aug 2023 03:54:11 +0800 Subject: [PATCH] =?UTF-8?q?TODO:=20addresses=20=E9=94=99=E8=AF=AF,=20?= =?UTF-8?q?=E9=9C=80=E8=A6=81=E4=BC=A0=E5=85=A5=E6=89=80=E6=9C=89=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E7=9A=84config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- datajson_test.go | 119 +++++++++++++++++ main.go | 22 ++-- sm.go | 15 ++- start_test.go | 317 +++++++++++++++++++++++----------------------- websocket.go | 46 +++---- websocket_test.go | 7 +- 6 files changed, 326 insertions(+), 200 deletions(-) create mode 100644 datajson_test.go diff --git a/datajson_test.go b/datajson_test.go new file mode 100644 index 0000000..2c0d2e1 --- /dev/null +++ b/datajson_test.go @@ -0,0 +1,119 @@ +package fusenrender_test + +var sendjson = `{ + "id": "c82cff55f2cc5cee3f71300fd8c5d953", + "order_id": 0, + "user_id": null, + "sku_ids": [ + 16, + 19, + 25, + 13 + ], + "tids": [ + "FSN-PPBX-1", + "FSN-PLUP-4", + "FSN-PPCCT-2" + ], + "data": [ + { + "light": 2, + "refletion": -1, + "scale": "22", + "sku_id": 25, + "tid": "FSN-PPBX-1", + "rotation": "5.4,-41,-4.7", + "filePath": "", + "data": [ + { + "name": "model", + "data": "0,https:\/\/fusenh5.kayue.cn:8011\/storage\/test\/final_JntYgQf85p_temp.png,FSN-PPBX-1_MaskMap,FSN-PPBX-1_Normal,FSN-PPBX-1_BaseMap", + "type": "other", + "layer": "0", + "is_update": 1, + "mode": "Opaque" + }, + { + "name": "shadow", + "data": "0,FSN-PPBX-1,0,0,0", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Fade" + } + ] + }, + { + "light": 8, + "refletion": -1, + "scale": "12.5", + "sku_id": 19, + "tid": "FSN-PPCCT-2", + "rotation": "-2,25.5,-1", + "filePath": "", + "data": [ + { + "name": "model", + "data": "0,https:\/\/fusenh5.kayue.cn:8011\/storage\/test\/final_xNf7MC38vc_temp.png,FSN-PPCCT-2_MaskMap,FSN-PPCCT-2_Normal,FSN-PPCCT-2_BaseMap", + "type": "other", + "layer": "0", + "is_update": 1, + "mode": "Opaque" + }, + { + "name": "shadow", + "data": "0,FSN-PPCCT-2,0,0,0", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Fade" + }, + { + "name": "model_P", + "data": "0,FSN-PPCCT-2-P,FSN-PPCCT-2-P_MaskMap,FSN-PPCCT-2-P_Normal,FSN-PPCCT-2-P_BaseMap", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Fade" + } + ] + }, + { + "light": 7, + "refletion": 7, + "scale": "14", + "sku_id": 16, + "tid": "FSN-PLUP-4", + "rotation": "0,0,0", + "filePath": "", + "data": [ + { + "name": "model", + "data": "0,https:\/\/fusenh5.kayue.cn:8011\/storage\/test\/final_ImDCgYaw9o_temp.png,FSN-PLUP-4_MaskMap,FSN-PLUP-4_Normal,FSN-PLUP-4_BaseMap", + "type": "other", + "layer": "0", + "is_update": 1, + "mode": "Opaque" + }, + { + "name": "shadow", + "data": "0,FSN-PLUP-4,0,0,0", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Fade" + }, + { + "name": "model_P", + "data": "0,FSN-PLUP-4-P,FSN-PLUP-4-P_MaskMap,FSN-PLUP-4-P_Normal,FSN-PLUP-4-P_BaseMap", + "type": "other", + "layer": "0", + "is_update": 0, + "mode": "Opaque" + } + ] + } + ], + "is_thousand_face": 0, + "folder": "" +}` diff --git a/main.go b/main.go index bd05447..c076698 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,7 @@ func main() { } -var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) { +var DequeueHandler = triggered.RegisterExecute(func(params *triggered.Params[bool]) { var nh *dragonboat.NodeHost params.Shared.Value(func(v any) { @@ -32,7 +32,7 @@ var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) cs := nh.GetNoOPSession(128) for i := 0; ; i++ { - cmd := &CmdDequeue{Command: Command{Group: "test"}} + cmd := &CmdDequeue{Command{Group: "test"}} data, err := FsPasser.PackToBytes(cmd) if err != nil { log.Println(err) @@ -61,8 +61,9 @@ var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) if err != nil { log.Println(err) } - log.Println(item) - PopChannel <- result.Data + // log.Println(item) + + PopChannel <- []byte(item.Data.(string)) // log.Println(item) } } @@ -75,8 +76,10 @@ var addresses []string = []string{ "localhost:5502", } -func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat.NodeHost { - +func StartNode(cfg *ConfigServer) *dragonboat.NodeHost { + var exampleShardID uint64 = 128 + replicaID := cfg.ServerID + addr := cfg.Address() // addr := "localhost" // addr = fmt.Sprintf("%s:%d", addr, port) @@ -160,14 +163,17 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat } // 把引用计数设置为0 - Consumption.RefCountAdd(-1) + DequeueHandler.RefCountAdd(-1) // 设置共享的参数 - Consumption.WithShared(nh) + DequeueHandler.WithShared(nh) if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil { fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) os.Exit(1) } + wsPort := cfg.Port - 1000 + HttpStart(nh, wsPort) + return nh } diff --git a/sm.go b/sm.go index 7a03461..44399b5 100644 --- a/sm.go +++ b/sm.go @@ -12,13 +12,14 @@ import ( sm "github.com/lni/dragonboat/v4/statemachine" ) +// 结构体异步传递后, 执行的注册函数 var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { fsPasser := passer.NewPasser[sm.Result]() fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) { var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) - var consumer = cxt.Value(ctxConsumption{}).(*triggered.EventExecute[bool]) + var consumer = cxt.Value(ctxDequeueHandler{}).(*triggered.EventExecute[bool]) cmd := obj.(*CmdEnqueue) key := cmd.Item.GetKey() @@ -114,8 +115,12 @@ func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { queues: make(map[string]*PriorityQueue[QueueItem]), counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) { - log.Printf("queue remain: %d\n", params.Value) - time.Sleep(time.Second * 5) + if params.Value != 0 { + log.Printf("queue remain: %d\n", params.Value) + time.Sleep(time.Second * 5) + } else { + time.Sleep(time.Second * 15) + } }), } } @@ -130,7 +135,7 @@ func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) { type ctxEntry struct{} type ctxSMQueue struct{} -type ctxConsumption struct{} +type ctxDequeueHandler struct{} // Update处理Entry中的更新命令 // Update updates the object using the specified committed raft entry. @@ -139,7 +144,7 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { ctx = context.WithValue(ctx, ctxEntry{}, &e) ctx = context.WithValue(ctx, ctxSMQueue{}, s) - ctx = context.WithValue(ctx, ctxConsumption{}, Consumption) + ctx = context.WithValue(ctx, ctxDequeueHandler{}, DequeueHandler) return FsPasser.ExecuteWithBytes(ctx, e.Cmd) } diff --git a/start_test.go b/start_test.go index 6bfb2bd..fbcbc3c 100644 --- a/start_test.go +++ b/start_test.go @@ -1,183 +1,176 @@ package fusenrender_test import ( - "context" - "fusenrender" "log" - - "testing" - "time" - - "github.com/google/uuid" - "github.com/lni/goutils/syncutil" ) func init() { log.SetFlags(log.Llongfile) -} - -func TestStartNodeA(t *testing.T) { - svc, err := fusenrender.LoadConfig("etc/etc_a.yaml") - if err != nil { - panic(err) - } - - nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) - - raftStopper := syncutil.NewStopper() - - // ch := make(chan string, 16) - - raftStopper.RunWorker(func() { - // this goroutine makes a linearizable read every 10 second. it returns the - // Count value maintained in IStateMachine. see datastore.go for details. - cs := nh.GetNoOPSession(128) - ticker := time.NewTicker(1 * time.Millisecond) - for { - select { - case <-ticker.C: - - item := &fusenrender.QueueItem{ - Group: "test", - Wid: fusenrender.UidCreater.Get(), - Priority: uint32(2), - CreateAt: time.Now(), - Data: uuid.New().String(), - } - - cmd := &fusenrender.CmdEnqueue{Command: fusenrender.Command{Group: "test"}} - cmd.Item = item - data, err := fusenrender.FsPasser.PackToBytes(cmd) - - if err != nil { - log.Println(err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - _, err = nh.SyncPropose(ctx, cs, data) - if err != nil { - log.Println(err) - } - - // log.Println("enqueue", len(result.Data)) - cancel() - - case <-raftStopper.ShouldStop(): - return - } - } - }) - - raftStopper.Wait() -} - -func TestStartNodeB(t *testing.T) { - svc, err := fusenrender.LoadConfig("etc/etc_b.yaml") - if err != nil { - panic(err) - } - - nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) - - raftStopper := syncutil.NewStopper() - - // ch := make(chan string, 16) - - raftStopper.RunWorker(func() { - // this goroutine makes a linearizable read every 10 second. it returns the - // Count value maintained in IStateMachine. see datastore.go for details. - cs := nh.GetNoOPSession(128) - - ticker := time.NewTicker(1 * time.Millisecond) - for { - select { - case <-ticker.C: - - item := &fusenrender.QueueItem{ - Group: "test", - Priority: uint32(1), - CreateAt: time.Now(), - Data: uuid.New().String(), - } - - cmd := fusenrender.CmdEnqueue{Command: fusenrender.Command{Group: "test"}} - cmd.Item = item - data, err := fusenrender.FsPasser.PackToBytes(&cmd) - - if err != nil { - log.Println(err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - _, err = nh.SyncPropose(ctx, cs, data) - if err != nil { - log.Println(err) - } else { - // log.Println("enqueue", len(result.Data)) - } - - cancel() - - case <-raftStopper.ShouldStop(): - return - } - } - }) - - raftStopper.Wait() } -func TestStartNodeC(t *testing.T) { +// func TestStartNodeA(t *testing.T) { +// svc, err := fusenrender.LoadConfig("etc/etc_a.yaml") +// if err != nil { +// panic(err) +// } - svc, err := fusenrender.LoadConfig("etc/etc_c.yaml") - if err != nil { - panic(err) - } +// nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) - nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) - log.Println(nh) +// raftStopper := syncutil.NewStopper() - raftStopper := syncutil.NewStopper() - raftStopper.RunWorker(func() { - // this goroutine makes a linearizable read every 10 second. it returns the - // Count value maintained in IStateMachine. see datastore.go for details. - // cs := nh.GetNoOPSession(128) - ticker := time.NewTicker(2 * time.Second) - for { - select { - case <-ticker.C: +// // ch := make(chan string, 16) - // for { +// raftStopper.RunWorker(func() { +// // this goroutine makes a linearizable read every 10 second. it returns the +// // Count value maintained in IStateMachine. see datastore.go for details. +// cs := nh.GetNoOPSession(128) +// ticker := time.NewTicker(1 * time.Millisecond) +// for { +// select { +// case <-ticker.C: - // ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - // cmd := fusenrender.Command{ - // Name: "dequeue", - // Group: "test", - // } +// item := &fusenrender.QueueItem{ +// Group: "test", +// Wid: fusenrender.UidCreater.Get(), +// Priority: uint32(2), +// CreateAt: time.Now(), +// Data: uuid.New().String(), +// } - // data, err := cmd.Encode() - // if err != nil { - // log.Println(err) - // } +// cmd := &fusenrender.CmdEnqueue{Command: fusenrender.Command{Group: "test"}} +// cmd.Item = item +// data, err := fusenrender.FsPasser.PackToBytes(cmd) - // result, err := nh.SyncPropose(ctx, cs, data) - // if err != nil { - // log.Println(err) - // } +// if err != nil { +// log.Println(err) +// } - // log.Println(len(result.Data), string(result.Data)) - // cancel() - // if len(result.Data) == 0 { - // break - // } - // } +// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) +// _, err = nh.SyncPropose(ctx, cs, data) +// if err != nil { +// log.Println(err) +// } - case <-raftStopper.ShouldStop(): - return - } - } - }) +// // log.Println("enqueue", len(result.Data)) +// cancel() - raftStopper.Wait() -} +// case <-raftStopper.ShouldStop(): +// return +// } +// } +// }) + +// raftStopper.Wait() +// } + +// func TestStartNodeB(t *testing.T) { +// svc, err := fusenrender.LoadConfig("etc/etc_b.yaml") +// if err != nil { +// panic(err) +// } + +// nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + +// raftStopper := syncutil.NewStopper() + +// // ch := make(chan string, 16) + +// raftStopper.RunWorker(func() { +// // this goroutine makes a linearizable read every 10 second. it returns the +// // Count value maintained in IStateMachine. see datastore.go for details. +// cs := nh.GetNoOPSession(128) + +// ticker := time.NewTicker(1 * time.Millisecond) +// for { +// select { +// case <-ticker.C: + +// item := &fusenrender.QueueItem{ +// Group: "test", +// Priority: uint32(1), +// CreateAt: time.Now(), +// Data: uuid.New().String(), +// } + +// cmd := fusenrender.CmdEnqueue{Command: fusenrender.Command{Group: "test"}} +// cmd.Item = item +// data, err := fusenrender.FsPasser.PackToBytes(&cmd) + +// if err != nil { +// log.Println(err) +// } + +// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) +// _, err = nh.SyncPropose(ctx, cs, data) +// if err != nil { +// log.Println(err) +// } else { +// // log.Println("enqueue", len(result.Data)) +// } + +// cancel() + +// case <-raftStopper.ShouldStop(): +// return +// } +// } +// }) + +// raftStopper.Wait() + +// } + +// func TestStartNodeC(t *testing.T) { + +// svc, err := fusenrender.LoadConfig("etc/etc_c.yaml") +// if err != nil { +// panic(err) +// } + +// nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) +// log.Println(nh) + +// raftStopper := syncutil.NewStopper() +// raftStopper.RunWorker(func() { +// // this goroutine makes a linearizable read every 10 second. it returns the +// // Count value maintained in IStateMachine. see datastore.go for details. +// // cs := nh.GetNoOPSession(128) +// ticker := time.NewTicker(2 * time.Second) +// for { +// select { +// case <-ticker.C: + +// // for { + +// // ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) +// // cmd := fusenrender.Command{ +// // Name: "dequeue", +// // Group: "test", +// // } + +// // data, err := cmd.Encode() +// // if err != nil { +// // log.Println(err) +// // } + +// // result, err := nh.SyncPropose(ctx, cs, data) +// // if err != nil { +// // log.Println(err) +// // } + +// // log.Println(len(result.Data), string(result.Data)) +// // cancel() +// // if len(result.Data) == 0 { +// // break +// // } +// // } + +// case <-raftStopper.ShouldStop(): +// return +// } +// } +// }) + +// raftStopper.Wait() +// } diff --git a/websocket.go b/websocket.go index b0a4bad..2689afb 100644 --- a/websocket.go +++ b/websocket.go @@ -21,6 +21,14 @@ var upgrader = websocket.Upgrader{ WriteBufferSize: 1024, } +func HttpStart(ns *dragonboat.NodeHost, port int) { + http.HandleFunc("/api/queue/push", pushRenderTaskHandler) + http.HandleFunc("/ws/pop/queue", queueHandler) + http.HandleFunc("/ws/callback", callbackHandler) + log.Printf(":%d", port) + http.ListenAndServe(fmt.Sprintf(":%d", port), nil) +} + func queueHandler(w http.ResponseWriter, r *http.Request) { var err error conn, err := upgrader.Upgrade(w, r, nil) @@ -30,9 +38,9 @@ func queueHandler(w http.ResponseWriter, r *http.Request) { } defer conn.Close() - log.Println("建立连接", Consumption.RefCountAdd(1)) + log.Println("建立连接", DequeueHandler.RefCountAdd(1)) defer func() { - log.Println("退出连接", Consumption.RefCountAdd(-1)) + log.Println("退出连接", DequeueHandler.RefCountAdd(-1)) }() for { @@ -57,11 +65,11 @@ func queueHandler(w http.ResponseWriter, r *http.Request) { return } log.Println("重新回队") - PushItem(wns, &item) + stateClient.PushItem(&item) return } // 打印消息 - fmt.Printf("%s len=%d sent: \n%s\n", conn.RemoteAddr(), len(itemdata), string(itemdata)) + fmt.Printf("%s len=%d sent: \n ", conn.RemoteAddr(), len(itemdata)) // 读取消息 // msgType, msg, err := conn.ReadMessage() @@ -95,7 +103,7 @@ func callbackHandler(w http.ResponseWriter, r *http.Request) { } } -func queuePushHandler(w http.ResponseWriter, r *http.Request) { +func pushRenderTaskHandler(w http.ResponseWriter, r *http.Request) { // 1. 读取Body内容 body, err := io.ReadAll(r.Body) if err != nil { @@ -104,21 +112,25 @@ func queuePushHandler(w http.ResponseWriter, r *http.Request) { // 2. 定义结构体 - item := &QueueItem{} + item := QueueItem{} // 3. 解析JSON到结构体 - err = json.Unmarshal(body, item) + err = json.Unmarshal(body, &item) if err != nil { panic(err) } - PushItem(wns, item) + stateClient.PushItem(&item) } -var wns *dragonboat.NodeHost +type StateClient struct { + nh *dragonboat.NodeHost +} -func PushItem(nh *dragonboat.NodeHost, item *QueueItem) { - cs := nh.GetNoOPSession(128) +var stateClient *StateClient + +func (cli *StateClient) PushItem(item *QueueItem) { + cs := cli.nh.GetNoOPSession(128) cmd := &CmdEnqueue{Command: Command{Group: "test"}} cmd.Item = item data, err := FsPasser.PackToBytes(cmd) @@ -128,7 +140,7 @@ func PushItem(nh *dragonboat.NodeHost, item *QueueItem) { } ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - _, err = nh.SyncPropose(ctx, cs, data) + _, err = cli.nh.SyncPropose(ctx, cs, data) if err != nil { log.Println(err) } @@ -136,13 +148,3 @@ func PushItem(nh *dragonboat.NodeHost, item *QueueItem) { // log.Println("enqueue", len(result.Data)) cancel() } - -func HttpStart(ns *dragonboat.NodeHost, port int) { - wns = ns - - http.HandleFunc("/api/queue/push", queuePushHandler) - http.HandleFunc("/ws/pop/queue", queueHandler) - http.HandleFunc("/ws/callback", callbackHandler) - log.Println(fmt.Sprintf(":%d", port)) - http.ListenAndServe(fmt.Sprintf(":%d", port), nil) -} diff --git a/websocket_test.go b/websocket_test.go index 656a053..7d21a7b 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -18,7 +18,7 @@ func TestWebsocketA(t *testing.T) { panic(err) } - nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + nh := fusenrender.StartNode(svc) fusenrender.HttpStart(nh, svc.Port-1000) } @@ -28,7 +28,7 @@ func TestWebsocketB(t *testing.T) { panic(err) } - nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + nh := fusenrender.StartNode(svc) fusenrender.HttpStart(nh, svc.Port-1000) } @@ -38,7 +38,7 @@ func TestWebsocketC(t *testing.T) { panic(err) } - nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + nh := fusenrender.StartNode(svc) fusenrender.HttpStart(nh, svc.Port-1000) } @@ -57,6 +57,7 @@ func TestWebsocketCient(t *testing.T) { Group: "test", Wid: fusenrender.UidCreater.Get(), CreateAt: time.Now(), + Data: sendjson, } r, err := requests.Post(fmt.Sprintf("http://%s/api/queue/push", addr)).SetBodyJson(item).Execute() log.Println(r, err)