fusen-render/sm_upate_handler.go
2023-10-07 11:54:55 +08:00

89 lines
1.9 KiB
Go

package fusenrender
import (
"context"
"github.com/474420502/execute/triggered"
"github.com/474420502/passer"
sm "github.com/lni/dragonboat/v4/statemachine"
)
// 结构体异步传递后, 执行的注册函数, 实际上就是update的handler
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
fsPasser := passer.NewPasser[sm.Result]()
fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) {
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
var dequeueHandler = cxt.Value(ctxDequeueHandler{}).(*triggered.EventExecute[bool])
cmd := obj.(*CmdEnqueue)
key := cmd.Item.GetKey()
smqueue.mu.Lock()
var queue *PriorityQueue[QueueItem]
var ok bool
if queue, ok = smqueue.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
smqueue.queues[cmd.Group] = queue
}
smqueue.mu.Unlock()
queue.Push(&Slice[QueueItem]{
Key: key,
Value: cmd.Item,
})
var result sm.Result
dequeueHandler.Notify(dequeueHandler.NULL) // 通知可以执行update
return result, nil
})
fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) {
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
var e = cxt.Value(ctxEntry{}).(*sm.Entry)
var queue *PriorityQueue[QueueItem]
var ok bool
cmd := obj.(*CmdDequeue)
smqueue.mu.Lock()
if queue, ok = smqueue.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
smqueue.queues[cmd.Group] = queue
}
smqueue.mu.Unlock()
var result sm.Result
if queue.Empty() {
return result, nil
}
item := queue.Pop()
if item == nil {
result.Value = 0
return result, nil
}
if item != nil {
d, err := item.Encode()
if err != nil {
return result, err
}
e.Result.Data = d
result.Data = d
if !queue.Empty() {
smqueue.counter.Notify(queue.Size())
}
// log.Println("queue remain:", queue.Size())
}
return result, nil
})
return fsPasser
}()