fusen-render/sm.go

175 lines
4.0 KiB
Go
Raw Normal View History

2023-07-29 13:58:11 +00:00
package fusenrender
import (
2023-07-29 22:50:58 +00:00
"encoding/gob"
2023-07-29 13:58:11 +00:00
"encoding/json"
"fmt"
2023-07-29 22:50:58 +00:00
"io"
2023-07-29 13:58:11 +00:00
"log"
2023-07-29 22:50:58 +00:00
"sync"
2023-07-29 13:58:11 +00:00
"time"
sm "github.com/lni/dragonboat/v4/statemachine"
)
type SMQueue struct {
2023-07-29 21:11:04 +00:00
shardID uint64
replicaID uint64
2023-07-29 22:50:58 +00:00
mu sync.Mutex
queues map[string]*PriorityQueue[QueueItem]
2023-07-29 13:58:11 +00:00
}
type Command struct {
2023-07-29 22:50:58 +00:00
Name string `json:"name"`
Group string `json:"group"`
2023-07-29 13:58:11 +00:00
2023-07-29 22:50:58 +00:00
Item *QueueItem `json:"item"`
}
func (cmd *Command) Encode() ([]byte, error) {
val, err := json.Marshal(cmd)
if err != nil {
return nil, err
}
return val, nil
2023-07-29 13:58:11 +00:00
}
// QueueItem is one element stored in a group's priority queue.
type QueueItem struct {
	Group    string    `json:"group"`     // group name
	Priority uint32    `json:"priority"`  // processing priority
	CreateAt time.Time `json:"create_at"` // creation time, normalized to UTC
	Data     any       `json:"data"`      // payload to operate on
}

// Encode returns the JSON encoding of the item, or the error reported by
// json.Marshal.
func (item *QueueItem) Encode() ([]byte, error) {
	return json.Marshal(item)
}

// GetKey returns the queue key for the item, formatted as
// "<group>_<inverted priority>_<unix seconds>".
//
// The priority is stored as its bitwise complement (math.MaxUint32 - Priority)
// so that a larger Priority produces a smaller number. Both numeric fields are
// zero-padded to ten digits so that byte-wise comparison of two keys agrees
// with numeric comparison. The previous form negated the unsigned value, which
// wraps: Priority 0 produced "0" and therefore sorted ahead of every other
// priority.
//
// NOTE(review): this assumes the queue orders keys byte-wise ascending and pops
// the smallest key first — confirm against PriorityQueue.
// NOTE(review): CreateAt is truncated to whole seconds, so two items with the
// same group, priority and second share a key — confirm the queue tolerates
// duplicate keys.
func (item *QueueItem) GetKey() []byte {
	return []byte(fmt.Sprintf("%s_%010d_%010d", item.Group, ^item.Priority, item.CreateAt.UTC().Unix()))
}
// // NewSMQueue creates and return a new ExampleStateMachine object.
func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
2023-07-29 13:58:11 +00:00
return &SMQueue{
2023-07-29 21:11:04 +00:00
shardID: shardID,
replicaID: replicaID,
2023-07-29 22:50:58 +00:00
queues: make(map[string]*PriorityQueue[QueueItem]),
2023-07-29 13:58:11 +00:00
}
}
// Lookup is currently a stub: it ignores group and always returns a nil item
// together with a nil error.
func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) {
	return nil, nil
}
// Update updates the object using the specified committed raft entry.
func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
var cmd Command
err = json.Unmarshal(e.Cmd, &cmd)
if err != nil {
return result, err
}
switch cmd.Name {
case "enqueue":
d, err := cmd.Item.Encode()
if err != nil {
return result, err
}
2023-07-29 22:50:58 +00:00
key := cmd.Item.GetKey()
s.mu.Lock()
var queue *PriorityQueue[QueueItem]
var ok bool
if queue, ok = s.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
s.queues[cmd.Group] = queue
}
defer s.mu.Unlock()
2023-07-29 13:58:11 +00:00
2023-07-29 22:50:58 +00:00
queue.Push(&Slice[QueueItem]{
Key: key,
Value: cmd.Item,
2023-07-29 13:58:11 +00:00
})
2023-07-29 22:50:58 +00:00
result.Value = uint64(len(d))
2023-07-29 13:58:11 +00:00
2023-07-29 22:50:58 +00:00
log.Printf("%#v, %d, %#v", s.queues, len(queue.Queue), queue.Queue)
2023-07-29 13:58:11 +00:00
2023-07-29 22:50:58 +00:00
return result, err
case "dequeue":
2023-07-29 13:58:11 +00:00
2023-07-29 22:50:58 +00:00
// prefix := []byte(fmt.Sprintf("%s_", cmd.Group))
s.mu.Lock()
var queue *PriorityQueue[QueueItem]
var ok bool
if queue, ok = s.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
s.queues[cmd.Group] = queue
}
defer s.mu.Unlock()
2023-07-29 13:58:11 +00:00
2023-07-29 22:50:58 +00:00
item := queue.Pop()
if item != nil {
d, err := item.Encode()
2023-07-29 13:58:11 +00:00
if err != nil {
2023-07-29 22:50:58 +00:00
return result, err
2023-07-29 13:58:11 +00:00
}
2023-07-29 22:50:58 +00:00
e.Result.Data = d
result.Data = d
}
log.Printf("%#v", s.queues)
2023-07-29 13:58:11 +00:00
return result, err
default:
return result, fmt.Errorf("unknonw cmd type: %s", cmd.Name)
}
}
2023-07-29 22:50:58 +00:00
// SaveSnapshot writes the gob encoding of the queue map to w while holding
// s.mu, and returns the encoder's error. fc and done are not used: no
// external files are added to the snapshot.
//
// NOTE(review): QueueItem.Data is an interface value; encoding/gob only
// encodes interface contents whose concrete types were registered with
// gob.Register — confirm that happens elsewhere.
func (s *SMQueue) SaveSnapshot(w io.Writer,
	fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
	encoder := gob.NewEncoder(w)

	s.mu.Lock()
	defer s.mu.Unlock()

	return encoder.Encode(&s.queues)
}
// RecoverFromSnapshot replaces the queue map with the gob-encoded map read
// from r. files and done are not used.
//
// The snapshot is decoded into a fresh map and only swapped in once decoding
// has succeeded. Decoding straight into the live map would merge the snapshot
// with whatever groups were already present, and a decode error would leave
// the state half-updated. The swap happens under s.mu, matching SaveSnapshot.
func (s *SMQueue) RecoverFromSnapshot(r io.Reader,
	files []sm.SnapshotFile,
	done <-chan struct{}) error {
	restored := make(map[string]*PriorityQueue[QueueItem])
	if err := gob.NewDecoder(r).Decode(&restored); err != nil {
		return err
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.queues = restored
	return nil
}
// Close releases nothing: the state lives only in memory, so it simply
// returns nil.
func (s *SMQueue) Close() error {
	return nil
}