From d5d39c4f3a855de38e2dae5af75182825fc4ab2f Mon Sep 17 00:00:00 2001 From: eson <474420502@qq.com> Date: Sun, 30 Jul 2023 06:50:58 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E9=80=9A=E8=BF=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 +- config.go | 2 +- etc/etc_a.yaml | 2 +- etc/etc_b.yaml | 2 +- etc/etc_c.yaml | 2 +- main.go | 268 +++++++++++---------------- priority_queue.go | 49 ++++- priority_queue_test.go | 59 +++--- sm.go | 411 ++++++++--------------------------------- start_test.go | 227 ++++++++++++++++------- 10 files changed, 419 insertions(+), 607 deletions(-) diff --git a/.gitignore b/.gitignore index ff7ba64..460972d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ debug_test* __debug_* -fusenrender \ No newline at end of file +fusenrender + +example-data \ No newline at end of file diff --git a/config.go b/config.go index 1352fe2..d3c12dd 100644 --- a/config.go +++ b/config.go @@ -11,7 +11,7 @@ import ( ) type ConfigServer struct { - ServerID string `yaml:"serverid"` + ServerID uint64 `yaml:"serverid"` Host string `yaml:"host"` Port int `yaml:"port"` } diff --git a/etc/etc_a.yaml b/etc/etc_a.yaml index d36bbd2..cbf1282 100644 --- a/etc/etc_a.yaml +++ b/etc/etc_a.yaml @@ -1,3 +1,3 @@ -serverid: "a" +serverid: 1 host: "localhost" port: 5050 diff --git a/etc/etc_b.yaml b/etc/etc_b.yaml index 75cce8a..ba66ec4 100644 --- a/etc/etc_b.yaml +++ b/etc/etc_b.yaml @@ -1,3 +1,3 @@ -serverid: "b" +serverid: 2 host: "localhost" port: 5051 diff --git a/etc/etc_c.yaml b/etc/etc_c.yaml index f128bb8..f481929 100644 --- a/etc/etc_c.yaml +++ b/etc/etc_c.yaml @@ -1,3 +1,3 @@ -serverid: "c" +serverid: 3 host: "localhost" port: 5052 diff --git a/main.go b/main.go index 74354ae..0be6676 100644 --- a/main.go +++ b/main.go @@ -1,173 +1,115 @@ package fusenrender +import ( + "flag" + "fmt" + "os" + "os/signal" + "path/filepath" + "runtime" + "syscall" + + "github.com/lni/dragonboat/v4" + "github.com/lni/dragonboat/v4/config" + "github.com/lni/dragonboat/v4/logger" +) + func main() { - // replicaID := 1 + +} + +var addresses []string = []string{ + "localhost:5500", + "localhost:5501", + "localhost:5502", +} + +func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat.NodeHost { + // addr := "localhost" - // port := 5050 - // flag.Parse() - // if len(addr) == 0 && replicaID != 1 && replicaID != 2 && replicaID != 3 { - // fmt.Fprintf(os.Stderr, "node id must be 1, 2 or 3 when address is not specified\n") - // os.Exit(1) - // } - // // https://github.com/golang/go/issues/17393 - // if runtime.GOOS == "darwin" { - // signal.Ignore(syscall.Signal(0xd)) - // } - // initialMembers := make(map[uint64]string) - // // when joining a new node which is not an initial members, the initialMembers - // // map should be empty. - // // when restarting a node that is not a member of the initial nodes, you can - // // leave the initialMembers to be empty. we still populate the initialMembers - // // here for simplicity. - // // if join { - // // for idx, v := range addresses { - // // // key is the ReplicaID, ReplicaID is not allowed to be 0 - // // // value is the raft address - // // initialMembers[uint64(idx+1)] = v - // // } - // // } - // var nodeAddr string - // // for simplicity, in this example program, addresses of all those 3 initial - // // raft members are hard coded. when address is not specified on the command - // // line, we assume the node being launched is an initial raft member. + // addr = fmt.Sprintf("%s:%d", addr, port) - // nodeAddr = initialMembers[uint64(replicaID)] + flag.Parse() + if len(addr) == 0 && replicaID != 1 && replicaID != 2 && replicaID != 3 { + fmt.Fprintf(os.Stderr, "node id must be 1, 2 or 3 when address is not specified\n") + os.Exit(1) + } + // https://github.com/golang/go/issues/17393 + if runtime.GOOS == "darwin" { + signal.Ignore(syscall.Signal(0xd)) + } + initialMembers := make(map[uint64]string) + + // when joining a new node which is not an initial members, the initialMembers + // map should be empty. + // when restarting a node that is not a member of the initial nodes, you can + // leave the initialMembers to be empty. we still populate the initialMembers + // here for simplicity. + + for idx, v := range addresses { + // key is the ReplicaID, ReplicaID is not allowed to be 0 + // value is the raft address + initialMembers[uint64(idx+1)] = v + } + + // for simplicity, in this example program, addresses of all those 3 initial + // raft members are hard coded. when address is not specified on the command + // line, we assume the node being launched is an initial raft member. + + var nodeAddr = initialMembers[uint64(replicaID)] + + fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr) + // change the log verbosity + logger.GetLogger("raft").SetLevel(logger.ERROR) + logger.GetLogger("rsm").SetLevel(logger.WARNING) + logger.GetLogger("transport").SetLevel(logger.WARNING) + logger.GetLogger("grpc").SetLevel(logger.WARNING) + // config for raft node + // See GoDoc for all available options + rc := config.Config{ + // ShardID and ReplicaID of the raft node + ReplicaID: uint64(replicaID), + ShardID: exampleShardID, + + ElectionRTT: 10, + + HeartbeatRTT: 1, + CheckQuorum: true, + + SnapshotEntries: 10, + + CompactionOverhead: 5, + } + datadir := filepath.Join( + "example-data", + "queue-data", + fmt.Sprintf("node%d", replicaID)) + + nhc := config.NodeHostConfig{ + + WALDir: datadir, + // NodeHostDir is where everything else is stored. + NodeHostDir: datadir, + // RTTMillisecond is the average round trip time between NodeHosts (usually + // on two machines/vms), it is in millisecond. Such RTT includes the + // processing delays caused by NodeHosts, not just the network delay between + // two NodeHost instances. + RTTMillisecond: 200, + // RaftAddress is used to identify the NodeHost instance + RaftAddress: nodeAddr, + } + nh, err := dragonboat.NewNodeHost(nhc) + if err != nil { + panic(err) + } + if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil { + fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) + os.Exit(1) + } + + return nh - // fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr) - // // change the log verbosity - // logger.GetLogger("raft").SetLevel(logger.ERROR) - // logger.GetLogger("rsm").SetLevel(logger.WARNING) - // logger.GetLogger("transport").SetLevel(logger.WARNING) - // logger.GetLogger("grpc").SetLevel(logger.WARNING) - // // config for raft node - // // See GoDoc for all available options - // rc := config.Config{ - // // ShardID and ReplicaID of the raft node - // ReplicaID: uint64(replicaID), - // ShardID: 128, - // // In this example, we assume the end-to-end round trip time (RTT) between - // // NodeHost instances (on different machines, VMs or containers) are 200 - // // millisecond, it is set in the RTTMillisecond field of the - // // config.NodeHostConfig instance below. - // // ElectionRTT is set to 10 in this example, it determines that the node - // // should start an election if there is no heartbeat from the leader for - // // 10 * RTT time intervals. - // ElectionRTT: 10, - // // HeartbeatRTT is set to 1 in this example, it determines that when the - // // node is a leader, it should broadcast heartbeat messages to its followers - // // every such 1 * RTT time interval. - // HeartbeatRTT: 1, - // CheckQuorum: true, - // // SnapshotEntries determines how often should we take a snapshot of the - // // replicated state machine, it is set to 10 her which means a snapshot - // // will be captured for every 10 applied proposals (writes). - // // In your real world application, it should be set to much higher values - // // You need to determine a suitable value based on how much space you are - // // willing use on Raft Logs, how fast can you capture a snapshot of your - // // replicated state machine, how often such snapshot is going to be used - // // etc. - // SnapshotEntries: 10, - // // Once a snapshot is captured and saved, how many Raft entries already - // // covered by the new snapshot should be kept. This is useful when some - // // followers are just a little bit left behind, with such overhead Raft - // // entries, the leaders can send them regular entries rather than the full - // // snapshot image. - // CompactionOverhead: 5, - // } - // datadir := filepath.Join( - // "example-data", - // "helloworld-data", - // fmt.Sprintf("node%d", replicaID)) - // // config for the nodehost - // // See GoDoc for all available options - // // by default, insecure transport is used, you can choose to use Mutual TLS - // // Authentication to authenticate both servers and clients. To use Mutual - // // TLS Authentication, set the MutualTLS field in NodeHostConfig to true, set - // // the CAFile, CertFile and KeyFile fields to point to the path of your CA - // // file, certificate and key files. - // nhc := config.NodeHostConfig{ - // // WALDir is the directory to store the WAL of all Raft Logs. It is - // // recommended to use Enterprise SSDs with good fsync() performance - // // to get the best performance. A few SSDs we tested or known to work very - // // well - // // Recommended SATA SSDs - - // // Intel S3700, Intel S3710, Micron 500DC - // // Other SATA enterprise class SSDs with power loss protection - // // Recommended NVME SSDs - - // // Most enterprise NVME currently available on the market. - // // SSD to avoid - - // // Consumer class SSDs, no matter whether they are SATA or NVME based, as - // // they usually have very poor fsync() performance. - // // - // // You can use the pg_test_fsync tool shipped with PostgreSQL to test the - // // fsync performance of your WAL disk. It is recommended to use SSDs with - // // fsync latency of well below 1 millisecond. - // // - // // Note that this is only for storing the WAL of Raft Logs, it is size is - // // usually pretty small, 64GB per NodeHost is usually more than enough. - // // - // // If you just have one disk in your system, just set WALDir and NodeHostDir - // // to the same location. - // WALDir: datadir, - // // NodeHostDir is where everything else is stored. - // NodeHostDir: datadir, - // // RTTMillisecond is the average round trip time between NodeHosts (usually - // // on two machines/vms), it is in millisecond. Such RTT includes the - // // processing delays caused by NodeHosts, not just the network delay between - // // two NodeHost instances. - // RTTMillisecond: 200, - // // RaftAddress is used to identify the NodeHost instance - // RaftAddress: nodeAddr, - // } - // nh, err := dragonboat.NewNodeHost(nhc) - // if err != nil { - // panic(err) - // } - // if err := nh.StartReplica(initialMembers, true, NewSMQueue, rc); err != nil { - // fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) - // os.Exit(1) - // } - // raftStopper := syncutil.NewStopper() - // consoleStopper := syncutil.NewStopper() - // ch := make(chan string, 16) - // consoleStopper.RunWorker(func() { - // reader := bufio.NewReader(os.Stdin) - // for { - // s, err := reader.ReadString('\n') - // if err != nil { - // close(ch) - // return - // } - // if s == "exit\n" { - // raftStopper.Stop() - // // no data will be lost/corrupted if nodehost.Stop() is not called - // nh.Close() - // return - // } - // ch <- s - // } - // }) - // raftStopper.RunWorker(func() { - // // this goroutine makes a linearizable read every 10 second. it returns the - // // Count value maintained in IStateMachine. see datastore.go for details. - // ticker := time.NewTicker(10 * time.Second) - // for { - // select { - // case <-ticker.C: - // ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - // result, err := nh.SyncRead(ctx, exampleShardID, []byte{}) - // cancel() - // if err == nil { - // var count uint64 - // count = binary.LittleEndian.Uint64(result.([]byte)) - // fmt.Fprintf(os.Stdout, "count: %d\n", count) - // } - // case <-raftStopper.ShouldStop(): - // return - // } - // } - // }) // raftStopper.RunWorker(func() { // // use a NO-OP client session here // // check the example in godoc to see how to use a regular client session diff --git a/priority_queue.go b/priority_queue.go index e63c1cb..8b6e5c6 100644 --- a/priority_queue.go +++ b/priority_queue.go @@ -1,39 +1,72 @@ package fusenrender -import "bytes" +import ( + "bytes" + "container/heap" + "sync" +) // An Slice is something we manage in a priority queue. type Slice[T any] struct { Key []byte // 键 - Value T // 值 + Value *T // 值 // 索引是在容器中的位置 index int } +type PriorityQueue[T any] struct { + Queue HeapSlice[T] + mu sync.Mutex +} + +func (pq *PriorityQueue[T]) Push(x *Slice[T]) { + pq.mu.Lock() + defer pq.mu.Unlock() + + heap.Push(&pq.Queue, x) +} + +func (pq *PriorityQueue[T]) Pop() *T { + pq.mu.Lock() + defer pq.mu.Unlock() + + if pq.Queue.Len() == 0 { + return nil + } + + return heap.Pop(&pq.Queue).(*T) +} + +func NewPriorityQueue[T any]() *PriorityQueue[T] { + return &PriorityQueue[T]{ + Queue: make(HeapSlice[T], 0), + } +} + // A PriorityQueue implements heap.Interface and holds Items. -type PriorityQueue[T any] []*Slice[T] +type HeapSlice[T any] []*Slice[T] -func (pq PriorityQueue[T]) Len() int { return len(pq) } +func (pq HeapSlice[T]) Len() int { return len(pq) } -func (pq PriorityQueue[T]) Less(i, j int) bool { +func (pq HeapSlice[T]) Less(i, j int) bool { // 使用字节数组键比较 return bytes.Compare(pq[i].Key, pq[j].Key) < 0 } -func (pq PriorityQueue[T]) Swap(i, j int) { +func (pq HeapSlice[T]) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j } -func (pq *PriorityQueue[T]) Push(x any) { +func (pq *HeapSlice[T]) Push(x any) { n := len(*pq) item := x.(*Slice[T]) item.index = n *pq = append(*pq, item) } -func (pq *PriorityQueue[T]) Pop() any { +func (pq *HeapSlice[T]) Pop() any { old := *pq n := len(old) item := old[n-1] diff --git a/priority_queue_test.go b/priority_queue_test.go index d46e7d3..62eb7f3 100644 --- a/priority_queue_test.go +++ b/priority_queue_test.go @@ -1,48 +1,45 @@ package fusenrender_test import ( - "container/heap" - "fmt" - "fusenrender" "testing" ) func TestQueue(t *testing.T) { - q := make(fusenrender.PriorityQueue[*fusenrender.QueueItem], 0) - heap.Init(&q) + // q := make(fusenrender.PriorityQueue[*fusenrender.QueueItem], 0) + // heap.Init(&q) - // 添加几个item - qitema := &fusenrender.QueueItem{Group: "testa", Priority: 2} - a := &fusenrender.Slice[*fusenrender.QueueItem]{ - Key: qitema.GetKey(), - Value: qitema, - } + // // 添加几个item + // qitema := &fusenrender.QueueItem{Group: "testa", Priority: 2} + // a := &fusenrender.Slice[*fusenrender.QueueItem]{ + // Key: qitema.GetKey(), + // Value: qitema, + // } - qitemb := &fusenrender.QueueItem{Group: "testa", Priority: 4} - b := &fusenrender.Slice[*fusenrender.QueueItem]{ - Key: qitemb.GetKey(), - Value: qitemb, - } + // qitemb := &fusenrender.QueueItem{Group: "testa", Priority: 4} + // b := &fusenrender.Slice[*fusenrender.QueueItem]{ + // Key: qitemb.GetKey(), + // Value: qitemb, + // } - qitemc := &fusenrender.QueueItem{Group: "testb", Priority: 3} - c := &fusenrender.Slice[*fusenrender.QueueItem]{ - Key: qitemc.GetKey(), - Value: qitemc, - } + // qitemc := &fusenrender.QueueItem{Group: "testb", Priority: 3} + // c := &fusenrender.Slice[*fusenrender.QueueItem]{ + // Key: qitemc.GetKey(), + // Value: qitemc, + // } - heap.Push(&q, a) + // heap.Push(&q, a) - heap.Push(&q, a) + // heap.Push(&q, a) - heap.Push(&q, a) - heap.Push(&q, b) - heap.Push(&q, c) + // heap.Push(&q, a) + // heap.Push(&q, b) + // heap.Push(&q, c) - // 取出最高优先级的item - for len(q) != 0 { - item := heap.Pop(&q).(*fusenrender.QueueItem) - fmt.Printf("%v\n", item) - } + // // 取出最高优先级的item + // for len(q) != 0 { + // item := heap.Pop(&q).(*fusenrender.QueueItem) + // fmt.Printf("%v\n", item) + // } // 更新某一项的优先级 // banana := pq[1] // 拿到banana项 diff --git a/sm.go b/sm.go index 2da2f56..465ba57 100644 --- a/sm.go +++ b/sm.go @@ -1,12 +1,14 @@ package fusenrender import ( + "encoding/gob" "encoding/json" "fmt" + "io" "log" + "sync" "time" - "github.com/dgraph-io/badger/v3" sm "github.com/lni/dragonboat/v4/statemachine" ) @@ -14,15 +16,23 @@ type SMQueue struct { shardID uint64 replicaID uint64 - db *badger.DB - queue PriorityQueue[QueueItem] + mu sync.Mutex + queues map[string]*PriorityQueue[QueueItem] } type Command struct { - Name string `json:"name"` + Name string `json:"name"` + Group string `json:"group"` - Group *string `json:"group"` - Item *QueueItem `json:"item"` + Item *QueueItem `json:"item"` +} + +func (cmd *Command) Encode() ([]byte, error) { + val, err := json.Marshal(cmd) + if err != nil { + return nil, err + } + return val, nil } type QueueItem struct { @@ -46,16 +56,12 @@ func (item *QueueItem) GetKey() []byte { // // NewSMQueue creates and return a new ExampleStateMachine object. func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { - // opts := badger.DefaultOptions(datapath) - // db, err := badger.Open(opts) - // if err != nil { - // panic(err) - // } return &SMQueue{ shardID: shardID, replicaID: replicaID, - // db: db, + + queues: make(map[string]*PriorityQueue[QueueItem]), } } @@ -67,10 +73,6 @@ func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) { return item, nil } -// func (q *SMQueue) GetKey(item *QueueItem) []byte { -// return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix())) -// } - // Update updates the object using the specified committed raft entry. func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { @@ -87,35 +89,49 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { if err != nil { return result, err } + key := cmd.Item.GetKey() - err = s.db.Update(func(txn *badger.Txn) error { - return txn.Set([]byte(s.GetKey(cmd.Item)), d) + s.mu.Lock() + var queue *PriorityQueue[QueueItem] + var ok bool + if queue, ok = s.queues[cmd.Group]; !ok { + queue = NewPriorityQueue[QueueItem]() + s.queues[cmd.Group] = queue + } + defer s.mu.Unlock() + + queue.Push(&Slice[QueueItem]{ + Key: key, + Value: cmd.Item, }) - return sm.Result{Value: uint64(len(d))}, err + result.Value = uint64(len(d)) + + log.Printf("%#v, %d, %#v", s.queues, len(queue.Queue), queue.Queue) + + return result, err case "dequeue": - prefix := []byte(fmt.Sprintf("%s_", *cmd.Group)) - err = s.db.Update(func(txn *badger.Txn) error { + // prefix := []byte(fmt.Sprintf("%s_", cmd.Group)) + s.mu.Lock() - it := txn.NewIterator(badger.DefaultIteratorOptions) - defer it.Close() + var queue *PriorityQueue[QueueItem] + var ok bool + if queue, ok = s.queues[cmd.Group]; !ok { + queue = NewPriorityQueue[QueueItem]() + s.queues[cmd.Group] = queue + } + defer s.mu.Unlock() - it.Seek(prefix) - if !it.ValidForPrefix(prefix) { - return nil - } - - itemKey := it.Item().Key() - err = it.Item().Value(func(val []byte) error { - result.Data = val - return nil - }) + item := queue.Pop() + if item != nil { + d, err := item.Encode() if err != nil { - log.Println(err) + return result, err } - return txn.Delete(itemKey) - }) - + e.Result.Data = d + result.Data = d + } + log.Printf("%#v", s.queues) return result, err default: return result, fmt.Errorf("unknonw cmd type: %s", cmd.Name) @@ -123,303 +139,36 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { } -// type KeyValue struct { -// Key []byte -// Val []byte -// } +// SaveSnapshot saves the current IStateMachine state into a snapshot using the +// specified io.Writer object. +func (s *SMQueue) SaveSnapshot(w io.Writer, + fc sm.ISnapshotFileCollection, done <-chan struct{}) error { + // as shown above, the only state that can be saved is the Count variable + // there is no external file in this IStateMachine example, we thus leave + // the fc untouched -// // SaveSnapshot saves the current IStateMachine state into a snapshot using the -// // specified io.Writer object. -// func (s *SMQueue) SaveSnapshot(w io.Writer, -// fc sm.ISnapshotFileCollection, done <-chan struct{}) error { -// // as shown above, the only state that can be saved is the Count variable -// // there is no external file in this IStateMachine example, we thus leave -// // the fc untouched + s.mu.Lock() + defer s.mu.Unlock() -// // 创建一个只读事务 -// txn := s.db.NewTransaction(false) + return gob.NewEncoder(w).Encode(&s.queues) +} -// // 在事务中读取数据 -// iter := txn.NewIterator(badger.IteratorOptions{}) -// var items []*KeyValue +// RecoverFromSnapshot recovers the state using the provided snapshot. +func (s *SMQueue) RecoverFromSnapshot(r io.Reader, + files []sm.SnapshotFile, + done <-chan struct{}) error { + // restore the Count variable, that is the only state we maintain in this + // example, the input files is expected to be empty -// for iter.Rewind(); iter.Valid(); iter.Next() { -// item := iter.Item() + err := gob.NewDecoder(r).Decode(&s.queues) + if err != nil { + return err + } -// err := item.Value(func(val []byte) error { -// // _, err := w.Write(val) -// items = append(items, &KeyValue{ -// Key: item.Key(), -// Val: val, -// }) -// return nil -// }) -// if err != nil { -// return err -// } + return nil +} -// // 处理key-value -// } - -// // 释放迭代器和事务 -// iter.Close() -// txn.Discard() - -// return gob.NewEncoder(w).Encode(&items) -// } - -// // RecoverFromSnapshot recovers the state using the provided snapshot. -// func (s *SMQueue) RecoverFromSnapshot(r io.Reader, -// files []sm.SnapshotFile, -// done <-chan struct{}) error { -// // restore the Count variable, that is the only state we maintain in this -// // example, the input files is expected to be empty - -// var items []*KeyValue -// err := gob.NewDecoder(r).Decode(&items) -// if err != nil { -// return err -// } - -// s.db.Update(func(txn *badger.Txn) error { -// for _, item := range items { -// if err := txn.Set(item.Key, item.Val); err != nil { -// return err -// } -// } -// return nil -// }) - -// return nil -// } - -// // Close closes the IStateMachine instance. There is nothing for us to cleanup -// // or release as this is a pure in memory data store. Note that the Close -// // method is not guaranteed to be called as node can crash at any time. -// func (s *SMQueue) Close() error { return nil } - -// import ( -// "encoding/json" -// "fmt" -// "io" -// "log" -// "sync" -// "time" - -// "github.com/dgraph-io/badger/v3" -// "github.com/hashicorp/raft" -// ) - -// type SMQueue struct { -// mu sync.Mutex -// queue *Queue -// ra *raft.Raft -// } - -// func NewQueueFSM(datapath string) *SMQueue { -// q, err := NewQueue(datapath) -// if err != nil { -// panic(err) -// } -// return &SMQueue{ -// queue: q, -// } -// } - -// const ( -// Enqueue raft.LogType = 20 -// Dequeue raft.LogType = 21 -// ) - -// func (q *SMQueue) LogInfo() { -// log.Println(q.ra.GetConfiguration().Configuration(), q.ra.State()) -// } - -// func (q *SMQueue) Put(item *QueueItem) error { -// data, err := item.Encode() -// if err != nil { -// return err -// } -// return q.PutJsonString(data) -// } - -// func (q *SMQueue) PutJsonString(item []byte) error { -// task := ApplyTask{ -// Name: "enqueue", -// Object: item, -// } - -// d, err := task.Encode() -// if err != nil { -// return err -// } - -// future := q.ra.Apply(d, time.Second*15) - -// if future.Error() != nil { -// return future.Error() -// } - -// resp := future.Response() - -// if resp == nil { -// return nil -// } - -// return resp.(error) -// } - -// func (q *SMQueue) Pop(group string) (*QueueItem, error) { -// task := ApplyTask{ -// Name: "dequeue", -// Object: []byte(group), -// } - -// d, err := task.Encode() -// if err != nil { -// return nil, err -// } - -// ierr := q.ra.Apply(d, time.Second*15) - -// if ierr.Error() != nil { -// return nil, ierr.Error() -// } - -// switch v := ierr.Response().(type) { -// case error: -// return nil, v -// case *QueueItem: -// return v, nil -// default: -// return nil, fmt.Errorf("unknown %v", v) -// } -// } - -// func (q *SMQueue) Apply(log *raft.Log) interface{} { -// leader, id := q.ra.LeaderWithID() - -// q.mu.Lock() -// defer q.mu.Unlock() - -// var task ApplyTask -// err := task.Decode(log.Data) -// if err != nil { -// return err -// } - -// switch task.Name { -// case "enqueue": -// var item QueueItem -// if err := json.Unmarshal(task.Object, &item); err != nil { -// return err -// } -// return q.queue.Enqueue(&item) -// case "dequeue": -// // log.Data 传入group -// item, err := q.queue.Dequeue(string(task.Object)) -// if err != nil { -// return err -// } -// return item -// default: -// return fmt.Errorf("unknown type: %v", log.Type) -// } -// } - -// type QueueSnapshot struct { -// Items []*QueueItem -// } - -// // Persist writes the snapshot to the provided sink. -// func (snapshot *QueueSnapshot) Persist(sink raft.SnapshotSink) error { -// // The example has been simplified. In a production environment, you would -// // need to handle this operation with more care. -// return nil -// } - -// // Release is invoked when the Raft instance is finished with the snapshot. -// func (snapshot *QueueSnapshot) Release() { -// // Normally you would put any cleanup here. -// } - -// // Snapshot 返回队列快照 -// func (fsm *SMQueue) Snapshot() (raft.FSMSnapshot, error) { - -// var items []*QueueItem - -// // 使用 Badger 读取所有队列项 -// fsm.queue.db.View(func(txn *badger.Txn) error { -// opts := badger.DefaultIteratorOptions -// opts.PrefetchValues = false // 只需要key - -// it := txn.NewIterator(opts) -// defer it.Close() - -// for it.Rewind(); it.Valid(); it.Next() { - -// err := it.Item().Value(func(val []byte) error { -// item := &QueueItem{} -// err := json.Unmarshal(val, item) -// if err != nil { -// return err -// } -// items = append(items, item) -// return nil -// }) -// if err != nil { -// log.Println(err) -// } - -// } - -// return nil -// }) - -// snapshot := &QueueSnapshot{Items: items} -// return snapshot, nil - -// } - -// // Restore 恢复队列状态 -// func (fsm *SMQueue) Restore(rc io.ReadCloser) error { - -// snapshot := &QueueSnapshot{} -// if err := json.NewDecoder(rc).Decode(snapshot); err != nil { -// return err -// } - -// // 用快照数据重建队列 -// fsm.queue.db.Update(func(txn *badger.Txn) error { -// for _, item := range snapshot.Items { - -// val, err := item.Encode() -// if err != nil { -// log.Println(err) -// continue -// } - -// if err := txn.Set(fsm.queue.GetKey(item), val); err != nil { -// return err -// } -// } -// return nil -// }) - -// return nil -// } - -// func waitForCluster(ra *raft.Raft) { -// ticker := time.NewTicker(500 * time.Millisecond) -// defer ticker.Stop() - -// for range ticker.C { -// state := ra.State() -// if state == raft.Leader || state == raft.Follower { -// log.Println("Raft cluster is running") -// return -// } else { -// log.Println("Still waiting for the cluster to start...") -// } -// } -// } +// Close closes the IStateMachine instance. There is nothing for us to cleanup +// or release as this is a pure in memory data store. Note that the Close +// method is not guaranteed to be called as node can crash at any time. +func (s *SMQueue) Close() error { return nil } diff --git a/start_test.go b/start_test.go index 3b077b1..7d87816 100644 --- a/start_test.go +++ b/start_test.go @@ -1,89 +1,178 @@ package fusenrender_test -// func TestStartNodeA(t *testing.T) { -// svc, err := fusenrender.LoadConfig("etc/etc_a.yaml") -// if err != nil { -// panic(err) -// } +import ( + "context" + "fusenrender" + "log" + "testing" + "time" -// item := &fusenrender.QueueItem{ -// Group: "test", -// Priority: uint32(1), -// CreateAt: time.Now(), -// Data: "a", -// } + "github.com/lni/goutils/syncutil" +) -// a := fusenrender.StartNode(svc.ServerID, svc.Address(), fusenrender.LoadAllConfig("etc")) +func init() { + log.SetFlags(log.Llongfile) +} -// ticker := time.NewTicker(2 * time.Second) -// defer ticker.Stop() +func TestStartNodeA(t *testing.T) { + svc, err := fusenrender.LoadConfig("etc/etc_a.yaml") + if err != nil { + panic(err) + } -// for range ticker.C { -// a.LogInfo() -// // 每2秒执行一次 -// err := a.Put(item) -// if err != nil { -// log.Println(err) -// } -// } + item := &fusenrender.QueueItem{ + Group: "test", + Priority: uint32(1), + CreateAt: time.Now(), + Data: "a", + } -// select {} -// } + a := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) -// func TestStartNodeB(t *testing.T) { -// svc, err := fusenrender.LoadConfig("etc/etc_b.yaml") -// if err != nil { -// panic(err) -// } + raftStopper := syncutil.NewStopper() -// item := &fusenrender.QueueItem{ -// Group: "test", -// Priority: uint32(1), -// CreateAt: time.Now(), -// Data: "b", -// } + // ch := make(chan string, 16) -// b := fusenrender.StartNode(svc.ServerID, svc.Address(), fusenrender.LoadAllConfig("etc")) -// ticker := time.NewTicker(2 * time.Second) -// defer ticker.Stop() + raftStopper.RunWorker(func() { + // this goroutine makes a linearizable read every 10 second. it returns the + // Count value maintained in IStateMachine. see datastore.go for details. + cs := a.GetNoOPSession(128) + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + cmd := fusenrender.Command{ + Name: "enqueue", + Group: "test", + Item: item, + } -// for range ticker.C { -// b.LogInfo() -// // 每2秒执行一次 -// err := b.Put(item) -// if err != nil { -// log.Println(err) -// } -// } + data, err := cmd.Encode() + if err != nil { + log.Println(err) + } -// select {} + result, err := a.SyncPropose(ctx, cs, data) + if err != nil { + log.Println(err) + } -// } + log.Println(len(result.Data)) + cancel() -// func TestStartNodeC(t *testing.T) { + case <-raftStopper.ShouldStop(): + return + } + } + }) -// svc, err := fusenrender.LoadConfig("etc/etc_c.yaml") -// if err != nil { -// panic(err) -// } + raftStopper.Wait() -// c := fusenrender.StartNode(svc.ServerID, svc.Address(), fusenrender.LoadAllConfig("etc")) +} -// ticker := time.NewTicker(2 * time.Second) -// defer ticker.Stop() +func TestStartNodeB(t *testing.T) { + svc, err := fusenrender.LoadConfig("etc/etc_b.yaml") + if err != nil { + panic(err) + } -// for range ticker.C { -// c.LogInfo() + item := &fusenrender.QueueItem{ + Group: "test", + Priority: uint32(1), + CreateAt: time.Now(), + Data: "b", + } -// item, err := c.Pop("test") -// if err != nil { -// log.Println(err) -// continue -// } -// log.Println(item) + nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) -// if item == nil { -// log.Println("empty") -// } -// } -// } + raftStopper := syncutil.NewStopper() + + // ch := make(chan string, 16) + + raftStopper.RunWorker(func() { + // this goroutine makes a linearizable read every 10 second. it returns the + // Count value maintained in IStateMachine. see datastore.go for details. + cs := nh.GetNoOPSession(128) + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + cmd := fusenrender.Command{ + Name: "enqueue", + Group: "test", + Item: item, + } + + data, err := cmd.Encode() + if err != nil { + log.Println(err) + } + + result, err := nh.SyncPropose(ctx, cs, data) + if err != nil { + log.Println(err) + } + + log.Println(result) + cancel() + + case <-raftStopper.ShouldStop(): + return + } + } + }) + + raftStopper.Wait() + +} + +func TestStartNodeC(t *testing.T) { + + svc, err := fusenrender.LoadConfig("etc/etc_c.yaml") + if err != nil { + panic(err) + } + + nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + + raftStopper := syncutil.NewStopper() + + // ch := make(chan string, 16) + + raftStopper.RunWorker(func() { + // this goroutine makes a linearizable read every 10 second. it returns the + // Count value maintained in IStateMachine. see datastore.go for details. + cs := nh.GetNoOPSession(128) + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + cmd := fusenrender.Command{ + Name: "dequeue", + Group: "test", + } + + data, err := cmd.Encode() + if err != nil { + log.Println(err) + } + + result, err := nh.SyncPropose(ctx, cs, data) + if err != nil { + log.Println(err) + } + + log.Println(string(result.Data)) + cancel() + + case <-raftStopper.ShouldStop(): + return + } + } + }) + + raftStopper.Wait() +}