package shared import ( "context" "encoding/gob" "io" "sync" sm "github.com/lni/dragonboat/v4/statemachine" "gorm.io/gorm" ) // FsStateMachine is a simple key-value store as an FSM. type FsStateMachine struct { shardID uint64 replicaID uint64 mu sync.Mutex store map[int64]*UserState gdb *gorm.DB // waiter *WaitCallback // ra *raft.Raft // The consensus mechanism } func NewFsStateMachine(shardID uint64, replicaID uint64) sm.IStateMachine { return &FsStateMachine{ shardID: shardID, replicaID: replicaID, store: make(map[int64]*UserState), // gdb: initalize.InitMysql("fsreaderwriter:XErSYmLELKMnf3Dh@tcp(fusen.cdmigcvz3rle.us-east-2.rds.amazonaws.com:3306)/fusen"), } } // Lookup performs local lookup on the ExampleStateMachine instance. In this example, // we always return the Count value as a little endian binary encoded byte // slice. func (s *FsStateMachine) Lookup(query interface{}) (item interface{}, err error) { s.mu.Lock() defer s.mu.Unlock() userid := query.(int64) if us, ok := s.store[userid]; ok { return us, nil } return nil, nil } // Update updates the object using the specified committed raft entry. func (s *FsStateMachine) Update(e sm.Entry) (result sm.Result, err error) { ctx := context.TODO() ctx = context.WithValue(ctx, ctxEntry{}, &e) ctx = context.WithValue(ctx, ctxSM{}, s) return FsPasser.ExecuteWithBytes(ctx, e.Cmd) } // SaveSnapshot saves the current IStateMachine state into a snapshot using the // specified io.Writer object. func (s *FsStateMachine) SaveSnapshot(w io.Writer, fc sm.ISnapshotFileCollection, done <-chan struct{}) error { // as shown above, the only state that can be saved is the Count variable // there is no external file in this IStateMachine example, we thus leave // the fc untouched s.mu.Lock() defer s.mu.Unlock() return gob.NewEncoder(w).Encode(&s.store) } // RecoverFromSnapshot recovers the state using the provided snapshot. func (s *FsStateMachine) RecoverFromSnapshot(r io.Reader, files []sm.SnapshotFile, done <-chan struct{}) error { // restore the Count variable, that is the only state we maintain in this // example, the input files is expected to be empty err := gob.NewDecoder(r).Decode(&s.store) if err != nil { return err } return nil } // Close closes the IStateMachine instance. There is nothing for us to cleanup // or release as this is a pure in memory data store. Note that the Close // method is not guaranteed to be called as node can crash at any time. func (s *FsStateMachine) Close() error { return nil }