From 4195de38181e380f3da704bed2da21037876a728 Mon Sep 17 00:00:00 2001 From: eson <474420502@qq.com> Date: Sun, 6 Aug 2023 04:26:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=88=B0=E5=90=88=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 7 +++- go.sum | 4 ++ main.go | 76 ++++++++++++-------------------------- main_test.go | 2 +- sm.go | 89 +++++---------------------------------------- sm_upate_handler.go | 86 +++++++++++++++++++++++++++++++++++++++++++ websocket.go | 49 ++++++++++++++++++++++--- websocket_test.go | 12 +++--- 8 files changed, 178 insertions(+), 147 deletions(-) create mode 100644 sm_upate_handler.go diff --git a/go.mod b/go.mod index b471861..9b1b465 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,8 @@ go 1.20 require ( github.com/474420502/execute v0.2.2 github.com/474420502/passer v0.0.1 - github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.5.0 github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be - github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 ) require ( @@ -23,8 +21,10 @@ require ( github.com/cockroachdb/redact v1.1.3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/getsentry/sentry-go v0.12.0 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/google/btree v1.0.0 // indirect github.com/google/go-cmp v0.5.9 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-immutable-radix v1.0.0 // indirect github.com/hashicorp/go-msgpack v0.5.3 // indirect @@ -34,6 +34,7 @@ require ( github.com/hashicorp/memberlist v0.3.1 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/kr/text v0.2.0 // indirect + github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 // indirect github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376 // indirect github.com/miekg/dns v1.1.26 // indirect github.com/pierrec/lz4/v4 v4.1.14 // indirect @@ -49,6 +50,7 @@ require ( github.com/valyala/histogram v1.2.0 // indirect golang.org/x/crypto v0.11.0 // indirect golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 // indirect + google.golang.org/protobuf v1.26.0 // indirect ) require ( @@ -57,6 +59,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/klauspost/compress v1.13.5 // indirect + github.com/lni/dragonboat v2.1.7+incompatible github.com/pkg/errors v0.9.1 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect diff --git a/go.sum b/go.sum index 716c885..4746832 100644 --- a/go.sum +++ b/go.sum @@ -121,6 +121,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -216,6 +217,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awStJ6ArI7Y= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= +github.com/lni/dragonboat v2.1.7+incompatible h1:U04ZmiQKXcdRWNE6jccSiYJK7k+ECwN7XIw1xpTiJeE= +github.com/lni/dragonboat v2.1.7+incompatible/go.mod h1:eI3naIUzh2+DKqtOFwdZ2t0Z/BjlGEjWZgS07x7x8oU= github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be h1:0K1suLIEoHH6f2OnWF9IIlVAjR8OhFt+VuC9ZKpEJCk= github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be/go.mod h1:DbE6sDHHvYPZvJPgP5K82+HHn6OMTgbAWy/ISa42VEk= github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 h1:6gzI38ZJmbzZ7oZebXz6jII0uVK+MZ3+ds+7mIt1ioI= @@ -494,6 +497,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/main.go b/main.go index c076698..4b6f523 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package fusenrender import ( - "context" "flag" "fmt" "log" @@ -10,7 +9,6 @@ import ( "path/filepath" "runtime" "syscall" - "time" "github.com/474420502/execute/triggered" "github.com/lni/dragonboat/v4" @@ -18,56 +16,29 @@ import ( "github.com/lni/dragonboat/v4/logger" ) -func main() { - -} +var shardID uint64 = 128 var DequeueHandler = triggered.RegisterExecute(func(params *triggered.Params[bool]) { - var nh *dragonboat.NodeHost params.Shared.Value(func(v any) { - nh = v.(*dragonboat.NodeHost) + + cs := stateClient.GetNoOPSession() + for i := 0; ; i++ { + + item, err := stateClient.PopItem(cs, "test") + if err != nil { + log.Println(err) + break + } + if item == nil { + return + } + + PopChannel <- []byte(item.Data.(string)) + + } }) - cs := nh.GetNoOPSession(128) - for i := 0; ; i++ { - - cmd := &CmdDequeue{Command{Group: "test"}} - data, err := FsPasser.PackToBytes(cmd) - if err != nil { - log.Println(err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - result, err := nh.SyncPropose(ctx, cs, data) - cancel() - if err != nil { - log.Println(err) - break - } else { - if len(result.Data) == 0 { - // log.Println("wait 10 second") - var m runtime.MemStats - runtime.ReadMemStats(&m) - allocMB := float64(m.Alloc) / 1024 / 1024 - totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 - sysMB := float64(m.Sys) / 1024 / 1024 - fmt.Printf("dequeue count %d, Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", i, allocMB, totalAllocMB, sysMB) - break - - } else { - var item QueueItem - err := item.Decode(result.Data) - if err != nil { - log.Println(err) - } - // log.Println(item) - - PopChannel <- []byte(item.Data.(string)) - // log.Println(item) - } - } - } }) var addresses []string = []string{ @@ -76,8 +47,8 @@ var addresses []string = []string{ "localhost:5502", } -func StartNode(cfg *ConfigServer) *dragonboat.NodeHost { - var exampleShardID uint64 = 128 +func StartNode(cfg *ConfigServer) { + replicaID := cfg.ServerID addr := cfg.Address() // addr := "localhost" @@ -127,7 +98,7 @@ func StartNode(cfg *ConfigServer) *dragonboat.NodeHost { rc := config.Config{ // ShardID and ReplicaID of the raft node ReplicaID: uint64(replicaID), - ShardID: exampleShardID, + ShardID: shardID, ElectionRTT: 10, @@ -162,6 +133,8 @@ func StartNode(cfg *ConfigServer) *dragonboat.NodeHost { panic(err) } + stateClient = &StateClient{nh: nh} + // 把引用计数设置为0 DequeueHandler.RefCountAdd(-1) // 设置共享的参数 @@ -172,8 +145,5 @@ func StartNode(cfg *ConfigServer) *dragonboat.NodeHost { os.Exit(1) } - wsPort := cfg.Port - 1000 - HttpStart(nh, wsPort) - - return nh + HttpListen(nh, cfg.Port-1000) } diff --git a/main_test.go b/main_test.go index bd649c4..bc290cf 100644 --- a/main_test.go +++ b/main_test.go @@ -3,5 +3,5 @@ package fusenrender import "testing" func TestMain(t *testing.T) { - main() + } diff --git a/sm.go b/sm.go index 44399b5..862a59a 100644 --- a/sm.go +++ b/sm.go @@ -2,93 +2,16 @@ package fusenrender import ( "context" + "fmt" "io" - "log" + "runtime" "sync" "time" "github.com/474420502/execute/triggered" - "github.com/474420502/passer" sm "github.com/lni/dragonboat/v4/statemachine" ) -// 结构体异步传递后, 执行的注册函数 -var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { - - fsPasser := passer.NewPasser[sm.Result]() - fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) { - - var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) - var consumer = cxt.Value(ctxDequeueHandler{}).(*triggered.EventExecute[bool]) - - cmd := obj.(*CmdEnqueue) - key := cmd.Item.GetKey() - - smqueue.mu.Lock() - var queue *PriorityQueue[QueueItem] - var ok bool - if queue, ok = smqueue.queues[cmd.Group]; !ok { - queue = NewPriorityQueue[QueueItem]() - smqueue.queues[cmd.Group] = queue - } - smqueue.mu.Unlock() - - queue.Push(&Slice[QueueItem]{ - Key: key, - Value: cmd.Item, - }) - - var result sm.Result - consumer.Notify(consumer.NULL) // 通知可以执行update - return result, nil - }) - - fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) { - - var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) - var e = cxt.Value(ctxEntry{}).(*sm.Entry) - - var queue *PriorityQueue[QueueItem] - var ok bool - cmd := obj.(*CmdDequeue) - smqueue.mu.Lock() - if queue, ok = smqueue.queues[cmd.Group]; !ok { - queue = NewPriorityQueue[QueueItem]() - smqueue.queues[cmd.Group] = queue - } - smqueue.mu.Unlock() - - var result sm.Result - if queue.Empty() { - return result, nil - } - - item := queue.Pop() - if item == nil { - result.Value = 0 - return result, nil - } - - if item != nil { - d, err := item.Encode() - if err != nil { - return result, err - } - e.Result.Data = d - result.Data = d - queue.Empty() - size := queue.Size() - - smqueue.counter.Notify(size) - // log.Println("queue remain:", queue.Size()) - } - - return result, nil - }) - - return fsPasser -}() - // SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列 type SMQueue struct { // 所属的Shard ID @@ -116,7 +39,13 @@ func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) { if params.Value != 0 { - log.Printf("queue remain: %d\n", params.Value) + var m runtime.MemStats + runtime.ReadMemStats(&m) + allocMB := float64(m.Alloc) / 1024 / 1024 + totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 + sysMB := float64(m.Sys) / 1024 / 1024 + fmt.Printf("queue remain: %d, Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", params.Value, allocMB, totalAllocMB, sysMB) + time.Sleep(time.Second * 5) } else { time.Sleep(time.Second * 15) diff --git a/sm_upate_handler.go b/sm_upate_handler.go new file mode 100644 index 0000000..c13b28d --- /dev/null +++ b/sm_upate_handler.go @@ -0,0 +1,86 @@ +package fusenrender + +import ( + "context" + + "github.com/474420502/execute/triggered" + "github.com/474420502/passer" + sm "github.com/lni/dragonboat/v4/statemachine" +) + +// 结构体异步传递后, 执行的注册函数, 实际上就是update的handler +var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { + + fsPasser := passer.NewPasser[sm.Result]() + fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) { + + var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) + var dequeueHandler = cxt.Value(ctxDequeueHandler{}).(*triggered.EventExecute[bool]) + + cmd := obj.(*CmdEnqueue) + key := cmd.Item.GetKey() + + smqueue.mu.Lock() + var queue *PriorityQueue[QueueItem] + var ok bool + if queue, ok = smqueue.queues[cmd.Group]; !ok { + queue = NewPriorityQueue[QueueItem]() + smqueue.queues[cmd.Group] = queue + } + smqueue.mu.Unlock() + + queue.Push(&Slice[QueueItem]{ + Key: key, + Value: cmd.Item, + }) + + var result sm.Result + dequeueHandler.Notify(dequeueHandler.NULL) // 通知可以执行update + return result, nil + }) + + fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) { + + var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) + var e = cxt.Value(ctxEntry{}).(*sm.Entry) + + var queue *PriorityQueue[QueueItem] + var ok bool + cmd := obj.(*CmdDequeue) + smqueue.mu.Lock() + if queue, ok = smqueue.queues[cmd.Group]; !ok { + queue = NewPriorityQueue[QueueItem]() + smqueue.queues[cmd.Group] = queue + } + smqueue.mu.Unlock() + + var result sm.Result + if queue.Empty() { + return result, nil + } + + item := queue.Pop() + if item == nil { + result.Value = 0 + return result, nil + } + + if item != nil { + d, err := item.Encode() + if err != nil { + return result, err + } + e.Result.Data = d + result.Data = d + queue.Empty() + size := queue.Size() + + smqueue.counter.Notify(size) + // log.Println("queue remain:", queue.Size()) + } + + return result, nil + }) + + return fsPasser +}() diff --git a/websocket.go b/websocket.go index 2689afb..c1a988f 100644 --- a/websocket.go +++ b/websocket.go @@ -11,6 +11,7 @@ import ( "github.com/gorilla/websocket" "github.com/lni/dragonboat/v4" + "github.com/lni/dragonboat/v4/client" ) var UidCreater = NewUniqueId(1) @@ -21,7 +22,7 @@ var upgrader = websocket.Upgrader{ WriteBufferSize: 1024, } -func HttpStart(ns *dragonboat.NodeHost, port int) { +func HttpListen(ns *dragonboat.NodeHost, port int) { http.HandleFunc("/api/queue/push", pushRenderTaskHandler) http.HandleFunc("/ws/pop/queue", queueHandler) http.HandleFunc("/ws/callback", callbackHandler) @@ -65,7 +66,7 @@ func queueHandler(w http.ResponseWriter, r *http.Request) { return } log.Println("重新回队") - stateClient.PushItem(&item) + stateClient.PushItem(nil, &item) return } // 打印消息 @@ -120,7 +121,7 @@ func pushRenderTaskHandler(w http.ResponseWriter, r *http.Request) { panic(err) } - stateClient.PushItem(&item) + stateClient.PushItem(nil, &item) } type StateClient struct { @@ -129,8 +130,15 @@ type StateClient struct { var stateClient *StateClient -func (cli *StateClient) PushItem(item *QueueItem) { - cs := cli.nh.GetNoOPSession(128) +func (cli *StateClient) GetNoOPSession() *client.Session { + return cli.nh.GetNoOPSession(shardID) +} + +func (cli *StateClient) PushItem(cs *client.Session, item *QueueItem) { + if cs == nil { + cs = cli.GetNoOPSession() + } + cmd := &CmdEnqueue{Command: Command{Group: "test"}} cmd.Item = item data, err := FsPasser.PackToBytes(cmd) @@ -148,3 +156,34 @@ func (cli *StateClient) PushItem(item *QueueItem) { // log.Println("enqueue", len(result.Data)) cancel() } + +func (cli *StateClient) PopItem(cs *client.Session, group string) (*QueueItem, error) { + if cs == nil { + cs = cli.GetNoOPSession() + } + + cmd := &CmdDequeue{Command{Group: "test"}} + data, err := FsPasser.PackToBytes(cmd) + if err != nil { + log.Println(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + result, err := cli.nh.SyncPropose(ctx, cs, data) + cancel() + + if err != nil { + return nil, err + } + + if len(result.Data) == 0 { + return nil, nil + } + + var item QueueItem + err = item.Decode(result.Data) + if err != nil { + log.Println(err) + } + return &item, nil +} diff --git a/websocket_test.go b/websocket_test.go index 7d21a7b..52ef977 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -18,8 +18,8 @@ func TestWebsocketA(t *testing.T) { panic(err) } - nh := fusenrender.StartNode(svc) - fusenrender.HttpStart(nh, svc.Port-1000) + fusenrender.StartNode(svc) + } func TestWebsocketB(t *testing.T) { @@ -28,8 +28,8 @@ func TestWebsocketB(t *testing.T) { panic(err) } - nh := fusenrender.StartNode(svc) - fusenrender.HttpStart(nh, svc.Port-1000) + fusenrender.StartNode(svc) + } func TestWebsocketC(t *testing.T) { @@ -38,8 +38,8 @@ func TestWebsocketC(t *testing.T) { panic(err) } - nh := fusenrender.StartNode(svc) - fusenrender.HttpStart(nh, svc.Port-1000) + fusenrender.StartNode(svc) + } var addr = "localhost:4052"