fusenapi/fsm/fsm.go
2023-07-25 19:32:51 +08:00

136 lines
2.8 KiB
Go

package fsm
import (
"context"
"encoding/gob"
"fmt"
"fusenapi/model/gmodel"
"fusenapi/utils/auth"
"io"
"log"
"sync"
"time"
"github.com/hashicorp/raft"
"gorm.io/gorm"
)
// StateCluster is a simple key-value store as an FSM.
type StateCluster struct {
mu sync.Mutex
store map[int64]*UserState
gdb *gorm.DB
waiter *WaitCallback
ra *raft.Raft // The consensus mechanism
}
func (fsm *StateCluster) GetUserState(Userid int64) (*UserState, error) {
fsm.mu.Lock()
if us, ok := fsm.store[Userid]; ok {
// log.Println("exists")
fsm.mu.Unlock()
return us, nil
}
fsm.mu.Unlock()
// log.Println("apply")
cmd := &command{
Op: "update",
Key: Userid,
}
future := fsm.ra.Apply(cmd.Encode(), time.Minute)
// log.Println(future.Index())
if future.Response() != nil {
return nil, future.Response().(error)
}
if us := fsm.waiter.Wait(Userid, time.Second*5); us != nil {
return us, nil
} else {
return nil, fmt.Errorf("timeout")
}
}
// Apply applies a Raft log entry to the key-value store.
func (fsm *StateCluster) Apply(fsmlog *raft.Log) interface{} {
var cmd command
err := cmd.Decode(fsmlog.Data)
if err != nil {
return err
}
switch cmd.Op {
case "update":
fsm.mu.Lock()
defer fsm.mu.Unlock()
// log.Println("update")
models := gmodel.NewAllModels(fsm.gdb)
user, err := models.FsUser.FindUserById(context.TODO(), cmd.Key)
if err != nil {
log.Println(err)
}
userState := &UserState{
UserId: cmd.Key,
PwdHash: auth.StringToHash(*user.PasswordHash),
Expired: time.Now(),
}
fsm.store[cmd.Key] = userState
fsm.waiter.Done(userState)
default:
return fmt.Errorf("unrecognized command operation: %s", cmd.Op)
}
return nil
}
// Snapshot returns a snapshot of the key-value store.
func (fsm *StateCluster) Snapshot() (raft.FSMSnapshot, error) {
fsm.mu.Lock()
defer fsm.mu.Unlock()
snapshot := kvStoreSnapshot{
store: make(map[int64]*UserState),
}
for k, v := range fsm.store {
snapshot.store[k] = v
}
return &snapshot, nil
}
// Restore stores the key-value store to a previous state.
func (fsm *StateCluster) Restore(rc io.ReadCloser) error {
var snapshot kvStoreSnapshot
dec := gob.NewDecoder(rc)
if err := dec.Decode(&snapshot); err != nil {
return err
}
fsm.store = snapshot.store
return nil
}
// kvStoreSnapshot represents a snapshot of the key-value store.
type kvStoreSnapshot struct {
store map[int64]*UserState
}
// Persist writes the snapshot to the provided sink.
func (snapshot *kvStoreSnapshot) Persist(sink raft.SnapshotSink) error {
// The example has been simplified. In a production environment, you would
// need to handle this operation with more care.
return nil
}
// Release is invoked when the Raft instance is finished with the snapshot.
func (snapshot *kvStoreSnapshot) Release() {
// Normally you would put any cleanup here.
}