fusenapi/fsm/fsm.go

178 lines
3.9 KiB
Go
Raw Normal View History

2023-07-25 11:32:51 +00:00
package fsm
import (
"context"
"encoding/gob"
"fmt"
2023-07-30 15:17:02 +00:00
"fusenapi/initalize"
2023-07-25 11:32:51 +00:00
"fusenapi/model/gmodel"
"fusenapi/utils/auth"
"io"
"log"
"sync"
"time"
2023-07-30 10:50:27 +00:00
"github.com/lni/dragonboat/v4"
sm "github.com/lni/dragonboat/v4/statemachine"
2023-07-25 11:32:51 +00:00
"gorm.io/gorm"
)
2023-07-30 15:17:02 +00:00
// FsStateMachine is a simple key-value store as an FSM.
type FsStateMachine struct {
2023-07-30 10:50:27 +00:00
shardID uint64
replicaID uint64
2023-07-25 11:32:51 +00:00
2023-07-30 10:50:27 +00:00
mu sync.Mutex
2023-07-25 11:32:51 +00:00
store map[int64]*UserState
2023-07-30 10:50:27 +00:00
gdb *gorm.DB
// waiter *WaitCallback
// ra *raft.Raft // The consensus mechanism
2023-07-25 11:32:51 +00:00
}
2023-07-30 15:17:02 +00:00
func NewFsStateMachine(shardID uint64, replicaID uint64) sm.IStateMachine {
return &FsStateMachine{
shardID: shardID,
replicaID: replicaID,
store: make(map[int64]*UserState),
gdb: initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest"),
}
}
type SharedState struct {
shardID uint64
replicaID uint64
nh *dragonboat.NodeHost
}
2023-08-02 11:08:44 +00:00
func (ss *SharedState) GetUserState(Userid int64) (us *UserState, err error) {
2023-07-25 11:32:51 +00:00
2023-08-02 11:08:44 +00:00
ius, err := ss.nh.SyncRead(context.TODO(), ss.shardID, Userid)
2023-07-30 10:50:27 +00:00
if err != nil {
log.Println(err)
return nil, err
}
if ius != nil {
return ius.(*UserState), nil
2023-07-25 11:32:51 +00:00
}
2023-07-30 10:50:27 +00:00
cmd := &Command{
Op: OP_Update,
2023-07-25 11:32:51 +00:00
Key: Userid,
}
2023-08-02 11:08:44 +00:00
cs := ss.nh.GetNoOPSession(128)
2023-07-30 10:50:27 +00:00
err = cmd.Encode(func(buf []byte) error {
2023-08-02 11:08:44 +00:00
result, err := ss.nh.SyncPropose(context.TODO(), cs, buf)
2023-07-30 10:50:27 +00:00
if err != nil {
return err
}
2023-07-25 11:32:51 +00:00
2023-07-30 10:50:27 +00:00
us = &UserState{}
err = us.Decode(result.Data)
if err != nil {
return err
}
return nil
})
return us, err
}
2023-07-25 11:32:51 +00:00
2023-07-30 10:50:27 +00:00
// Lookup performs local lookup on the ExampleStateMachine instance. In this example,
// we always return the Count value as a little endian binary encoded byte
// slice.
2023-07-30 15:17:02 +00:00
func (s *FsStateMachine) Lookup(query interface{}) (item interface{}, err error) {
2023-07-30 10:50:27 +00:00
s.mu.Lock()
defer s.mu.Unlock()
userid := query.(int64)
if us, ok := s.store[userid]; ok {
2023-07-25 11:32:51 +00:00
return us, nil
}
2023-07-30 10:50:27 +00:00
return nil, nil
2023-07-25 11:32:51 +00:00
}
2023-07-30 10:50:27 +00:00
// Update updates the object using the specified committed raft entry.
2023-07-30 15:17:02 +00:00
func (s *FsStateMachine) Update(e sm.Entry) (result sm.Result, err error) {
2023-07-25 11:32:51 +00:00
2023-07-30 10:50:27 +00:00
var cmd Command
err = cmd.Decode(e.Cmd)
2023-07-25 11:32:51 +00:00
if err != nil {
2023-07-30 10:50:27 +00:00
return result, err
2023-07-25 11:32:51 +00:00
}
switch cmd.Op {
2023-07-30 10:50:27 +00:00
case OP_Update:
s.mu.Lock()
defer s.mu.Unlock()
if old, ok := s.store[cmd.Key]; ok {
if time.Since(old.UpdateAt) <= time.Second {
return
}
}
2023-07-25 11:32:51 +00:00
// log.Println("update")
2023-07-30 10:50:27 +00:00
models := gmodel.NewAllModels(s.gdb)
2023-07-25 11:32:51 +00:00
user, err := models.FsUser.FindUserById(context.TODO(), cmd.Key)
if err != nil {
log.Println(err)
}
userState := &UserState{
2023-07-30 10:50:27 +00:00
UserId: cmd.Key,
PwdHash: auth.StringToHash(*user.PasswordHash),
UpdateAt: time.Now(),
2023-07-25 11:32:51 +00:00
}
2023-07-30 10:50:27 +00:00
s.store[cmd.Key] = userState
err = userState.Encode(func(b []byte) error {
e.Result.Data = b
result.Data = b
return nil
})
return result, err
2023-07-25 11:32:51 +00:00
default:
2023-07-30 10:50:27 +00:00
return result, fmt.Errorf("unknonw cmd type: %s", cmd.Op)
2023-07-25 11:32:51 +00:00
}
}
2023-07-30 10:50:27 +00:00
// SaveSnapshot saves the current IStateMachine state into a snapshot using the
// specified io.Writer object.
2023-07-30 15:17:02 +00:00
func (s *FsStateMachine) SaveSnapshot(w io.Writer,
2023-07-30 10:50:27 +00:00
fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
// as shown above, the only state that can be saved is the Count variable
// there is no external file in this IStateMachine example, we thus leave
// the fc untouched
2023-07-25 11:32:51 +00:00
2023-07-30 10:50:27 +00:00
s.mu.Lock()
defer s.mu.Unlock()
2023-07-25 11:32:51 +00:00
2023-07-30 10:50:27 +00:00
return gob.NewEncoder(w).Encode(&s.store)
2023-07-25 11:32:51 +00:00
}
2023-07-30 10:50:27 +00:00
// RecoverFromSnapshot recovers the state using the provided snapshot.
2023-07-30 15:17:02 +00:00
func (s *FsStateMachine) RecoverFromSnapshot(r io.Reader,
2023-07-30 10:50:27 +00:00
files []sm.SnapshotFile,
done <-chan struct{}) error {
// restore the Count variable, that is the only state we maintain in this
// example, the input files is expected to be empty
err := gob.NewDecoder(r).Decode(&s.store)
if err != nil {
2023-07-25 11:32:51 +00:00
return err
}
return nil
}
2023-07-30 10:50:27 +00:00
// Close closes the IStateMachine instance. There is nothing for us to cleanup
// or release as this is a pure in memory data store. Note that the Close
// method is not guaranteed to be called as node can crash at any time.
2023-07-30 15:17:02 +00:00
func (s *FsStateMachine) Close() error { return nil }