package fusenrender import ( "context" "io" "log" "sync" "time" "github.com/474420502/execute/triggered" "github.com/474420502/passer" sm "github.com/lni/dragonboat/v4/statemachine" ) var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { fsPasser := passer.NewPasser[sm.Result]() fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) { var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) var consumer = cxt.Value(ctxConsumption{}).(*triggered.EventExecute[bool]) cmd := obj.(*CmdEnqueue) key := cmd.Item.GetKey() smqueue.mu.Lock() var queue *PriorityQueue[QueueItem] var ok bool if queue, ok = smqueue.queues[cmd.Group]; !ok { queue = NewPriorityQueue[QueueItem]() smqueue.queues[cmd.Group] = queue } smqueue.mu.Unlock() queue.Push(&Slice[QueueItem]{ Key: key, Value: cmd.Item, }) var result sm.Result consumer.Notify(consumer.NULL) // 通知可以执行update return result, nil }) fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) { var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) var e = cxt.Value(ctxEntry{}).(*sm.Entry) var queue *PriorityQueue[QueueItem] var ok bool cmd := obj.(*CmdDequeue) smqueue.mu.Lock() if queue, ok = smqueue.queues[cmd.Group]; !ok { queue = NewPriorityQueue[QueueItem]() smqueue.queues[cmd.Group] = queue } smqueue.mu.Unlock() var result sm.Result if queue.Empty() { return result, nil } item := queue.Pop() if item == nil { result.Value = 0 return result, nil } if item != nil { d, err := item.Encode() if err != nil { return result, err } e.Result.Data = d result.Data = d queue.Empty() size := queue.Size() smqueue.counter.Notify(size) // log.Println("queue remain:", queue.Size()) } return result, nil }) return fsPasser }() // SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列 type SMQueue struct { // 所属的Shard ID shardID uint64 // Replica ID replicaID uint64 // 互斥锁,保护队列Map的并发访问 mu *sync.Mutex // 组名到队列的映射 queues map[string]*PriorityQueue[QueueItem] counter *triggered.EventExecute[int64] } // // NewSMQueue creates and return a new ExampleStateMachine object. func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { mu := &sync.Mutex{} return &SMQueue{ shardID: shardID, replicaID: replicaID, mu: mu, queues: make(map[string]*PriorityQueue[QueueItem]), counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) { log.Printf("queue remain: %d\n", params.Value) time.Sleep(time.Second * 5) }), } } // Lookup performs local lookup on the ExampleStateMachine instance. In this example, // we always return the Count value as a little endian binary encoded byte // slice. func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) { return item, nil } type ctxEntry struct{} type ctxSMQueue struct{} type ctxConsumption struct{} // Update处理Entry中的更新命令 // Update updates the object using the specified committed raft entry. func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { ctx := context.TODO() ctx = context.WithValue(ctx, ctxEntry{}, &e) ctx = context.WithValue(ctx, ctxSMQueue{}, s) ctx = context.WithValue(ctx, ctxConsumption{}, Consumption) return FsPasser.ExecuteWithBytes(ctx, e.Cmd) } // SaveSnapshot saves the current IStateMachine state into a snapshot using the // specified io.Writer object. func (s *SMQueue) SaveSnapshot(w io.Writer, fc sm.ISnapshotFileCollection, done <-chan struct{}) error { // as shown above, the only state that can be saved is the Count variable // there is no external file in this IStateMachine example, we thus leave // the fc untouched // s.mu.Lock() // defer s.mu.Unlock() // return gob.NewEncoder(w).Encode(&s.queues) return nil } // RecoverFromSnapshot recovers the state using the provided snapshot. func (s *SMQueue) RecoverFromSnapshot(r io.Reader, files []sm.SnapshotFile, done <-chan struct{}) error { // restore the Count variable, that is the only state we maintain in this // example, the input files is expected to be empty // err := gob.NewDecoder(r).Decode(&s.queues) // if err != nil { // return err // } return nil } // Close closes the IStateMachine instance. There is nothing for us to cleanup // or release as this is a pure in memory data store. Note that the Close // method is not guaranteed to be called as node can crash at any time. func (s *SMQueue) Close() error { return nil }