package fusenrender_test import ( "context" "fusenrender" "log" "testing" "time" "github.com/google/uuid" "github.com/lni/goutils/syncutil" ) func init() { log.SetFlags(log.Llongfile) } func TestStartNodeA(t *testing.T) { svc, err := fusenrender.LoadConfig("etc/etc_a.yaml") if err != nil { panic(err) } nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) raftStopper := syncutil.NewStopper() // ch := make(chan string, 16) raftStopper.RunWorker(func() { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. cs := nh.GetNoOPSession(128) ticker := time.NewTicker(100 * time.Millisecond) for { select { case <-ticker.C: item := &fusenrender.QueueItem{ Group: "test", Priority: uint32(2), CreateAt: time.Now(), Data: uuid.New().String(), } cmd := fusenrender.Command{ Name: "enqueue", Group: "test", Item: item, } data, err := cmd.Encode() if err != nil { log.Println(err) } ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) _, err = nh.SyncPropose(ctx, cs, data) if err != nil { log.Println(err) } // log.Println("enqueue", len(result.Data)) cancel() case <-raftStopper.ShouldStop(): return } } }) raftStopper.Wait() } func TestStartNodeB(t *testing.T) { svc, err := fusenrender.LoadConfig("etc/etc_b.yaml") if err != nil { panic(err) } nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) raftStopper := syncutil.NewStopper() // ch := make(chan string, 16) raftStopper.RunWorker(func() { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. cs := nh.GetNoOPSession(128) ticker := time.NewTicker(100 * time.Millisecond) for { select { case <-ticker.C: item := &fusenrender.QueueItem{ Group: "test", Priority: uint32(1), CreateAt: time.Now(), Data: uuid.New().String(), } cmd := fusenrender.Command{ Name: "enqueue", Group: "test", Item: item, } data, err := cmd.Encode() if err != nil { log.Println(err) } ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) _, err = nh.SyncPropose(ctx, cs, data) if err != nil { log.Println(err) } else { // log.Println("enqueue", len(result.Data)) } cancel() case <-raftStopper.ShouldStop(): return } } }) raftStopper.Wait() } func TestStartNodeC(t *testing.T) { svc, err := fusenrender.LoadConfig("etc/etc_c.yaml") if err != nil { panic(err) } nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) log.Println(nh) raftStopper := syncutil.NewStopper() raftStopper.RunWorker(func() { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. // cs := nh.GetNoOPSession(128) ticker := time.NewTicker(5 * time.Second) for { select { case <-ticker.C: // for { // ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) // cmd := fusenrender.Command{ // Name: "dequeue", // Group: "test", // } // data, err := cmd.Encode() // if err != nil { // log.Println(err) // } // result, err := nh.SyncPropose(ctx, cs, data) // if err != nil { // log.Println(err) // } // log.Println(len(result.Data), string(result.Data)) // cancel() // if len(result.Data) == 0 { // break // } // } case <-raftStopper.ShouldStop(): return } } }) raftStopper.Wait() }