package fusenrender import ( "encoding/gob" "encoding/json" "fmt" "io" "log" "sync" "time" "github.com/474420502/batchexecute" sm "github.com/lni/dragonboat/v4/statemachine" ) // SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列 type SMQueue struct { // 所属的Shard ID shardID uint64 // Replica ID replicaID uint64 // 互斥锁,保护队列Map的并发访问 mu *sync.Mutex // 组名到队列的映射 queues map[string]*PriorityQueue[QueueItem] } type Command struct { Name string `json:"name"` Group string `json:"group"` Item *QueueItem `json:"item"` } func (cmd *Command) Encode() ([]byte, error) { val, err := json.Marshal(cmd) if err != nil { return nil, err } return val, nil } // QueueItem表示队列中的一个项 type QueueItem struct { // 队列组名 Group string `json:"group"` // 组名 // 优先级 Priority uint32 `json:"priority"` // 处理的优先级 // 创建时间 CreateAt time.Time `json:"create_at"` // 创建时间 统一utc // 要排队的数据 Data any `json:"data"` // 操作的数据结构 } func (item *QueueItem) Encode() ([]byte, error) { val, err := json.Marshal(item) if err != nil { return nil, err } return val, nil } func (item *QueueItem) Decode(buf []byte) error { err := json.Unmarshal(buf, item) if err != nil { return err } return nil } func (item *QueueItem) GetKey() []byte { return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix())) } // // NewSMQueue creates and return a new ExampleStateMachine object. func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { mu := &sync.Mutex{} return &SMQueue{ shardID: shardID, replicaID: replicaID, counter: batchexecute.NewBatchExecute[int64](func(params *int64) { log.Printf("queue remain: %d\n", *params) time.Sleep(time.Second * 5) }), mu: mu, queues: make(map[string]*PriorityQueue[QueueItem]), } } // Lookup performs local lookup on the ExampleStateMachine instance. In this example, // we always return the Count value as a little endian binary encoded byte // slice. 
func (s *SMQueue) Lookup(group any) (item any, err error) {
	return nil, nil
}

// groupQueue returns the queue registered for group, creating and registering
// an empty one if none exists yet. Only the map access is done under s.mu.
func (s *SMQueue) groupQueue(group string) *PriorityQueue[QueueItem] {
	s.mu.Lock()
	defer s.mu.Unlock()

	queue, ok := s.queues[group]
	if !ok {
		queue = NewPriorityQueue[QueueItem]()
		s.queues[group] = queue
	}
	return queue
}

// Update applies the command carried by the committed raft entry e.
//
// e.Cmd must hold a JSON-encoded Command. Two commands are supported:
//
//   - "enqueue" pushes cmd.Item onto the queue of cmd.Group (created on
//     demand) and sets result.Value to the byte length of the JSON-encoded
//     item.
//   - "dequeue" pops one item from the queue of cmd.Group and returns its
//     JSON encoding in result.Data. The zero Result is returned when the
//     queue is empty.
//
// An error is returned when the command cannot be decoded, when an "enqueue"
// command carries no item, when an item cannot be encoded, or when the
// command name is not recognised.
func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
	var cmd Command
	if err := json.Unmarshal(e.Cmd, &cmd); err != nil {
		return result, err
	}

	switch cmd.Name {
	case "enqueue":
		// Without an item GetKey would dereference a nil pointer; fail with a
		// descriptive error, like the other malformed-command paths, instead
		// of panicking.
		if cmd.Item == nil {
			return result, fmt.Errorf("enqueue: missing item for group %q", cmd.Group)
		}

		encoded, err := cmd.Item.Encode()
		if err != nil {
			return result, err
		}

		s.groupQueue(cmd.Group).Push(&Slice[QueueItem]{
			Key:   cmd.Item.GetKey(),
			Value: cmd.Item,
		})

		result.Value = uint64(len(encoded))
		// Signal that there is something to consume.
		BatchQueueExecute.Notify(Consumption, nil)
		return result, nil

	case "dequeue":
		queue := s.groupQueue(cmd.Group)
		if queue.Empty() {
			return result, nil
		}

		item := queue.Pop()
		if item == nil {
			return result, nil
		}

		encoded, err := item.Encode()
		if err != nil {
			return result, err
		}
		result.Data = encoded

		// Report the remaining size asynchronously so the counter callback
		// never blocks the apply path.
		size := queue.Size()
		go s.counter.Execute(&size)
		return result, nil

	default:
		return result, fmt.Errorf("unknonw cmd type: %s", cmd.Name)
	}
}

// SaveSnapshot writes the current state, the map of group queues, to w using
// gob encoding. The map is encoded while holding s.mu. No external files are
// involved, so fc is left untouched; done is not consulted.
func (s *SMQueue) SaveSnapshot(w io.Writer, fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	return gob.NewEncoder(w).Encode(&s.queues)
}

// RecoverFromSnapshot recovers the state using the provided snapshot.
func (s *SMQueue) RecoverFromSnapshot(r io.Reader, files []sm.SnapshotFile, done <-chan struct{}) error { // restore the Count variable, that is the only state we maintain in this // example, the input files is expected to be empty err := gob.NewDecoder(r).Decode(&s.queues) if err != nil { return err } return nil } // Close closes the IStateMachine instance. There is nothing for us to cleanup // or release as this is a pure in memory data store. Note that the Close // method is not guaranteed to be called as node can crash at any time. func (s *SMQueue) Close() error { return nil }