package fusenrender

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"runtime"
	"syscall"
	"time"

	"github.com/474420502/execute"
	"github.com/lni/dragonboat/v4"
	"github.com/lni/dragonboat/v4/config"
	"github.com/lni/dragonboat/v4/logger"
)

// main is an empty placeholder. Because this file belongs to package
// fusenrender rather than package main, it is an ordinary unexported function.
func main() {
}

// BatchQueueExecute is the event-triggered executor on which StartNode
// registers its dequeue consumer.
var BatchQueueExecute = execute.NewEventTriggeredExecute[bool]()

// Consumption is the event handle returned when StartNode registers the
// dequeue consumer on BatchQueueExecute. Every StartNode call overwrites it.
var Consumption execute.Event

// addresses holds the raft addresses of the three initial members. The entry
// at index i belongs to replica ID i+1.
var addresses = []string{
	"localhost:5500",
	"localhost:5501",
	"localhost:5502",
}

// StartNode creates a dragonboat NodeHost for replicaID, registers the dequeue
// consumer on BatchQueueExecute (storing the handle in Consumption), starts
// the replica of shard exampleShardID backed by NewSMQueue, and returns the
// NodeHost.
//
// The raft address is always looked up from the hard-coded initial members by
// replicaID; addr is only consulted by the argument check below.
//
// The process exits with status 1 when addr is empty and replicaID is not 1,
// 2 or 3, or when the replica cannot be started. StartNode panics when the
// NodeHost cannot be created.
func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat.NodeHost {
	flag.Parse()
	if len(addr) == 0 && replicaID != 1 && replicaID != 2 && replicaID != 3 {
		fmt.Fprintf(os.Stderr, "node id must be 1, 2 or 3 when address is not specified\n")
		os.Exit(1)
	}

	// Ignore signal 0xd on macOS, see https://github.com/golang/go/issues/17393
	if runtime.GOOS == "darwin" {
		signal.Ignore(syscall.Signal(0xd))
	}

	// When joining a new node which is not an initial member, or restarting a
	// node that is not one of the initial nodes, the initialMembers map may be
	// left empty. It is always populated here for simplicity.
	initialMembers := make(map[uint64]string, len(addresses))
	for idx, v := range addresses {
		// Key is the ReplicaID (0 is not allowed), value is the raft address.
		initialMembers[uint64(idx+1)] = v
	}

	// The addresses of the initial raft members are hard coded, so the node's
	// own address is resolved from its replica ID. A replica ID outside the
	// initial members yields an empty address.
	nodeAddr := initialMembers[replicaID]
	fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)

	// Change the log verbosity.
	logger.GetLogger("raft").SetLevel(logger.ERROR)
	logger.GetLogger("rsm").SetLevel(logger.WARNING)
	logger.GetLogger("transport").SetLevel(logger.WARNING)
	logger.GetLogger("grpc").SetLevel(logger.WARNING)

	// Config for the raft node. See GoDoc for all available options.
	rc := config.Config{
		// ShardID and ReplicaID of the raft node.
		ReplicaID:          replicaID,
		ShardID:            exampleShardID,
		ElectionRTT:        10,
		HeartbeatRTT:       1,
		CheckQuorum:        true,
		SnapshotEntries:    100,
		CompactionOverhead: 5,
	}

	datadir := filepath.Join(
		"example-data",
		"queue-data",
		fmt.Sprintf("node%d", replicaID))

	nhc := config.NodeHostConfig{
		WALDir: datadir,
		// NodeHostDir is where everything else is stored.
		NodeHostDir: datadir,
		// RTTMillisecond is the average round trip time between NodeHosts
		// (usually on two machines/vms), in milliseconds. Such RTT includes the
		// processing delays caused by NodeHosts, not just the network delay
		// between two NodeHost instances.
		RTTMillisecond: 200,
		// RaftAddress is used to identify the NodeHost instance.
		RaftAddress: nodeAddr,
	}

	nh, err := dragonboat.NewNodeHost(nhc)
	if err != nil {
		panic(err)
	}

	// The consumer must target the shard started below, not a fixed shard ID.
	Consumption = BatchQueueExecute.RegisterExecute(func(*bool) {
		dequeueUntilEmpty(nh, exampleShardID)
	}, nil)

	if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil {
		fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
		os.Exit(1)
	}
	return nh
}

// dequeueUntilEmpty repeatedly proposes a "dequeue" command to shardID through
// nh and logs every decoded item. It returns after the first encode or
// proposal error, or after an empty result, in which case it first prints
// memory statistics and sleeps for 10 seconds.
func dequeueUntilEmpty(nh *dragonboat.NodeHost, shardID uint64) {
	cs := nh.GetNoOPSession(shardID)
	for {
		cmd := Command{
			Name:  "dequeue",
			Group: "test",
		}
		data, err := cmd.Encode()
		if err != nil {
			// Never propose a payload that failed to encode.
			log.Println(err)
			return
		}

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		result, err := nh.SyncPropose(ctx, cs, data)
		cancel()
		if err != nil {
			log.Println(err)
			return
		}

		if len(result.Data) == 0 {
			log.Println("wait 10 second")
			logMemStats()
			time.Sleep(10 * time.Second)
			return
		}

		var item QueueItem
		if err := item.Decode(result.Data); err != nil {
			// Skip the item instead of logging a value that failed to decode.
			log.Println(err)
			continue
		}
		log.Println(item)
	}
}

// logMemStats prints the runtime's Alloc, TotalAlloc and Sys figures in MB to
// standard output.
func logMemStats() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	allocMB := float64(m.Alloc) / 1024 / 1024
	totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024
	sysMB := float64(m.Sys) / 1024 / 1024

	fmt.Printf("Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", allocMB, totalAllocMB, sysMB)
}