426 lines
9.3 KiB
426 lines
9.3 KiB
package fusenrender
import (
sm "github.com/lni/dragonboat/v4/statemachine"
type SMQueue struct {
shardID uint64
replicaID uint64
db *badger.DB
queue PriorityQueue[QueueItem]
type Command struct {
Name string `json:"name"`
Group *string `json:"group"`
Item *QueueItem `json:"item"`
type QueueItem struct {
Group string `json:"group"` // 组名
Priority uint32 `json:"priority"` // 处理的优先级
CreateAt time.Time `json:"create_at"` // 创建时间 统一utc
Data any `json:"data"` // 操作的数据结构
func (item *QueueItem) Encode() ([]byte, error) {
val, err := json.Marshal(item)
if err != nil {
return nil, err
return val, nil
func (item *QueueItem) GetKey() []byte {
return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix()))
// // NewSMQueue creates and return a new ExampleStateMachine object.
func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
// opts := badger.DefaultOptions(datapath)
// db, err := badger.Open(opts)
// if err != nil {
// panic(err)
// }
return &SMQueue{
shardID: shardID,
replicaID: replicaID,
// db: db,
// Lookup performs local lookup on the ExampleStateMachine instance. In this example,
// we always return the Count value as a little endian binary encoded byte
// slice.
func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) {
return item, nil
// func (q *SMQueue) GetKey(item *QueueItem) []byte {
// return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix()))
// }
// Update updates the object using the specified committed raft entry.
func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
var cmd Command
err = json.Unmarshal(e.Cmd, &cmd)
if err != nil {
return result, err
switch cmd.Name {
case "enqueue":
d, err := cmd.Item.Encode()
if err != nil {
return result, err
err = s.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(s.GetKey(cmd.Item)), d)
return sm.Result{Value: uint64(len(d))}, err
case "dequeue":
prefix := []byte(fmt.Sprintf("%s_", *cmd.Group))
err = s.db.Update(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
if !it.ValidForPrefix(prefix) {
return nil
itemKey := it.Item().Key()
err = it.Item().Value(func(val []byte) error {
result.Data = val
return nil
if err != nil {
return txn.Delete(itemKey)
return result, err
return result, fmt.Errorf("unknonw cmd type: %s", cmd.Name)
// type KeyValue struct {
// Key []byte
// Val []byte
// }
// // SaveSnapshot saves the current IStateMachine state into a snapshot using the
// // specified io.Writer object.
// func (s *SMQueue) SaveSnapshot(w io.Writer,
// fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
// // as shown above, the only state that can be saved is the Count variable
// // there is no external file in this IStateMachine example, we thus leave
// // the fc untouched
// // 创建一个只读事务
// txn := s.db.NewTransaction(false)
// // 在事务中读取数据
// iter := txn.NewIterator(badger.IteratorOptions{})
// var items []*KeyValue
// for iter.Rewind(); iter.Valid(); iter.Next() {
// item := iter.Item()
// err := item.Value(func(val []byte) error {
// // _, err := w.Write(val)
// items = append(items, &KeyValue{
// Key: item.Key(),
// Val: val,
// })
// return nil
// })
// if err != nil {
// return err
// }
// // 处理key-value
// }
// // 释放迭代器和事务
// iter.Close()
// txn.Discard()
// return gob.NewEncoder(w).Encode(&items)
// }
// // RecoverFromSnapshot recovers the state using the provided snapshot.
// func (s *SMQueue) RecoverFromSnapshot(r io.Reader,
// files []sm.SnapshotFile,
// done <-chan struct{}) error {
// // restore the Count variable, that is the only state we maintain in this
// // example, the input files is expected to be empty
// var items []*KeyValue
// err := gob.NewDecoder(r).Decode(&items)
// if err != nil {
// return err
// }
// s.db.Update(func(txn *badger.Txn) error {
// for _, item := range items {
// if err := txn.Set(item.Key, item.Val); err != nil {
// return err
// }
// }
// return nil
// })
// return nil
// }
// // Close closes the IStateMachine instance. There is nothing for us to cleanup
// // or release as this is a pure in memory data store. Note that the Close
// // method is not guaranteed to be called as node can crash at any time.
// func (s *SMQueue) Close() error { return nil }
// import (
// "encoding/json"
// "fmt"
// "io"
// "log"
// "sync"
// "time"
// "github.com/dgraph-io/badger/v3"
// "github.com/hashicorp/raft"
// )
// type SMQueue struct {
// mu sync.Mutex
// queue *Queue
// ra *raft.Raft
// }
// func NewQueueFSM(datapath string) *SMQueue {
// q, err := NewQueue(datapath)
// if err != nil {
// panic(err)
// }
// return &SMQueue{
// queue: q,
// }
// }
// const (
// Enqueue raft.LogType = 20
// Dequeue raft.LogType = 21
// )
// func (q *SMQueue) LogInfo() {
// log.Println(q.ra.GetConfiguration().Configuration(), q.ra.State())
// }
// func (q *SMQueue) Put(item *QueueItem) error {
// data, err := item.Encode()
// if err != nil {
// return err
// }
// return q.PutJsonString(data)
// }
// func (q *SMQueue) PutJsonString(item []byte) error {
// task := ApplyTask{
// Name: "enqueue",
// Object: item,
// }
// d, err := task.Encode()
// if err != nil {
// return err
// }
// future := q.ra.Apply(d, time.Second*15)
// if future.Error() != nil {
// return future.Error()
// }
// resp := future.Response()
// if resp == nil {
// return nil
// }
// return resp.(error)
// }
// func (q *SMQueue) Pop(group string) (*QueueItem, error) {
// task := ApplyTask{
// Name: "dequeue",
// Object: []byte(group),
// }
// d, err := task.Encode()
// if err != nil {
// return nil, err
// }
// ierr := q.ra.Apply(d, time.Second*15)
// if ierr.Error() != nil {
// return nil, ierr.Error()
// }
// switch v := ierr.Response().(type) {
// case error:
// return nil, v
// case *QueueItem:
// return v, nil
// default:
// return nil, fmt.Errorf("unknown %v", v)
// }
// }
// func (q *SMQueue) Apply(log *raft.Log) interface{} {
// leader, id := q.ra.LeaderWithID()
// q.mu.Lock()
// defer q.mu.Unlock()
// var task ApplyTask
// err := task.Decode(log.Data)
// if err != nil {
// return err
// }
// switch task.Name {
// case "enqueue":
// var item QueueItem
// if err := json.Unmarshal(task.Object, &item); err != nil {
// return err
// }
// return q.queue.Enqueue(&item)
// case "dequeue":
// // log.Data 传入group
// item, err := q.queue.Dequeue(string(task.Object))
// if err != nil {
// return err
// }
// return item
// default:
// return fmt.Errorf("unknown type: %v", log.Type)
// }
// }
// type QueueSnapshot struct {
// Items []*QueueItem
// }
// // Persist writes the snapshot to the provided sink.
// func (snapshot *QueueSnapshot) Persist(sink raft.SnapshotSink) error {
// // The example has been simplified. In a production environment, you would
// // need to handle this operation with more care.
// return nil
// }
// // Release is invoked when the Raft instance is finished with the snapshot.
// func (snapshot *QueueSnapshot) Release() {
// // Normally you would put any cleanup here.
// }
// // Snapshot 返回队列快照
// func (fsm *SMQueue) Snapshot() (raft.FSMSnapshot, error) {
// var items []*QueueItem
// // 使用 Badger 读取所有队列项
// fsm.queue.db.View(func(txn *badger.Txn) error {
// opts := badger.DefaultIteratorOptions
// opts.PrefetchValues = false // 只需要key
// it := txn.NewIterator(opts)
// defer it.Close()
// for it.Rewind(); it.Valid(); it.Next() {
// err := it.Item().Value(func(val []byte) error {
// item := &QueueItem{}
// err := json.Unmarshal(val, item)
// if err != nil {
// return err
// }
// items = append(items, item)
// return nil
// })
// if err != nil {
// log.Println(err)
// }
// }
// return nil
// })
// snapshot := &QueueSnapshot{Items: items}
// return snapshot, nil
// }
// // Restore 恢复队列状态
// func (fsm *SMQueue) Restore(rc io.ReadCloser) error {
// snapshot := &QueueSnapshot{}
// if err := json.NewDecoder(rc).Decode(snapshot); err != nil {
// return err
// }
// // 用快照数据重建队列
// fsm.queue.db.Update(func(txn *badger.Txn) error {
// for _, item := range snapshot.Items {
// val, err := item.Encode()
// if err != nil {
// log.Println(err)
// continue
// }
// if err := txn.Set(fsm.queue.GetKey(item), val); err != nil {
// return err
// }
// }
// return nil
// })
// return nil
// }
// func waitForCluster(ra *raft.Raft) {
// ticker := time.NewTicker(500 * time.Millisecond)
// defer ticker.Stop()
// for range ticker.C {
// state := ra.State()
// if state == raft.Leader || state == raft.Follower {
// log.Println("Raft cluster is running")
// return
// } else {
// log.Println("Still waiting for the cluster to start...")
// }
// }
// }