This commit is contained in:
eson 2023-07-30 23:17:02 +08:00
parent 049aedf115
commit 7fd48edef8
19 changed files with 58 additions and 44 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/gob"
"fmt"
"fusenapi/initalize"
"fusenapi/model/gmodel"
"fusenapi/utils/auth"
"io"
@ -16,11 +17,10 @@ import (
"gorm.io/gorm"
)
// StateCluster is a simple key-value store as an FSM.
type StateCluster struct {
// FsStateMachine is a simple key-value store as an FSM.
type FsStateMachine struct {
shardID uint64
replicaID uint64
nh *dragonboat.NodeHost
mu sync.Mutex
store map[int64]*UserState
@ -30,9 +30,24 @@ type StateCluster struct {
// ra *raft.Raft // The consensus mechanism
}
func (fsm *StateCluster) GetUserState(Userid int64) (us *UserState, err error) {
func NewFsStateMachine(shardID uint64, replicaID uint64) sm.IStateMachine {
return &FsStateMachine{
shardID: shardID,
replicaID: replicaID,
store: make(map[int64]*UserState),
gdb: initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest"),
}
}
ius, err := fsm.nh.SyncRead(context.TODO(), fsm.shardID, Userid)
type SharedState struct {
shardID uint64
replicaID uint64
nh *dragonboat.NodeHost
}
func (fsm *SharedState) GetUserState(Userid int64) (us *UserState, err error) {
ius, err := nh.SyncRead(context.TODO(), fsm.shardID, Userid)
if err != nil {
log.Println(err)
return nil, err
@ -47,9 +62,9 @@ func (fsm *StateCluster) GetUserState(Userid int64) (us *UserState, err error) {
Key: Userid,
}
cs := fsm.nh.GetNoOPSession(128)
cs := nh.GetNoOPSession(128)
err = cmd.Encode(func(buf []byte) error {
result, err := fsm.nh.SyncPropose(context.TODO(), cs, buf)
result, err := nh.SyncPropose(context.TODO(), cs, buf)
if err != nil {
return err
}
@ -69,7 +84,7 @@ func (fsm *StateCluster) GetUserState(Userid int64) (us *UserState, err error) {
// Lookup performs local lookup on the ExampleStateMachine instance. In this example,
// we always return the Count value as a little endian binary encoded byte
// slice.
func (s *StateCluster) Lookup(query interface{}) (item interface{}, err error) {
func (s *FsStateMachine) Lookup(query interface{}) (item interface{}, err error) {
s.mu.Lock()
defer s.mu.Unlock()
@ -82,7 +97,7 @@ func (s *StateCluster) Lookup(query interface{}) (item interface{}, err error) {
}
// Update updates the object using the specified committed raft entry.
func (s *StateCluster) Update(e sm.Entry) (result sm.Result, err error) {
func (s *FsStateMachine) Update(e sm.Entry) (result sm.Result, err error) {
var cmd Command
err = cmd.Decode(e.Cmd)
@ -129,7 +144,7 @@ func (s *StateCluster) Update(e sm.Entry) (result sm.Result, err error) {
// SaveSnapshot saves the current IStateMachine state into a snapshot using the
// specified io.Writer object.
func (s *StateCluster) SaveSnapshot(w io.Writer,
func (s *FsStateMachine) SaveSnapshot(w io.Writer,
fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
// as shown above, the only state that can be saved is the Count variable
// there is no external file in this IStateMachine example, we thus leave
@ -142,7 +157,7 @@ func (s *StateCluster) SaveSnapshot(w io.Writer,
}
// RecoverFromSnapshot recovers the state using the provided snapshot.
func (s *StateCluster) RecoverFromSnapshot(r io.Reader,
func (s *FsStateMachine) RecoverFromSnapshot(r io.Reader,
files []sm.SnapshotFile,
done <-chan struct{}) error {
// restore the Count variable, that is the only state we maintain in this
@ -159,4 +174,4 @@ func (s *StateCluster) RecoverFromSnapshot(r io.Reader,
// Close closes the IStateMachine instance. There is nothing for us to cleanup
// or release as this is a pure in memory data store. Note that the Close
// method is not guaranteed to be called as node can crash at any time.
func (s *StateCluster) Close() error { return nil }
func (s *FsStateMachine) Close() error { return nil }

View File

@ -5,7 +5,6 @@ import (
"encoding/gob"
"flag"
"fmt"
"fusenapi/initalize"
"fusenapi/utils/autoconfig"
"log"
"net"
@ -17,26 +16,26 @@ import (
"time"
"github.com/hashicorp/raft"
"github.com/lni/dragonboat"
"github.com/lni/dragonboat/config"
"github.com/lni/dragonboat/logger"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/config"
"github.com/lni/dragonboat/v4/logger"
"gorm.io/gorm"
)
func test1() {
log.SetFlags(log.Llongfile)
// log.SetFlags(log.Llongfile)
fsm := StartNode("fs1", "localhost:5500", nil, initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest"))
// fsm := StartNode("fs1", "localhost:5500", nil, initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest"))
time.Sleep(time.Second * 5)
// time.Sleep(time.Second * 5)
for i := 0; i < 30; i++ {
go log.Println(fsm.GetUserState(39))
}
// for i := 0; i < 30; i++ {
// go log.Println(fsm.GetUserState(39))
// }
log.Println(fsm.GetUserState(39))
// log.Println(fsm.GetUserState(39))
select {}
// select {}
}
var addresses []string = []string{
@ -125,7 +124,7 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string, gdb *gorm.D
panic(err)
}
if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil {
if err := nh.StartReplica(initialMembers, false, New, rc); err != nil {
fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
os.Exit(1)
}
@ -135,9 +134,9 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string, gdb *gorm.D
}
// StartNode 启动节点
func StartNode1(ServerID string, RaftBind string, serverconfigs []*autoconfig.ConfigServer, gdb *gorm.DB) *StateCluster {
func StartNode1(ServerID string, RaftBind string, serverconfigs []*autoconfig.ConfigServer, gdb *gorm.DB) *FsStateMachine {
fsm := &StateCluster{
fsm := &FsStateMachine{
store: make(map[int64]*UserState),
gdb: gdb,

View File

@ -18,7 +18,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -16,7 +16,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -19,7 +19,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -17,7 +17,7 @@ import (
type ServiceContext struct {
Config config.Config
SharedState *fsm.StateCluster
SharedState *fsm.FsStateMachine
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen

View File

@ -51,7 +51,7 @@ func NormalAfterLogic(w http.ResponseWriter, r *http.Request, resp *Response) {
}
}
func RequestParse(w http.ResponseWriter, r *http.Request, state *fsm.StateCluster, LogicRequest any) (*auth.UserInfo, error) {
func RequestParse(w http.ResponseWriter, r *http.Request, state *fsm.FsStateMachine, LogicRequest any) (*auth.UserInfo, error) {
token, info, err := auth.ParseJwtTokenHeader[auth.UserInfo](r) //解析Token头, 和payload信息
if err != nil {