90 lines
2.0 KiB
Go
90 lines
2.0 KiB
Go
package fusenrender
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/474420502/execute/triggered"
|
|
"github.com/474420502/passer"
|
|
sm "github.com/lni/dragonboat/v4/statemachine"
|
|
)
|
|
|
|
// 结构体异步传递后, 执行的注册函数, 实际上就是update的handler
|
|
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
|
|
|
|
fsPasser := passer.NewPasser[sm.Result]()
|
|
|
|
fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) {
|
|
|
|
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
|
|
var dequeueHandler = cxt.Value(ctxDequeueHandler{}).(*triggered.EventExecute[bool])
|
|
|
|
cmd := obj.(*CmdEnqueue)
|
|
key := cmd.Item.GetKey()
|
|
|
|
smqueue.mu.Lock()
|
|
var queue *PriorityQueue[QueueItem]
|
|
var ok bool
|
|
if queue, ok = smqueue.queues[cmd.Group]; !ok {
|
|
queue = NewPriorityQueue[QueueItem]()
|
|
smqueue.queues[cmd.Group] = queue
|
|
}
|
|
smqueue.mu.Unlock()
|
|
|
|
queue.Push(&Slice[QueueItem]{
|
|
Key: key,
|
|
Value: cmd.Item,
|
|
})
|
|
|
|
var result sm.Result
|
|
dequeueHandler.Notify(dequeueHandler.NULL) // 通知可以执行update
|
|
return result, nil
|
|
})
|
|
|
|
fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) {
|
|
|
|
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
|
|
var e = cxt.Value(ctxEntry{}).(*sm.Entry)
|
|
|
|
var queue *PriorityQueue[QueueItem]
|
|
var ok bool
|
|
cmd := obj.(*CmdDequeue)
|
|
smqueue.mu.Lock()
|
|
if queue, ok = smqueue.queues[cmd.Group]; !ok {
|
|
queue = NewPriorityQueue[QueueItem]()
|
|
smqueue.queues[cmd.Group] = queue
|
|
}
|
|
smqueue.mu.Unlock()
|
|
|
|
var result sm.Result
|
|
if queue.Empty() {
|
|
return result, nil
|
|
}
|
|
|
|
item := queue.Pop()
|
|
if item == nil {
|
|
result.Value = 0
|
|
return result, nil
|
|
}
|
|
|
|
if item != nil {
|
|
d, err := item.Encode()
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
e.Result.Data = d
|
|
result.Data = d
|
|
if !queue.Empty() {
|
|
size := queue.Size()
|
|
|
|
smqueue.counter.Notify(size)
|
|
}
|
|
|
|
// log.Println("queue remain:", queue.Size())
|
|
}
|
|
|
|
return result, nil
|
|
})
|
|
|
|
return fsPasser
|
|
}()
|