From 1af5dba6e57732bbe99f81ee7019221304083ab8 Mon Sep 17 00:00:00 2001 From: eson <9673575+githubcontent@user.noreply.gitee.com> Date: Wed, 2 Aug 2023 18:16:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=9C=80=E6=96=B0=E7=89=88?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- command.go | 66 ++++++++++++++++ go.mod | 1 + go.sum | 2 + main.go | 19 ++--- sm.go | 207 +++++++++++++++++++++----------------------------- start_test.go | 19 ++--- 6 files changed, 173 insertions(+), 141 deletions(-) create mode 100644 command.go diff --git a/command.go b/command.go new file mode 100644 index 0000000..27bd976 --- /dev/null +++ b/command.go @@ -0,0 +1,66 @@ +package fusenrender + +import ( + "encoding/json" + "fmt" + "time" +) + +type CmdEnqueue struct { + Command + Item *QueueItem `json:"item"` +} + +type CmdDequeue struct { + Command +} + +type Command struct { + Group string `json:"group"` +} + +// func (cmd *Command) Encode() ([]byte, error) { +// val, err := json.Marshal(cmd) +// if err != nil { +// return nil, err +// } +// return val, nil +// } + +// func (cmd *Command) Decode(data []byte) error { + +// err := json.Unmarshal(data, cmd) +// return err +// } + +// QueueItem表示队列中的一个项 +type QueueItem struct { + // 队列组名 + Group string `json:"group"` // 组名 + // 优先级 + Priority uint32 `json:"priority"` // 处理的优先级 + // 创建时间 + CreateAt time.Time `json:"create_at"` // 创建时间 统一utc + // 要排队的数据 + Data any `json:"data"` // 操作的数据结构 +} + +func (item *QueueItem) Encode() ([]byte, error) { + val, err := json.Marshal(item) + if err != nil { + return nil, err + } + return val, nil +} + +func (item *QueueItem) Decode(buf []byte) error { + err := json.Unmarshal(buf, item) + if err != nil { + return err + } + return nil +} + +func (item *QueueItem) GetKey() []byte { + return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix())) +} diff --git a/go.mod b/go.mod index 276d283..9abd2a8 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( ) require ( + github.com/474420502/passer v0.0.1 // indirect github.com/DataDog/zstd v1.4.5 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/VictoriaMetrics/metrics v1.18.1 // indirect diff --git a/go.sum b/go.sum index 1681bf9..2b64316 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/474420502/execute v0.1.1 h1:lMG/f/NOSScD10Yyqkazd2uAgW8Ogj0ZLG/Pm7lsYE8= github.com/474420502/execute v0.1.1/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc= +github.com/474420502/passer v0.0.1 h1:ZWnt7hpFzsYDV7LHSEyLvLUvW5mRxrnDmgFdIl17q3w= +github.com/474420502/passer v0.0.1/go.mod h1:MmnnrF9d51sPkFzdRq2pQtxQKqyjburVM1LjMbOCezE= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= diff --git a/main.go b/main.go index f98ca03..cecfc10 100644 --- a/main.go +++ b/main.go @@ -22,21 +22,22 @@ func main() { } +func init() { + +} + var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) { + var nh *dragonboat.NodeHost params.Shared.Value(func(v any) { nh = v.(*dragonboat.NodeHost) }) cs := nh.GetNoOPSession(128) - for { + for i := 0; ; i++ { - cmd := Command{ - Name: "dequeue", - Group: "test", - } - - data, err := cmd.Encode() + cmd := &CmdDequeue{Command: Command{Group: "test"}} + data, err := FsPasser.PackToBytes(cmd) if err != nil { log.Println(err) } @@ -49,7 +50,7 @@ var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) break } else { if len(result.Data) == 0 { - log.Println("wait 10 second") + // log.Println("wait 10 second") var m runtime.MemStats runtime.ReadMemStats(&m) @@ -58,7 +59,7 @@ var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 sysMB := float64(m.Sys) / 1024 / 1024 - fmt.Printf("Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", allocMB, totalAllocMB, sysMB) + fmt.Printf("dequeue count %d, Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", i, allocMB, totalAllocMB, sysMB) time.Sleep(time.Second * 10) break } else { diff --git a/sm.go b/sm.go index 059e3e3..476f8c2 100644 --- a/sm.go +++ b/sm.go @@ -1,18 +1,94 @@ package fusenrender import ( + "context" "encoding/gob" - "encoding/json" - "fmt" "io" "log" "sync" "time" "github.com/474420502/execute/triggered" + "github.com/474420502/passer" sm "github.com/lni/dragonboat/v4/statemachine" ) +var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { + + fsPasser := passer.NewPasser[sm.Result]() + fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) { + + var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) + var consumer = cxt.Value(ctxConsumption{}).(*triggered.EventExecute[bool]) + + cmd := obj.(*CmdEnqueue) + key := cmd.Item.GetKey() + + smqueue.mu.Lock() + var queue *PriorityQueue[QueueItem] + var ok bool + if queue, ok = smqueue.queues[cmd.Group]; !ok { + queue = NewPriorityQueue[QueueItem]() + smqueue.queues[cmd.Group] = queue + } + smqueue.mu.Unlock() + + queue.Push(&Slice[QueueItem]{ + Key: key, + Value: cmd.Item, + }) + + var result sm.Result + consumer.Notify(consumer.NULL) // 通知可以执行update + return result, nil + }) + + fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) { + + var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) + var e = cxt.Value(ctxEntry{}).(*sm.Entry) + + var queue *PriorityQueue[QueueItem] + var ok bool + cmd := obj.(*CmdDequeue) + smqueue.mu.Lock() + if queue, ok = smqueue.queues[cmd.Group]; !ok { + queue = NewPriorityQueue[QueueItem]() + smqueue.queues[cmd.Group] = queue + } + smqueue.mu.Unlock() + + var result sm.Result + if queue.Empty() { + return result, nil + } + + item := queue.Pop() + if item == nil { + result.Value = 0 + return result, nil + } + + if item != nil { + d, err := item.Encode() + if err != nil { + return result, err + } + e.Result.Data = d + result.Data = d + queue.Empty() + size := queue.Size() + + smqueue.counter.Notify(size) + // log.Println("queue remain:", queue.Size()) + } + + return result, nil + }) + + return fsPasser +}() + // SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列 type SMQueue struct { // 所属的Shard ID @@ -28,53 +104,6 @@ type SMQueue struct { counter *triggered.EventExecute[int64] } -type Command struct { - Name string `json:"name"` - Group string `json:"group"` - - Item *QueueItem `json:"item"` -} - -func (cmd *Command) Encode() ([]byte, error) { - val, err := json.Marshal(cmd) - if err != nil { - return nil, err - } - return val, nil -} - -// QueueItem表示队列中的一个项 -type QueueItem struct { - // 队列组名 - Group string `json:"group"` // 组名 - // 优先级 - Priority uint32 `json:"priority"` // 处理的优先级 - // 创建时间 - CreateAt time.Time `json:"create_at"` // 创建时间 统一utc - // 要排队的数据 - Data any `json:"data"` // 操作的数据结构 -} - -func (item *QueueItem) Encode() ([]byte, error) { - val, err := json.Marshal(item) - if err != nil { - return nil, err - } - return val, nil -} - -func (item *QueueItem) Decode(buf []byte) error { - err := json.Unmarshal(buf, item) - if err != nil { - return err - } - return nil -} - -func (item *QueueItem) GetKey() []byte { - return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix())) -} - // // NewSMQueue creates and return a new ExampleStateMachine object. func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { mu := &sync.Mutex{} @@ -100,81 +129,19 @@ func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) { return item, nil } +type ctxEntry struct{} +type ctxSMQueue struct{} +type ctxConsumption struct{} + // Update处理Entry中的更新命令 // Update updates the object using the specified committed raft entry. func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { + ctx := context.TODO() - var cmd Command - - err = json.Unmarshal(e.Cmd, &cmd) - if err != nil { - return result, err - } - - switch cmd.Name { - case "enqueue": - d, err := cmd.Item.Encode() - if err != nil { - return result, err - } - key := cmd.Item.GetKey() - - s.mu.Lock() - var queue *PriorityQueue[QueueItem] - var ok bool - if queue, ok = s.queues[cmd.Group]; !ok { - queue = NewPriorityQueue[QueueItem]() - s.queues[cmd.Group] = queue - } - s.mu.Unlock() - - queue.Push(&Slice[QueueItem]{ - Key: key, - Value: cmd.Item, - }) - result.Value = uint64(len(d)) - Consumption.Notify(Consumption.NULL) // 通知可以执行update - return result, err - case "dequeue": - var queue *PriorityQueue[QueueItem] - var ok bool - - s.mu.Lock() - if queue, ok = s.queues[cmd.Group]; !ok { - queue = NewPriorityQueue[QueueItem]() - s.queues[cmd.Group] = queue - } - s.mu.Unlock() - - if queue.Empty() { - return result, nil - } - - item := queue.Pop() - if item == nil { - result.Value = 0 - return result, nil - } - - if item != nil { - d, err := item.Encode() - if err != nil { - return result, err - } - e.Result.Data = d - result.Data = d - queue.Empty() - size := queue.Size() - - s.counter.Notify(size) - // log.Println("queue remain:", queue.Size()) - } - - return result, err - default: - return result, fmt.Errorf("unknonw cmd type: %s", cmd.Name) - } - + ctx = context.WithValue(ctx, ctxEntry{}, &e) + ctx = context.WithValue(ctx, ctxSMQueue{}, s) + ctx = context.WithValue(ctx, ctxConsumption{}, Consumption) + return FsPasser.ExecuteWithBytes(ctx, e.Cmd) } // SaveSnapshot saves the current IStateMachine state into a snapshot using the diff --git a/start_test.go b/start_test.go index 46bde4d..44c40e6 100644 --- a/start_test.go +++ b/start_test.go @@ -44,13 +44,10 @@ func TestStartNodeA(t *testing.T) { Data: uuid.New().String(), } - cmd := fusenrender.Command{ - Name: "enqueue", - Group: "test", - Item: item, - } + cmd := &fusenrender.CmdEnqueue{Command: fusenrender.Command{Group: "test"}} + cmd.Item = item + data, err := fusenrender.FsPasser.PackToBytes(cmd) - data, err := cmd.Encode() if err != nil { log.Println(err) } @@ -101,13 +98,11 @@ func TestStartNodeB(t *testing.T) { CreateAt: time.Now(), Data: uuid.New().String(), } - cmd := fusenrender.Command{ - Name: "enqueue", - Group: "test", - Item: item, - } - data, err := cmd.Encode() + cmd := fusenrender.CmdEnqueue{Command: fusenrender.Command{Group: "test"}} + cmd.Item = item + data, err := fusenrender.FsPasser.PackToBytes(&cmd) + if err != nil { log.Println(err) }