package fusenrender import ( "context" "encoding/gob" "fmt" "io" "runtime" "sync" "github.com/474420502/execute/triggered" sm "github.com/lni/dragonboat/v4/statemachine" ) // SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列 type SMQueue struct { // 所属的Shard ID shardID uint64 // Replica ID replicaID uint64 // 互斥锁,保护队列Map的并发访问 mu *sync.Mutex // 组名到队列的映射 queues map[string]*PriorityQueue[QueueItem] counter *triggered.EventExecute[int64] } // // NewSMQueue creates and return a new ExampleStateMachine object. func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { mu := &sync.Mutex{} return &SMQueue{ shardID: shardID, replicaID: replicaID, mu: mu, queues: make(map[string]*PriorityQueue[QueueItem]), counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) { if params.Value != 0 { var m runtime.MemStats runtime.ReadMemStats(&m) allocMB := float64(m.Alloc) / 1024 / 1024 // totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 sysMB := float64(m.Sys) / 1024 / 1024 fmt.Printf("队列堆积数据: %d, Alloc = %.2f MB, Sys = %.2fMB\n", params.Value, allocMB, sysMB) } }), } } // Lookup performs local lookup on the ExampleStateMachine instance. In this example, // we always return the Count value as a little endian binary encoded byte // slice. func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) { return item, nil } type ctxEntry struct{} type ctxSMQueue struct{} type ctxDequeueHandler struct{} // Update处理Entry中的更新命令 // Update updates the object using the specified committed raft entry. func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { ctx := context.TODO() ctx = context.WithValue(ctx, ctxEntry{}, &e) ctx = context.WithValue(ctx, ctxSMQueue{}, s) ctx = context.WithValue(ctx, ctxDequeueHandler{}, DequeueHandler) return FsPasser.ExecuteWithBytes(ctx, e.Cmd) } // SaveSnapshot saves the current IStateMachine state into a snapshot using the // specified io.Writer object. func (s *SMQueue) SaveSnapshot(w io.Writer, fc sm.ISnapshotFileCollection, done <-chan struct{}) error { // as shown above, the only state that can be saved is the Count variable // there is no external file in this IStateMachine example, we thus leave // the fc untouched s.mu.Lock() defer s.mu.Unlock() return gob.NewEncoder(w).Encode(&s.queues) // return nil } // RecoverFromSnapshot recovers the state using the provided snapshot. func (s *SMQueue) RecoverFromSnapshot(r io.Reader, files []sm.SnapshotFile, done <-chan struct{}) error { // restore the Count variable, that is the only state we maintain in this // example, the input files is expected to be empty err := gob.NewDecoder(r).Decode(&s.queues) if err != nil { return err } return nil } // Close closes the IStateMachine instance. There is nothing for us to cleanup // or release as this is a pure in memory data store. Note that the Close // method is not guaranteed to be called as node can crash at any time. func (s *SMQueue) Close() error { return nil }