fusenapi/shared/sm.go
2023-08-22 13:43:13 +08:00

91 lines
2.5 KiB
Go

package shared
import (
	"context"
	"encoding/gob"
	"fmt"
	"io"
	"sync"

	sm "github.com/lni/dragonboat/v4/statemachine"
	"gorm.io/gorm"
)
// FsStateMachine is an in-memory key-value store exposed as a replicated
// state machine. Entries are keyed by an int64 (Lookup treats the key as a
// user id) and hold *UserState values.
type FsStateMachine struct {
	// shardID and replicaID are the identifiers passed to NewFsStateMachine.
	shardID, replicaID uint64

	// mu guards store; Lookup and SaveSnapshot hold it while touching the map.
	mu sync.Mutex
	// store is the state written by SaveSnapshot and read back by
	// RecoverFromSnapshot.
	store map[int64]*UserState

	// gdb is a gorm database handle. NewFsStateMachine does not set it.
	gdb *gorm.DB

	// waiter *WaitCallback
	// ra *raft.Raft // The consensus mechanism
}
// NewFsStateMachine returns a state machine for the given shard and replica
// whose store starts out as an empty, non-nil map.
func NewFsStateMachine(shardID uint64, replicaID uint64) sm.IStateMachine {
	fsm := new(FsStateMachine)
	fsm.shardID = shardID
	fsm.replicaID = replicaID
	fsm.store = make(map[int64]*UserState)
	// gdb is left nil here. A previously commented-out initializer carried a
	// database DSN including credentials; connection settings belong in
	// configuration, not in source.
	return fsm
}
// Lookup returns the *UserState stored under the user id carried by query.
//
// query must hold an int64. Any other dynamic type yields an error rather
// than the panic an unchecked type assertion would raise. A user id with no
// entry is not an error: Lookup returns (nil, nil) in that case.
func (s *FsStateMachine) Lookup(query interface{}) (item interface{}, err error) {
	// Validate the query before taking the lock so a bad request costs nothing.
	userid, ok := query.(int64)
	if !ok {
		return nil, fmt.Errorf("lookup: query must be an int64 user id, got %T", query)
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if us, found := s.store[userid]; found {
		return us, nil
	}
	return nil, nil
}
// Update applies one committed raft entry. The entry (by pointer) and this
// state machine are attached to a context under the ctxEntry and ctxSM keys,
// and the entry's Cmd bytes are handed to FsPasser.ExecuteWithBytes, whose
// result and error are returned unchanged.
func (s *FsStateMachine) Update(e sm.Entry) (result sm.Result, err error) {
	withEntry := context.WithValue(context.TODO(), ctxEntry{}, &e)
	withSM := context.WithValue(withEntry, ctxSM{}, s)
	return FsPasser.ExecuteWithBytes(withSM, e.Cmd)
}
// SaveSnapshot gob-encodes the whole store map into w while holding the
// mutex, so the map cannot change under the encoder.
//
// fc and done are not used: this state machine adds no external files to the
// snapshot and does not observe the stop signal.
func (s *FsStateMachine) SaveSnapshot(w io.Writer,
	fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	enc := gob.NewEncoder(w)
	if err := enc.Encode(&s.store); err != nil {
		return err
	}
	return nil
}
// RecoverFromSnapshot replaces the store with the gob-encoded map read from r.
//
// The snapshot is decoded into a fresh map and only swapped in once decoding
// has succeeded. Decoding straight into the live map would keep every key
// that is absent from the snapshot (gob fills an existing map, it does not
// clear it) and could leave the map half-updated on a decode error.
//
// files and done are not used: snapshots written by SaveSnapshot carry no
// external files, and the stop signal is not observed.
func (s *FsStateMachine) RecoverFromSnapshot(r io.Reader,
	files []sm.SnapshotFile,
	done <-chan struct{}) error {
	var restored map[int64]*UserState
	if err := gob.NewDecoder(r).Decode(&restored); err != nil {
		return err
	}
	if restored == nil {
		// Keep the invariant set up by NewFsStateMachine: store is never nil,
		// so later writes to it cannot panic.
		restored = make(map[int64]*UserState)
	}

	// Swap under the same mutex Lookup and SaveSnapshot use for store access.
	s.mu.Lock()
	s.store = restored
	s.mu.Unlock()
	return nil
}
// Close is a no-op that always returns nil; the method body releases nothing.
// As the original note warns, Close is not guaranteed to be called because a
// node can crash at any time.
func (s *FsStateMachine) Close() error {
	return nil
}