package shared import ( "bytes" "encoding/gob" "fmt" "fusenapi/utils/autoconfig" "log" "os" "os/signal" "path/filepath" "runtime" "syscall" "time" "github.com/hashicorp/raft" "github.com/lni/dragonboat/v4" "github.com/lni/dragonboat/v4/config" "github.com/lni/dragonboat/v4/logger" "gorm.io/gorm" ) func test1() { // log.SetFlags(log.Llongfile) // fsm := StartNode("fs1", "localhost:5500", nil, initalize.InitMysql("fsreaderwriter:XErSYmLELKMnf3Dh@tcp(fusen.cdmigcvz3rle.us-east-2.rds.amazonaws.com:3306)/fusen")) // time.Sleep(time.Second * 5) // for i := 0; i < 30; i++ { // go log.Println(fsm.GetUserState(39)) // } // log.Println(fsm.GetUserState(39)) // select {} } var addresses []string = []string{ "localhost:5500", "localhost:5501", "localhost:5502", } var shardID uint64 = 128 func StartNode(ServerID uint64, serverconfigs []*autoconfig.ConfigServer, gdb *gorm.DB) *SharedState { // addr := "localhost" // addr = fmt.Sprintf("%s:%d", addr, port) // https://github.com/golang/go/issues/17393 if runtime.GOOS == "darwin" { signal.Ignore(syscall.Signal(0xd)) } initialMembers := make(map[uint64]string) // when joining a new node which is not an initial members, the initialMembers // map should be empty. // when restarting a node that is not a member of the initial nodes, you can // leave the initialMembers to be empty. we still populate the initialMembers // here for simplicity. for _, v := range serverconfigs { // key is the ReplicaID, ReplicaID is not allowed to be 0 // value is the raft address initialMembers[v.ReplicaId] = fmt.Sprintf("%s:%d", v.Host, v.Port-2000) } // for simplicity, in this example program, addresses of all those 3 initial // raft members are hard coded. when address is not specified on the command // line, we assume the node being launched is an initial raft member. var nodeAddr = initialMembers[ServerID] fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr) // change the log verbosity logger.GetLogger("raft").SetLevel(logger.ERROR) logger.GetLogger("rsm").SetLevel(logger.WARNING) logger.GetLogger("transport").SetLevel(logger.WARNING) logger.GetLogger("grpc").SetLevel(logger.WARNING) // config for raft node // See GoDoc for all available options rc := config.Config{ // ShardID and ReplicaID of the raft node ReplicaID: uint64(ServerID), ShardID: shardID, ElectionRTT: 10, HeartbeatRTT: 1, CheckQuorum: true, SnapshotEntries: 10, CompactionOverhead: 5, } datadir := filepath.Join( "shared-state", fmt.Sprintf("node%d", ServerID)) nhc := config.NodeHostConfig{ WALDir: datadir, // NodeHostDir is where everything else is stored. NodeHostDir: datadir, // RTTMillisecond is the average round trip time between NodeHosts (usually // on two machines/vms), it is in millisecond. Such RTT includes the // processing delays caused by NodeHosts, not just the network delay between // two NodeHost instances. RTTMillisecond: 200, // RaftAddress is used to identify the NodeHost instance RaftAddress: nodeAddr, } nh, err := dragonboat.NewNodeHost(nhc) if err != nil { panic(err) } if err := nh.StartReplica(initialMembers, true, NewFsStateMachine, rc); err != nil { fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) os.Exit(1) } ss := &SharedState{ shardID: shardID, replicaID: ServerID, nh: nh, } return ss } func StartNodeEx(ServerID uint64, serverconfigs []*autoconfig.ConfigServer, gdb *gorm.DB) *SharedState { // addr := "localhost" // addr = fmt.Sprintf("%s:%d", addr, port) // https://github.com/golang/go/issues/17393 if runtime.GOOS == "darwin" { signal.Ignore(syscall.Signal(0xd)) } initialMembers := make(map[uint64]string) // when joining a new node which is not an initial members, the initialMembers // map should be empty. // when restarting a node that is not a member of the initial nodes, you can // leave the initialMembers to be empty. we still populate the initialMembers // here for simplicity. for _, v := range serverconfigs { // key is the ReplicaID, ReplicaID is not allowed to be 0 // value is the raft address initialMembers[v.ReplicaId] = fmt.Sprintf("%s:%d", v.Host, v.Port-2000) } // for simplicity, in this example program, addresses of all those 3 initial // raft members are hard coded. when address is not specified on the command // line, we assume the node being launched is an initial raft member. var nodeAddr = initialMembers[ServerID] fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr) // change the log verbosity logger.GetLogger("raft").SetLevel(logger.ERROR) logger.GetLogger("rsm").SetLevel(logger.WARNING) logger.GetLogger("transport").SetLevel(logger.WARNING) logger.GetLogger("grpc").SetLevel(logger.WARNING) // config for raft node // See GoDoc for all available options rc := config.Config{ // ShardID and ReplicaID of the raft node ReplicaID: uint64(ServerID), ShardID: shardID, ElectionRTT: 10, HeartbeatRTT: 1, CheckQuorum: true, SnapshotEntries: 10, CompactionOverhead: 5, } datadir := filepath.Join( "shared-state", fmt.Sprintf("node%d", ServerID)) nhc := config.NodeHostConfig{ WALDir: datadir, // NodeHostDir is where everything else is stored. NodeHostDir: datadir, // RTTMillisecond is the average round trip time between NodeHosts (usually // on two machines/vms), it is in millisecond. Such RTT includes the // processing delays caused by NodeHosts, not just the network delay between // two NodeHost instances. RTTMillisecond: 200, // RaftAddress is used to identify the NodeHost instance RaftAddress: nodeAddr, } nh, err := dragonboat.NewNodeHost(nhc) if err != nil { panic(err) } if err := nh.StartReplica(initialMembers, true, NewFsStateMachine, rc); err != nil { fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) os.Exit(1) } ss := &SharedState{ shardID: shardID, replicaID: ServerID, nh: nh, } return ss } // func JoinCluster(ServerID string, LeaderAddress string, RaftBind string, gdb *gorm.DB) *StateCluster { // fsm := StartNode(ServerID, RaftBind, gdb) // configFuture := fsm.ra.GetConfiguration() // if err := configFuture.Error(); err != nil { // log.Fatalf("failed to get raft configuration: %v", err) // } // for _, srv := range configFuture.Configuration().Servers { // if srv.ID == raft.ServerID(ServerID) && srv.Address == raft.ServerAddress(LeaderAddress) { // if future := fsm.ra.RemoveServer(srv.ID, 0, 0); future.Error() != nil { // log.Fatalf("Error removing existing server [%s]: %v", ServerID, future.Error()) // } // } // } // f := fsm.ra.AddVoter(raft.ServerID(ServerID), raft.ServerAddress(RaftBind), 0, 0) // if f.Error() != nil { // log.Fatalf("Error adding voter: %v", f.Error()) // } // return fsm // } func waitForCluster(ra *raft.Raft) { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for range ticker.C { state := ra.State() if state == raft.Leader || state == raft.Follower { log.Println("Raft cluster is running") return } else { log.Println("Still waiting for the cluster to start...") } } } // var gdb *gorm.DB = initalize.InitMysql("fsreaderwriter:XErSYmLELKMnf3Dh@tcp(fusen.cdmigcvz3rle.us-east-2.rds.amazonaws.com:3306)/fusen") type UserState struct { UpdateAt time.Time UserId int64 PwdHash uint64 } func (us *UserState) Encode(do func([]byte) error) error { var buf bytes.Buffer err := gob.NewEncoder(&buf).Encode(us) if err != nil { return err } if do != nil { err := do(buf.Bytes()) if err != nil { return err } } return nil } func (us *UserState) Decode(data []byte) error { buf := bytes.NewBuffer(data) err := gob.NewDecoder(buf).Decode(us) if err != nil { return err } return nil }