185 lines
4.7 KiB
Go
185 lines
4.7 KiB
Go
package fusenrender
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/474420502/execute/triggered"
|
|
"github.com/474420502/passer"
|
|
sm "github.com/lni/dragonboat/v4/statemachine"
|
|
)
|
|
|
|
// 结构体异步传递后, 执行的注册函数
|
|
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
|
|
|
|
fsPasser := passer.NewPasser[sm.Result]()
|
|
fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) {
|
|
|
|
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
|
|
var consumer = cxt.Value(ctxDequeueHandler{}).(*triggered.EventExecute[bool])
|
|
|
|
cmd := obj.(*CmdEnqueue)
|
|
key := cmd.Item.GetKey()
|
|
|
|
smqueue.mu.Lock()
|
|
var queue *PriorityQueue[QueueItem]
|
|
var ok bool
|
|
if queue, ok = smqueue.queues[cmd.Group]; !ok {
|
|
queue = NewPriorityQueue[QueueItem]()
|
|
smqueue.queues[cmd.Group] = queue
|
|
}
|
|
smqueue.mu.Unlock()
|
|
|
|
queue.Push(&Slice[QueueItem]{
|
|
Key: key,
|
|
Value: cmd.Item,
|
|
})
|
|
|
|
var result sm.Result
|
|
consumer.Notify(consumer.NULL) // 通知可以执行update
|
|
return result, nil
|
|
})
|
|
|
|
fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) {
|
|
|
|
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
|
|
var e = cxt.Value(ctxEntry{}).(*sm.Entry)
|
|
|
|
var queue *PriorityQueue[QueueItem]
|
|
var ok bool
|
|
cmd := obj.(*CmdDequeue)
|
|
smqueue.mu.Lock()
|
|
if queue, ok = smqueue.queues[cmd.Group]; !ok {
|
|
queue = NewPriorityQueue[QueueItem]()
|
|
smqueue.queues[cmd.Group] = queue
|
|
}
|
|
smqueue.mu.Unlock()
|
|
|
|
var result sm.Result
|
|
if queue.Empty() {
|
|
return result, nil
|
|
}
|
|
|
|
item := queue.Pop()
|
|
if item == nil {
|
|
result.Value = 0
|
|
return result, nil
|
|
}
|
|
|
|
if item != nil {
|
|
d, err := item.Encode()
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
e.Result.Data = d
|
|
result.Data = d
|
|
queue.Empty()
|
|
size := queue.Size()
|
|
|
|
smqueue.counter.Notify(size)
|
|
// log.Println("queue remain:", queue.Size())
|
|
}
|
|
|
|
return result, nil
|
|
})
|
|
|
|
return fsPasser
|
|
}()
|
|
|
|
// SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列
|
|
type SMQueue struct {
|
|
// 所属的Shard ID
|
|
shardID uint64
|
|
// Replica ID
|
|
replicaID uint64
|
|
|
|
// 互斥锁,保护队列Map的并发访问
|
|
mu *sync.Mutex
|
|
// 组名到队列的映射
|
|
queues map[string]*PriorityQueue[QueueItem]
|
|
|
|
counter *triggered.EventExecute[int64]
|
|
}
|
|
|
|
// // NewSMQueue creates and return a new ExampleStateMachine object.
|
|
func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
|
|
mu := &sync.Mutex{}
|
|
return &SMQueue{
|
|
shardID: shardID,
|
|
replicaID: replicaID,
|
|
|
|
mu: mu,
|
|
queues: make(map[string]*PriorityQueue[QueueItem]),
|
|
|
|
counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) {
|
|
if params.Value != 0 {
|
|
log.Printf("queue remain: %d\n", params.Value)
|
|
time.Sleep(time.Second * 5)
|
|
} else {
|
|
time.Sleep(time.Second * 15)
|
|
}
|
|
}),
|
|
}
|
|
}
|
|
|
|
// Lookup performs local lookup on the ExampleStateMachine instance. In this example,
|
|
// we always return the Count value as a little endian binary encoded byte
|
|
// slice.
|
|
func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) {
|
|
|
|
return item, nil
|
|
}
|
|
|
|
// Context key types for the values Update passes to the FsPasser handlers.
// Each key is its own empty struct type, so the keys never compare equal to
// one another.
type (
	// ctxEntry is the key for the *sm.Entry being applied.
	ctxEntry struct{}
	// ctxSMQueue is the key for the *SMQueue that received the entry.
	ctxSMQueue struct{}
	// ctxDequeueHandler is the key for the DequeueHandler value.
	ctxDequeueHandler struct{}
)
|
|
|
|
// Update处理Entry中的更新命令
|
|
// Update updates the object using the specified committed raft entry.
|
|
func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
|
|
ctx := context.TODO()
|
|
|
|
ctx = context.WithValue(ctx, ctxEntry{}, &e)
|
|
ctx = context.WithValue(ctx, ctxSMQueue{}, s)
|
|
ctx = context.WithValue(ctx, ctxDequeueHandler{}, DequeueHandler)
|
|
return FsPasser.ExecuteWithBytes(ctx, e.Cmd)
|
|
}
|
|
|
|
// SaveSnapshot saves the current IStateMachine state into a snapshot using the
|
|
// specified io.Writer object.
|
|
func (s *SMQueue) SaveSnapshot(w io.Writer,
|
|
fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
|
|
// as shown above, the only state that can be saved is the Count variable
|
|
// there is no external file in this IStateMachine example, we thus leave
|
|
// the fc untouched
|
|
|
|
// s.mu.Lock()
|
|
// defer s.mu.Unlock()
|
|
|
|
// return gob.NewEncoder(w).Encode(&s.queues)
|
|
return nil
|
|
}
|
|
|
|
// RecoverFromSnapshot recovers the state using the provided snapshot.
|
|
func (s *SMQueue) RecoverFromSnapshot(r io.Reader,
|
|
files []sm.SnapshotFile,
|
|
done <-chan struct{}) error {
|
|
// restore the Count variable, that is the only state we maintain in this
|
|
// example, the input files is expected to be empty
|
|
|
|
// err := gob.NewDecoder(r).Decode(&s.queues)
|
|
// if err != nil {
|
|
// return err
|
|
// }
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes the IStateMachine instance. There is nothing for us to cleanup
|
|
// or release as this is a pure in memory data store. Note that the Close
|
|
// method is not guaranteed to be called as node can crash at any time.
|
|
func (s *SMQueue) Close() error { return nil }
|