178 lines
3.9 KiB
Go
178 lines
3.9 KiB
Go
package fsm
|
|
|
|
import (
|
|
"context"
|
|
"encoding/gob"
|
|
"fmt"
|
|
"fusenapi/initalize"
|
|
"fusenapi/model/gmodel"
|
|
"fusenapi/utils/auth"
|
|
"io"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/lni/dragonboat/v4"
|
|
sm "github.com/lni/dragonboat/v4/statemachine"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// FsStateMachine is a simple key-value store as an FSM.
|
|
type FsStateMachine struct {
|
|
shardID uint64
|
|
replicaID uint64
|
|
|
|
mu sync.Mutex
|
|
store map[int64]*UserState
|
|
|
|
gdb *gorm.DB
|
|
// waiter *WaitCallback
|
|
// ra *raft.Raft // The consensus mechanism
|
|
}
|
|
|
|
func NewFsStateMachine(shardID uint64, replicaID uint64) sm.IStateMachine {
|
|
return &FsStateMachine{
|
|
shardID: shardID,
|
|
replicaID: replicaID,
|
|
store: make(map[int64]*UserState),
|
|
gdb: initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest"),
|
|
}
|
|
}
|
|
|
|
type SharedState struct {
|
|
shardID uint64
|
|
replicaID uint64
|
|
nh *dragonboat.NodeHost
|
|
}
|
|
|
|
func (fsm *SharedState) GetUserState(Userid int64) (us *UserState, err error) {
|
|
|
|
ius, err := nh.SyncRead(context.TODO(), fsm.shardID, Userid)
|
|
if err != nil {
|
|
log.Println(err)
|
|
return nil, err
|
|
}
|
|
|
|
if ius != nil {
|
|
return ius.(*UserState), nil
|
|
}
|
|
|
|
cmd := &Command{
|
|
Op: OP_Update,
|
|
Key: Userid,
|
|
}
|
|
|
|
cs := nh.GetNoOPSession(128)
|
|
err = cmd.Encode(func(buf []byte) error {
|
|
result, err := nh.SyncPropose(context.TODO(), cs, buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
us = &UserState{}
|
|
err = us.Decode(result.Data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return us, err
|
|
}
|
|
|
|
// Lookup performs local lookup on the ExampleStateMachine instance. In this example,
|
|
// we always return the Count value as a little endian binary encoded byte
|
|
// slice.
|
|
func (s *FsStateMachine) Lookup(query interface{}) (item interface{}, err error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
userid := query.(int64)
|
|
if us, ok := s.store[userid]; ok {
|
|
return us, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// Update updates the object using the specified committed raft entry.
|
|
func (s *FsStateMachine) Update(e sm.Entry) (result sm.Result, err error) {
|
|
|
|
var cmd Command
|
|
err = cmd.Decode(e.Cmd)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
|
|
switch cmd.Op {
|
|
case OP_Update:
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if old, ok := s.store[cmd.Key]; ok {
|
|
if time.Since(old.UpdateAt) <= time.Second {
|
|
return
|
|
}
|
|
}
|
|
|
|
// log.Println("update")
|
|
models := gmodel.NewAllModels(s.gdb)
|
|
user, err := models.FsUser.FindUserById(context.TODO(), cmd.Key)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
userState := &UserState{
|
|
UserId: cmd.Key,
|
|
PwdHash: auth.StringToHash(*user.PasswordHash),
|
|
UpdateAt: time.Now(),
|
|
}
|
|
s.store[cmd.Key] = userState
|
|
err = userState.Encode(func(b []byte) error {
|
|
e.Result.Data = b
|
|
result.Data = b
|
|
return nil
|
|
})
|
|
|
|
return result, err
|
|
|
|
default:
|
|
return result, fmt.Errorf("unknonw cmd type: %s", cmd.Op)
|
|
}
|
|
|
|
}
|
|
|
|
// SaveSnapshot saves the current IStateMachine state into a snapshot using the
|
|
// specified io.Writer object.
|
|
func (s *FsStateMachine) SaveSnapshot(w io.Writer,
|
|
fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
|
|
// as shown above, the only state that can be saved is the Count variable
|
|
// there is no external file in this IStateMachine example, we thus leave
|
|
// the fc untouched
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
return gob.NewEncoder(w).Encode(&s.store)
|
|
}
|
|
|
|
// RecoverFromSnapshot recovers the state using the provided snapshot.
|
|
func (s *FsStateMachine) RecoverFromSnapshot(r io.Reader,
|
|
files []sm.SnapshotFile,
|
|
done <-chan struct{}) error {
|
|
// restore the Count variable, that is the only state we maintain in this
|
|
// example, the input files is expected to be empty
|
|
|
|
err := gob.NewDecoder(r).Decode(&s.store)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes the IStateMachine instance. There is nothing for us to cleanup
|
|
// or release as this is a pure in memory data store. Note that the Close
|
|
// method is not guaranteed to be called as node can crash at any time.
|
|
func (s *FsStateMachine) Close() error { return nil }
|