fusen-render/sm.go

108 lines
3.1 KiB
Go
Raw Permalink Normal View History

2023-07-29 13:58:11 +00:00
package fusenrender
import (
2023-08-02 10:16:05 +00:00
"context"
2023-08-07 11:03:22 +00:00
"encoding/gob"
2023-07-29 22:50:58 +00:00
"io"
2023-10-07 06:02:59 +00:00
"log"
2023-08-05 20:26:57 +00:00
"runtime"
2023-07-29 22:50:58 +00:00
"sync"
2023-11-14 03:45:46 +00:00
"time"
2023-07-29 13:58:11 +00:00
2023-08-01 03:14:20 +00:00
"github.com/474420502/execute/triggered"
2023-07-29 13:58:11 +00:00
sm "github.com/lni/dragonboat/v4/statemachine"
)
2023-08-02 10:16:05 +00:00
// SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列
type SMQueue struct {
// 所属的Shard ID
shardID uint64
// Replica ID
replicaID uint64
// 互斥锁,保护队列Map的并发访问
mu *sync.Mutex
// 组名到队列的映射
2023-08-09 11:02:32 +00:00
queues map[string]*PriorityQueue[QueueItem]
2023-08-02 10:16:05 +00:00
counter *triggered.EventExecute[int64]
}
// // NewSMQueue creates and return a new ExampleStateMachine object.
func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
mu := &sync.Mutex{}
return &SMQueue{
shardID: shardID,
replicaID: replicaID,
mu: mu,
queues: make(map[string]*PriorityQueue[QueueItem]),
counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) {
2023-11-14 03:45:46 +00:00
var m runtime.MemStats
runtime.ReadMemStats(&m)
allocMB := float64(m.Alloc) / 1024 / 1024
// totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024
sysMB := float64(m.Sys) / 1024 / 1024
log.Printf("队列堆积数据: %d, Alloc = %.2f MB, Sys = %.2fMB\n", params.Value, allocMB, sysMB)
time.Sleep(time.Second * 10) // 每超过10秒 统计一次
2023-08-02 10:16:05 +00:00
}),
2023-07-29 13:58:11 +00:00
}
2023-08-02 10:16:05 +00:00
}
// Lookup performs local lookup on the ExampleStateMachine instance. In this example,
// we always return the Count value as a little endian binary encoded byte
// slice.
func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) {
return item, nil
}
type ctxEntry struct{}
type ctxSMQueue struct{}
type ctxDequeueHandler struct{}
2023-08-02 10:16:05 +00:00
// Update处理Entry中的更新命令
// Update updates the object using the specified committed raft entry.
func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
ctx := context.TODO()
2023-07-29 13:58:11 +00:00
2023-08-02 10:16:05 +00:00
ctx = context.WithValue(ctx, ctxEntry{}, &e)
ctx = context.WithValue(ctx, ctxSMQueue{}, s)
ctx = context.WithValue(ctx, ctxDequeueHandler{}, DequeueHandler)
2023-08-02 10:16:05 +00:00
return FsPasser.ExecuteWithBytes(ctx, e.Cmd)
2023-07-29 13:58:11 +00:00
}
2023-07-29 22:50:58 +00:00
// SaveSnapshot saves the current IStateMachine state into a snapshot using the
// specified io.Writer object.
func (s *SMQueue) SaveSnapshot(w io.Writer,
fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
// as shown above, the only state that can be saved is the Count variable
// there is no external file in this IStateMachine example, we thus leave
// the fc untouched
2023-08-07 11:03:22 +00:00
s.mu.Lock()
defer s.mu.Unlock()
2023-07-29 22:50:58 +00:00
2023-08-07 11:03:22 +00:00
return gob.NewEncoder(w).Encode(&s.queues)
// return nil
2023-07-29 22:50:58 +00:00
}
// RecoverFromSnapshot recovers the state using the provided snapshot.
func (s *SMQueue) RecoverFromSnapshot(r io.Reader,
files []sm.SnapshotFile,
done <-chan struct{}) error {
// restore the Count variable, that is the only state we maintain in this
// example, the input files is expected to be empty
2023-08-07 11:03:22 +00:00
err := gob.NewDecoder(r).Decode(&s.queues)
if err != nil {
return err
}
2023-07-29 22:50:58 +00:00
return nil
}
// Close closes the IStateMachine instance. There is nothing for us to cleanup
// or release as this is a pure in memory data store. Note that the Close
// method is not guaranteed to be called as node can crash at any time.
func (s *SMQueue) Close() error { return nil }