Tests pass

eson 2023-07-30 06:50:58 +08:00
parent a0a0a637d1
commit d5d39c4f3a
10 changed files with 419 additions and 607 deletions

.gitignore

@@ -2,4 +2,6 @@ debug_test*
__debug_*
fusenrender
fusenrender
example-data


@@ -11,7 +11,7 @@ import (
)
type ConfigServer struct {
ServerID string `yaml:"serverid"`
ServerID uint64 `yaml:"serverid"`
Host string `yaml:"host"`
Port int `yaml:"port"`
}


@@ -1,3 +1,3 @@
serverid: "a"
serverid: 1
host: "localhost"
port: 5050


@@ -1,3 +1,3 @@
serverid: "b"
serverid: 2
host: "localhost"
port: 5051


@@ -1,3 +1,3 @@
serverid: "c"
serverid: 3
host: "localhost"
port: 5052
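For reference, a minimal standalone sketch of how these YAML files map onto the updated ConfigServer struct. It assumes gopkg.in/yaml.v3 for decoding; the repository's own LoadConfig and Address implementations are not part of this diff, so the function and type below are illustrative only.

package main

import (
	"fmt"
	"os"

	"gopkg.in/yaml.v3"
)

// ConfigServer is re-declared here only to make the example self-contained;
// the real definition lives in the fusenrender package above.
type ConfigServer struct {
	ServerID uint64 `yaml:"serverid"`
	Host     string `yaml:"host"`
	Port     int    `yaml:"port"`
}

func loadConfig(path string) (*ConfigServer, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var cfg ConfigServer
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		return nil, err
	}
	return &cfg, nil
}

func main() {
	cfg, err := loadConfig("etc/etc_a.yaml")
	if err != nil {
		panic(err)
	}
	// serverid is now numeric (1, 2 or 3), so it can be used directly
	// as a dragonboat replica ID.
	fmt.Printf("replica %d listens on %s:%d\n", cfg.ServerID, cfg.Host, cfg.Port)
}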

main.go

@@ -1,173 +1,115 @@
package fusenrender
import (
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"runtime"
"syscall"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/config"
"github.com/lni/dragonboat/v4/logger"
)
func main() {
// replicaID := 1
}
var addresses []string = []string{
"localhost:5500",
"localhost:5501",
"localhost:5502",
}
func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat.NodeHost {
// addr := "localhost"
// port := 5050
// flag.Parse()
// if len(addr) == 0 && replicaID != 1 && replicaID != 2 && replicaID != 3 {
// fmt.Fprintf(os.Stderr, "node id must be 1, 2 or 3 when address is not specified\n")
// os.Exit(1)
// }
// // https://github.com/golang/go/issues/17393
// if runtime.GOOS == "darwin" {
// signal.Ignore(syscall.Signal(0xd))
// }
// initialMembers := make(map[uint64]string)
// // when joining a new node which is not one of the initial members, the initialMembers
// // map should be empty.
// // when restarting a node that is not a member of the initial nodes, you can
// // leave the initialMembers to be empty. we still populate the initialMembers
// // here for simplicity.
// // if join {
// // for idx, v := range addresses {
// // // key is the ReplicaID, ReplicaID is not allowed to be 0
// // // value is the raft address
// // initialMembers[uint64(idx+1)] = v
// // }
// // }
// var nodeAddr string
// // for simplicity, in this example program, addresses of all those 3 initial
// // raft members are hard coded. when address is not specified on the command
// // line, we assume the node being launched is an initial raft member.
// addr = fmt.Sprintf("%s:%d", addr, port)
// nodeAddr = initialMembers[uint64(replicaID)]
flag.Parse()
if len(addr) == 0 && replicaID != 1 && replicaID != 2 && replicaID != 3 {
fmt.Fprintf(os.Stderr, "node id must be 1, 2 or 3 when address is not specified\n")
os.Exit(1)
}
// https://github.com/golang/go/issues/17393
if runtime.GOOS == "darwin" {
signal.Ignore(syscall.Signal(0xd))
}
initialMembers := make(map[uint64]string)
// when joining a new node which is not one of the initial members, the initialMembers
// map should be empty.
// when restarting a node that is not a member of the initial nodes, you can
// leave the initialMembers to be empty. we still populate the initialMembers
// here for simplicity.
for idx, v := range addresses {
// key is the ReplicaID, ReplicaID is not allowed to be 0
// value is the raft address
initialMembers[uint64(idx+1)] = v
}
// for simplicity, in this example program, addresses of all those 3 initial
// raft members are hard coded. when address is not specified on the command
// line, we assume the node being launched is an initial raft member.
var nodeAddr = initialMembers[uint64(replicaID)]
fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)
// change the log verbosity
logger.GetLogger("raft").SetLevel(logger.ERROR)
logger.GetLogger("rsm").SetLevel(logger.WARNING)
logger.GetLogger("transport").SetLevel(logger.WARNING)
logger.GetLogger("grpc").SetLevel(logger.WARNING)
// config for raft node
// See GoDoc for all available options
rc := config.Config{
// ShardID and ReplicaID of the raft node
ReplicaID: uint64(replicaID),
ShardID: exampleShardID,
ElectionRTT: 10,
HeartbeatRTT: 1,
CheckQuorum: true,
SnapshotEntries: 10,
CompactionOverhead: 5,
}
datadir := filepath.Join(
"example-data",
"queue-data",
fmt.Sprintf("node%d", replicaID))
nhc := config.NodeHostConfig{
WALDir: datadir,
// NodeHostDir is where everything else is stored.
NodeHostDir: datadir,
// RTTMillisecond is the average round trip time between NodeHosts (usually
// on two machines/vms), it is in millisecond. Such RTT includes the
// processing delays caused by NodeHosts, not just the network delay between
// two NodeHost instances.
RTTMillisecond: 200,
// RaftAddress is used to identify the NodeHost instance
RaftAddress: nodeAddr,
}
nh, err := dragonboat.NewNodeHost(nhc)
if err != nil {
panic(err)
}
if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil {
fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
os.Exit(1)
}
return nh
// fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)
// // change the log verbosity
// logger.GetLogger("raft").SetLevel(logger.ERROR)
// logger.GetLogger("rsm").SetLevel(logger.WARNING)
// logger.GetLogger("transport").SetLevel(logger.WARNING)
// logger.GetLogger("grpc").SetLevel(logger.WARNING)
// // config for raft node
// // See GoDoc for all available options
// rc := config.Config{
// // ShardID and ReplicaID of the raft node
// ReplicaID: uint64(replicaID),
// ShardID: 128,
// // In this example, we assume the end-to-end round trip time (RTT) between
// // NodeHost instances (on different machines, VMs or containers) are 200
// // millisecond, it is set in the RTTMillisecond field of the
// // config.NodeHostConfig instance below.
// // ElectionRTT is set to 10 in this example, it determines that the node
// // should start an election if there is no heartbeat from the leader for
// // 10 * RTT time intervals.
// ElectionRTT: 10,
// // HeartbeatRTT is set to 1 in this example, it determines that when the
// // node is a leader, it should broadcast heartbeat messages to its followers
// // every such 1 * RTT time interval.
// HeartbeatRTT: 1,
// CheckQuorum: true,
// // SnapshotEntries determines how often should we take a snapshot of the
// // replicated state machine, it is set to 10 here which means a snapshot
// // will be captured for every 10 applied proposals (writes).
// // In your real world application, it should be set to much higher values
// // You need to determine a suitable value based on how much space you are
// // willing to use on Raft Logs, how fast can you capture a snapshot of your
// // replicated state machine, how often such snapshot is going to be used
// // etc.
// SnapshotEntries: 10,
// // Once a snapshot is captured and saved, how many Raft entries already
// // covered by the new snapshot should be kept. This is useful when some
// // followers are just a little bit left behind, with such overhead Raft
// // entries, the leaders can send them regular entries rather than the full
// // snapshot image.
// CompactionOverhead: 5,
// }
// datadir := filepath.Join(
// "example-data",
// "helloworld-data",
// fmt.Sprintf("node%d", replicaID))
// // config for the nodehost
// // See GoDoc for all available options
// // by default, insecure transport is used, you can choose to use Mutual TLS
// // Authentication to authenticate both servers and clients. To use Mutual
// // TLS Authentication, set the MutualTLS field in NodeHostConfig to true, set
// // the CAFile, CertFile and KeyFile fields to point to the path of your CA
// // file, certificate and key files.
// nhc := config.NodeHostConfig{
// // WALDir is the directory to store the WAL of all Raft Logs. It is
// // recommended to use Enterprise SSDs with good fsync() performance
// // to get the best performance. A few SSDs we tested or known to work very
// // well
// // Recommended SATA SSDs -
// // Intel S3700, Intel S3710, Micron 500DC
// // Other SATA enterprise class SSDs with power loss protection
// // Recommended NVME SSDs -
// // Most enterprise NVME currently available on the market.
// // SSD to avoid -
// // Consumer class SSDs, no matter whether they are SATA or NVME based, as
// // they usually have very poor fsync() performance.
// //
// // You can use the pg_test_fsync tool shipped with PostgreSQL to test the
// // fsync performance of your WAL disk. It is recommended to use SSDs with
// // fsync latency of well below 1 millisecond.
// //
// // Note that this is only for storing the WAL of Raft Logs, its size is
// // usually pretty small, 64GB per NodeHost is usually more than enough.
// //
// // If you just have one disk in your system, just set WALDir and NodeHostDir
// // to the same location.
// WALDir: datadir,
// // NodeHostDir is where everything else is stored.
// NodeHostDir: datadir,
// // RTTMillisecond is the average round trip time between NodeHosts (usually
// // on two machines/vms), it is in millisecond. Such RTT includes the
// // processing delays caused by NodeHosts, not just the network delay between
// // two NodeHost instances.
// RTTMillisecond: 200,
// // RaftAddress is used to identify the NodeHost instance
// RaftAddress: nodeAddr,
// }
// nh, err := dragonboat.NewNodeHost(nhc)
// if err != nil {
// panic(err)
// }
// if err := nh.StartReplica(initialMembers, true, NewSMQueue, rc); err != nil {
// fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
// os.Exit(1)
// }
// raftStopper := syncutil.NewStopper()
// consoleStopper := syncutil.NewStopper()
// ch := make(chan string, 16)
// consoleStopper.RunWorker(func() {
// reader := bufio.NewReader(os.Stdin)
// for {
// s, err := reader.ReadString('\n')
// if err != nil {
// close(ch)
// return
// }
// if s == "exit\n" {
// raftStopper.Stop()
// // no data will be lost/corrupted if nodehost.Stop() is not called
// nh.Close()
// return
// }
// ch <- s
// }
// })
// raftStopper.RunWorker(func() {
// // this goroutine makes a linearizable read every 10 seconds. it returns the
// // Count value maintained in IStateMachine. see datastore.go for details.
// ticker := time.NewTicker(10 * time.Second)
// for {
// select {
// case <-ticker.C:
// ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
// result, err := nh.SyncRead(ctx, exampleShardID, []byte{})
// cancel()
// if err == nil {
// var count uint64
// count = binary.LittleEndian.Uint64(result.([]byte))
// fmt.Fprintf(os.Stdout, "count: %d\n", count)
// }
// case <-raftStopper.ShouldStop():
// return
// }
// }
// })
// raftStopper.RunWorker(func() {
// // use a NO-OP client session here
// // check the example in godoc to see how to use a regular client session
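A hypothetical wiring of StartNode, modelled on the tests later in this commit: start all three replicas of shard 128 from the etc/ configs. LoadConfig, Address, StartNode and shard ID 128 come from this repository; running the three node hosts in a single process is only for illustration.

package main

import (
	"fusenrender"

	"github.com/lni/dragonboat/v4"
)

func main() {
	var hosts []*dragonboat.NodeHost
	for _, path := range []string{"etc/etc_a.yaml", "etc/etc_b.yaml", "etc/etc_c.yaml"} {
		svc, err := fusenrender.LoadConfig(path)
		if err != nil {
			panic(err)
		}
		// ServerID (1, 2 or 3) selects the replica; the raft address itself
		// is resolved inside StartNode from its hard-coded addresses table.
		hosts = append(hosts, fusenrender.StartNode(svc.ServerID, 128, svc.Address()))
	}
	select {} // keep the node hosts running
}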


@@ -1,39 +1,72 @@
package fusenrender
import "bytes"
import (
"bytes"
"container/heap"
"sync"
)
// A Slice is an element we manage in a priority queue.
type Slice[T any] struct {
Key []byte // key
Value T // value
Value *T // value
// index is the item's position in the container
index int
}
type PriorityQueue[T any] struct {
Queue HeapSlice[T]
mu sync.Mutex
}
func (pq *PriorityQueue[T]) Push(x *Slice[T]) {
pq.mu.Lock()
defer pq.mu.Unlock()
heap.Push(&pq.Queue, x)
}
func (pq *PriorityQueue[T]) Pop() *T {
pq.mu.Lock()
defer pq.mu.Unlock()
if pq.Queue.Len() == 0 {
return nil
}
return heap.Pop(&pq.Queue).(*Slice[T]).Value
}
func NewPriorityQueue[T any]() *PriorityQueue[T] {
return &PriorityQueue[T]{
Queue: make(HeapSlice[T], 0),
}
}
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue[T any] []*Slice[T]
type HeapSlice[T any] []*Slice[T]
func (pq PriorityQueue[T]) Len() int { return len(pq) }
func (pq HeapSlice[T]) Len() int { return len(pq) }
func (pq PriorityQueue[T]) Less(i, j int) bool {
func (pq HeapSlice[T]) Less(i, j int) bool {
// compare keys as byte slices
return bytes.Compare(pq[i].Key, pq[j].Key) < 0
}
func (pq PriorityQueue[T]) Swap(i, j int) {
func (pq HeapSlice[T]) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue[T]) Push(x any) {
func (pq *HeapSlice[T]) Push(x any) {
n := len(*pq)
item := x.(*Slice[T])
item.index = n
*pq = append(*pq, item)
}
func (pq *PriorityQueue[T]) Pop() any {
func (pq *HeapSlice[T]) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
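A self-contained usage sketch for the new mutex-guarded PriorityQueue wrapper (the Example name is illustrative): items pop in ascending byte order of their keys, regardless of insertion order.

package fusenrender_test

import (
	"fmt"

	"fusenrender"
)

func ExamplePriorityQueue() {
	q := fusenrender.NewPriorityQueue[string]()

	for _, k := range []string{"b", "a", "c"} {
		v := "value-" + k
		q.Push(&fusenrender.Slice[string]{Key: []byte(k), Value: &v})
	}

	// Pop drains the queue in key order and returns nil once it is empty.
	for v := q.Pop(); v != nil; v = q.Pop() {
		fmt.Println(*v)
	}
	// Output:
	// value-a
	// value-b
	// value-c
}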


@@ -1,48 +1,45 @@
package fusenrender_test
import (
"container/heap"
"fmt"
"fusenrender"
"testing"
)
func TestQueue(t *testing.T) {
q := make(fusenrender.PriorityQueue[*fusenrender.QueueItem], 0)
heap.Init(&q)
// q := make(fusenrender.PriorityQueue[*fusenrender.QueueItem], 0)
// heap.Init(&q)
// add a few items
qitema := &fusenrender.QueueItem{Group: "testa", Priority: 2}
a := &fusenrender.Slice[*fusenrender.QueueItem]{
Key: qitema.GetKey(),
Value: qitema,
}
// // add a few items
// qitema := &fusenrender.QueueItem{Group: "testa", Priority: 2}
// a := &fusenrender.Slice[*fusenrender.QueueItem]{
// Key: qitema.GetKey(),
// Value: qitema,
// }
qitemb := &fusenrender.QueueItem{Group: "testa", Priority: 4}
b := &fusenrender.Slice[*fusenrender.QueueItem]{
Key: qitemb.GetKey(),
Value: qitemb,
}
// qitemb := &fusenrender.QueueItem{Group: "testa", Priority: 4}
// b := &fusenrender.Slice[*fusenrender.QueueItem]{
// Key: qitemb.GetKey(),
// Value: qitemb,
// }
qitemc := &fusenrender.QueueItem{Group: "testb", Priority: 3}
c := &fusenrender.Slice[*fusenrender.QueueItem]{
Key: qitemc.GetKey(),
Value: qitemc,
}
// qitemc := &fusenrender.QueueItem{Group: "testb", Priority: 3}
// c := &fusenrender.Slice[*fusenrender.QueueItem]{
// Key: qitemc.GetKey(),
// Value: qitemc,
// }
heap.Push(&q, a)
// heap.Push(&q, a)
heap.Push(&q, a)
// heap.Push(&q, a)
heap.Push(&q, a)
heap.Push(&q, b)
heap.Push(&q, c)
// heap.Push(&q, a)
// heap.Push(&q, b)
// heap.Push(&q, c)
// pop the highest-priority items
for len(q) != 0 {
item := heap.Pop(&q).(*fusenrender.QueueItem)
fmt.Printf("%v\n", item)
}
// // pop the highest-priority items
// for len(q) != 0 {
// item := heap.Pop(&q).(*fusenrender.QueueItem)
// fmt.Printf("%v\n", item)
// }
// update an item's priority
// banana := pq[1] // grab the banana item

sm.go

@@ -1,12 +1,14 @@
package fusenrender
import (
"encoding/gob"
"encoding/json"
"fmt"
"io"
"log"
"sync"
"time"
"github.com/dgraph-io/badger/v3"
sm "github.com/lni/dragonboat/v4/statemachine"
)
@@ -14,15 +16,23 @@ type SMQueue struct {
shardID uint64
replicaID uint64
db *badger.DB
queue PriorityQueue[QueueItem]
mu sync.Mutex
queues map[string]*PriorityQueue[QueueItem]
}
type Command struct {
Name string `json:"name"`
Name string `json:"name"`
Group string `json:"group"`
Group *string `json:"group"`
Item *QueueItem `json:"item"`
Item *QueueItem `json:"item"`
}
func (cmd *Command) Encode() ([]byte, error) {
val, err := json.Marshal(cmd)
if err != nil {
return nil, err
}
return val, nil
}
type QueueItem struct {
@@ -46,16 +56,12 @@ func (item *QueueItem) GetKey() []byte {
// // NewSMQueue creates and returns a new SMQueue state machine object.
func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
// opts := badger.DefaultOptions(datapath)
// db, err := badger.Open(opts)
// if err != nil {
// panic(err)
// }
return &SMQueue{
shardID: shardID,
replicaID: replicaID,
// db: db,
queues: make(map[string]*PriorityQueue[QueueItem]),
}
}
@@ -67,10 +73,6 @@ func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) {
return item, nil
}
// func (q *SMQueue) GetKey(item *QueueItem) []byte {
// return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix()))
// }
// Update updates the object using the specified committed raft entry.
func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
@@ -87,35 +89,49 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
if err != nil {
return result, err
}
key := cmd.Item.GetKey()
err = s.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(s.GetKey(cmd.Item)), d)
s.mu.Lock()
var queue *PriorityQueue[QueueItem]
var ok bool
if queue, ok = s.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
s.queues[cmd.Group] = queue
}
defer s.mu.Unlock()
queue.Push(&Slice[QueueItem]{
Key: key,
Value: cmd.Item,
})
return sm.Result{Value: uint64(len(d))}, err
result.Value = uint64(len(d))
log.Printf("%#v, %d, %#v", s.queues, len(queue.Queue), queue.Queue)
return result, err
case "dequeue":
prefix := []byte(fmt.Sprintf("%s_", *cmd.Group))
err = s.db.Update(func(txn *badger.Txn) error {
// prefix := []byte(fmt.Sprintf("%s_", cmd.Group))
s.mu.Lock()
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
var queue *PriorityQueue[QueueItem]
var ok bool
if queue, ok = s.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
s.queues[cmd.Group] = queue
}
defer s.mu.Unlock()
it.Seek(prefix)
if !it.ValidForPrefix(prefix) {
return nil
}
itemKey := it.Item().Key()
err = it.Item().Value(func(val []byte) error {
result.Data = val
return nil
})
item := queue.Pop()
if item != nil {
d, err := item.Encode()
if err != nil {
log.Println(err)
return result, err
}
return txn.Delete(itemKey)
})
e.Result.Data = d
result.Data = d
}
log.Printf("%#v", s.queues)
return result, err
default:
return result, fmt.Errorf("unknown cmd type: %s", cmd.Name)
@@ -123,303 +139,36 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
}
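On the proposing side, a successful "dequeue" hands the encoded QueueItem back in result.Data. A sketch of how a caller might decode it, assuming QueueItem.Encode produces JSON (its implementation is outside this hunk); the package and helper names are illustrative only.

package queueclient

import (
	"context"
	"encoding/json"

	"fusenrender"

	"github.com/lni/dragonboat/v4"
	"github.com/lni/dragonboat/v4/client"
)

// dequeueOne proposes a dequeue for the "test" group and decodes the result.
func dequeueOne(ctx context.Context, nh *dragonboat.NodeHost, cs *client.Session) (*fusenrender.QueueItem, error) {
	cmd := fusenrender.Command{Name: "dequeue", Group: "test"}
	data, err := cmd.Encode()
	if err != nil {
		return nil, err
	}
	result, err := nh.SyncPropose(ctx, cs, data)
	if err != nil {
		return nil, err
	}
	if len(result.Data) == 0 {
		return nil, nil // the group's queue was empty
	}
	var item fusenrender.QueueItem
	if err := json.Unmarshal(result.Data, &item); err != nil {
		return nil, err
	}
	return &item, nil
}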
// type KeyValue struct {
// Key []byte
// Val []byte
// }
// SaveSnapshot saves the current IStateMachine state into a snapshot using the
// specified io.Writer object.
func (s *SMQueue) SaveSnapshot(w io.Writer,
fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
// as shown above, the only state that can be saved is the Count variable
// there is no external file in this IStateMachine example, we thus leave
// the fc untouched
// // SaveSnapshot saves the current IStateMachine state into a snapshot using the
// // specified io.Writer object.
// func (s *SMQueue) SaveSnapshot(w io.Writer,
// fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
// // as shown above, the only state that can be saved is the Count variable
// // there is no external file in this IStateMachine example, we thus leave
// // the fc untouched
s.mu.Lock()
defer s.mu.Unlock()
// // open a read-only transaction
// txn := s.db.NewTransaction(false)
return gob.NewEncoder(w).Encode(&s.queues)
}
// // read the data inside the transaction
// iter := txn.NewIterator(badger.IteratorOptions{})
// var items []*KeyValue
// RecoverFromSnapshot recovers the state using the provided snapshot.
func (s *SMQueue) RecoverFromSnapshot(r io.Reader,
files []sm.SnapshotFile,
done <-chan struct{}) error {
// restore the Count variable, that is the only state we maintain in this
// example, the input files is expected to be empty
// for iter.Rewind(); iter.Valid(); iter.Next() {
// item := iter.Item()
err := gob.NewDecoder(r).Decode(&s.queues)
if err != nil {
return err
}
// err := item.Value(func(val []byte) error {
// // _, err := w.Write(val)
// items = append(items, &KeyValue{
// Key: item.Key(),
// Val: val,
// })
// return nil
// })
// if err != nil {
// return err
// }
return nil
}
// // handle the key-value pair
// }
// // release the iterator and the transaction
// iter.Close()
// txn.Discard()
// return gob.NewEncoder(w).Encode(&items)
// }
// // RecoverFromSnapshot recovers the state using the provided snapshot.
// func (s *SMQueue) RecoverFromSnapshot(r io.Reader,
// files []sm.SnapshotFile,
// done <-chan struct{}) error {
// // restore the Count variable, that is the only state we maintain in this
// // example, the input files is expected to be empty
// var items []*KeyValue
// err := gob.NewDecoder(r).Decode(&items)
// if err != nil {
// return err
// }
// s.db.Update(func(txn *badger.Txn) error {
// for _, item := range items {
// if err := txn.Set(item.Key, item.Val); err != nil {
// return err
// }
// }
// return nil
// })
// return nil
// }
// // Close closes the IStateMachine instance. There is nothing for us to cleanup
// // or release as this is a pure in memory data store. Note that the Close
// // method is not guaranteed to be called as node can crash at any time.
// func (s *SMQueue) Close() error { return nil }
// import (
// "encoding/json"
// "fmt"
// "io"
// "log"
// "sync"
// "time"
// "github.com/dgraph-io/badger/v3"
// "github.com/hashicorp/raft"
// )
// type SMQueue struct {
// mu sync.Mutex
// queue *Queue
// ra *raft.Raft
// }
// func NewQueueFSM(datapath string) *SMQueue {
// q, err := NewQueue(datapath)
// if err != nil {
// panic(err)
// }
// return &SMQueue{
// queue: q,
// }
// }
// const (
// Enqueue raft.LogType = 20
// Dequeue raft.LogType = 21
// )
// func (q *SMQueue) LogInfo() {
// log.Println(q.ra.GetConfiguration().Configuration(), q.ra.State())
// }
// func (q *SMQueue) Put(item *QueueItem) error {
// data, err := item.Encode()
// if err != nil {
// return err
// }
// return q.PutJsonString(data)
// }
// func (q *SMQueue) PutJsonString(item []byte) error {
// task := ApplyTask{
// Name: "enqueue",
// Object: item,
// }
// d, err := task.Encode()
// if err != nil {
// return err
// }
// future := q.ra.Apply(d, time.Second*15)
// if future.Error() != nil {
// return future.Error()
// }
// resp := future.Response()
// if resp == nil {
// return nil
// }
// return resp.(error)
// }
// func (q *SMQueue) Pop(group string) (*QueueItem, error) {
// task := ApplyTask{
// Name: "dequeue",
// Object: []byte(group),
// }
// d, err := task.Encode()
// if err != nil {
// return nil, err
// }
// ierr := q.ra.Apply(d, time.Second*15)
// if ierr.Error() != nil {
// return nil, ierr.Error()
// }
// switch v := ierr.Response().(type) {
// case error:
// return nil, v
// case *QueueItem:
// return v, nil
// default:
// return nil, fmt.Errorf("unknown %v", v)
// }
// }
// func (q *SMQueue) Apply(log *raft.Log) interface{} {
// leader, id := q.ra.LeaderWithID()
// q.mu.Lock()
// defer q.mu.Unlock()
// var task ApplyTask
// err := task.Decode(log.Data)
// if err != nil {
// return err
// }
// switch task.Name {
// case "enqueue":
// var item QueueItem
// if err := json.Unmarshal(task.Object, &item); err != nil {
// return err
// }
// return q.queue.Enqueue(&item)
// case "dequeue":
// // log.Data carries the group name
// item, err := q.queue.Dequeue(string(task.Object))
// if err != nil {
// return err
// }
// return item
// default:
// return fmt.Errorf("unknown type: %v", log.Type)
// }
// }
// type QueueSnapshot struct {
// Items []*QueueItem
// }
// // Persist writes the snapshot to the provided sink.
// func (snapshot *QueueSnapshot) Persist(sink raft.SnapshotSink) error {
// // The example has been simplified. In a production environment, you would
// // need to handle this operation with more care.
// return nil
// }
// // Release is invoked when the Raft instance is finished with the snapshot.
// func (snapshot *QueueSnapshot) Release() {
// // Normally you would put any cleanup here.
// }
// // Snapshot returns a snapshot of the queue
// func (fsm *SMQueue) Snapshot() (raft.FSMSnapshot, error) {
// var items []*QueueItem
// // read all queue items from Badger
// fsm.queue.db.View(func(txn *badger.Txn) error {
// opts := badger.DefaultIteratorOptions
// opts.PrefetchValues = false // 只需要key
// it := txn.NewIterator(opts)
// defer it.Close()
// for it.Rewind(); it.Valid(); it.Next() {
// err := it.Item().Value(func(val []byte) error {
// item := &QueueItem{}
// err := json.Unmarshal(val, item)
// if err != nil {
// return err
// }
// items = append(items, item)
// return nil
// })
// if err != nil {
// log.Println(err)
// }
// }
// return nil
// })
// snapshot := &QueueSnapshot{Items: items}
// return snapshot, nil
// }
// // Restore restores the queue state
// func (fsm *SMQueue) Restore(rc io.ReadCloser) error {
// snapshot := &QueueSnapshot{}
// if err := json.NewDecoder(rc).Decode(snapshot); err != nil {
// return err
// }
// // rebuild the queue from the snapshot data
// fsm.queue.db.Update(func(txn *badger.Txn) error {
// for _, item := range snapshot.Items {
// val, err := item.Encode()
// if err != nil {
// log.Println(err)
// continue
// }
// if err := txn.Set(fsm.queue.GetKey(item), val); err != nil {
// return err
// }
// }
// return nil
// })
// return nil
// }
// func waitForCluster(ra *raft.Raft) {
// ticker := time.NewTicker(500 * time.Millisecond)
// defer ticker.Stop()
// for range ticker.C {
// state := ra.State()
// if state == raft.Leader || state == raft.Follower {
// log.Println("Raft cluster is running")
// return
// } else {
// log.Println("Still waiting for the cluster to start...")
// }
// }
// }
// Close closes the IStateMachine instance. There is nothing for us to cleanup
// or release as this is a pure in memory data store. Note that the Close
// method is not guaranteed to be called as node can crash at any time.
func (s *SMQueue) Close() error { return nil }
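A standalone round-trip sketch of the gob-based snapshot format used by SaveSnapshot and RecoverFromSnapshot above. The test name is illustrative, and it assumes the QueueItem fields exercised by the tests in this commit (Group, Priority, CreateAt).

package fusenrender_test

import (
	"bytes"
	"encoding/gob"
	"testing"
	"time"

	"fusenrender"
)

func TestSnapshotRoundTrip(t *testing.T) {
	// Build a queues map shaped like SMQueue.queues and put one item in it.
	queues := map[string]*fusenrender.PriorityQueue[fusenrender.QueueItem]{}
	item := &fusenrender.QueueItem{Group: "test", Priority: 2, CreateAt: time.Now()}
	q := fusenrender.NewPriorityQueue[fusenrender.QueueItem]()
	q.Push(&fusenrender.Slice[fusenrender.QueueItem]{Key: item.GetKey(), Value: item})
	queues["test"] = q

	// "SaveSnapshot": gob-encode the whole map into the snapshot writer.
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(&queues); err != nil {
		t.Fatal(err)
	}

	// "RecoverFromSnapshot": decode into a fresh map on the restoring replica.
	restored := make(map[string]*fusenrender.PriorityQueue[fusenrender.QueueItem])
	if err := gob.NewDecoder(&buf).Decode(&restored); err != nil {
		t.Fatal(err)
	}

	if got := restored["test"].Pop(); got == nil || got.Group != "test" {
		t.Fatalf("unexpected restored item: %#v", got)
	}
}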


@@ -1,89 +1,178 @@
package fusenrender_test
// func TestStartNodeA(t *testing.T) {
// svc, err := fusenrender.LoadConfig("etc/etc_a.yaml")
// if err != nil {
// panic(err)
// }
import (
"context"
"fusenrender"
"log"
"testing"
"time"
// item := &fusenrender.QueueItem{
// Group: "test",
// Priority: uint32(1),
// CreateAt: time.Now(),
// Data: "a",
// }
"github.com/lni/goutils/syncutil"
)
// a := fusenrender.StartNode(svc.ServerID, svc.Address(), fusenrender.LoadAllConfig("etc"))
func init() {
log.SetFlags(log.Llongfile)
}
// ticker := time.NewTicker(2 * time.Second)
// defer ticker.Stop()
func TestStartNodeA(t *testing.T) {
svc, err := fusenrender.LoadConfig("etc/etc_a.yaml")
if err != nil {
panic(err)
}
// for range ticker.C {
// a.LogInfo()
// // runs once every 2 seconds
// err := a.Put(item)
// if err != nil {
// log.Println(err)
// }
// }
item := &fusenrender.QueueItem{
Group: "test",
Priority: uint32(1),
CreateAt: time.Now(),
Data: "a",
}
// select {}
// }
a := fusenrender.StartNode(svc.ServerID, 128, svc.Address())
// func TestStartNodeB(t *testing.T) {
// svc, err := fusenrender.LoadConfig("etc/etc_b.yaml")
// if err != nil {
// panic(err)
// }
raftStopper := syncutil.NewStopper()
// item := &fusenrender.QueueItem{
// Group: "test",
// Priority: uint32(1),
// CreateAt: time.Now(),
// Data: "b",
// }
// ch := make(chan string, 16)
// b := fusenrender.StartNode(svc.ServerID, svc.Address(), fusenrender.LoadAllConfig("etc"))
// ticker := time.NewTicker(2 * time.Second)
// defer ticker.Stop()
raftStopper.RunWorker(func() {
// this goroutine proposes an "enqueue" command for group "test"
// every 5 seconds through a no-op client session on shard 128.
cs := a.GetNoOPSession(128)
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
cmd := fusenrender.Command{
Name: "enqueue",
Group: "test",
Item: item,
}
// for range ticker.C {
// b.LogInfo()
// // runs once every 2 seconds
// err := b.Put(item)
// if err != nil {
// log.Println(err)
// }
// }
data, err := cmd.Encode()
if err != nil {
log.Println(err)
}
// select {}
result, err := a.SyncPropose(ctx, cs, data)
if err != nil {
log.Println(err)
}
// }
log.Println(len(result.Data))
cancel()
// func TestStartNodeC(t *testing.T) {
case <-raftStopper.ShouldStop():
return
}
}
})
// svc, err := fusenrender.LoadConfig("etc/etc_c.yaml")
// if err != nil {
// panic(err)
// }
raftStopper.Wait()
// c := fusenrender.StartNode(svc.ServerID, svc.Address(), fusenrender.LoadAllConfig("etc"))
}
// ticker := time.NewTicker(2 * time.Second)
// defer ticker.Stop()
func TestStartNodeB(t *testing.T) {
svc, err := fusenrender.LoadConfig("etc/etc_b.yaml")
if err != nil {
panic(err)
}
// for range ticker.C {
// c.LogInfo()
item := &fusenrender.QueueItem{
Group: "test",
Priority: uint32(1),
CreateAt: time.Now(),
Data: "b",
}
// item, err := c.Pop("test")
// if err != nil {
// log.Println(err)
// continue
// }
// log.Println(item)
nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address())
// if item == nil {
// log.Println("empty")
// }
// }
// }
raftStopper := syncutil.NewStopper()
// ch := make(chan string, 16)
raftStopper.RunWorker(func() {
// this goroutine proposes an "enqueue" command for group "test"
// every 5 seconds through a no-op client session on shard 128.
cs := nh.GetNoOPSession(128)
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
cmd := fusenrender.Command{
Name: "enqueue",
Group: "test",
Item: item,
}
data, err := cmd.Encode()
if err != nil {
log.Println(err)
}
result, err := nh.SyncPropose(ctx, cs, data)
if err != nil {
log.Println(err)
}
log.Println(result)
cancel()
case <-raftStopper.ShouldStop():
return
}
}
})
raftStopper.Wait()
}
func TestStartNodeC(t *testing.T) {
svc, err := fusenrender.LoadConfig("etc/etc_c.yaml")
if err != nil {
panic(err)
}
nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address())
raftStopper := syncutil.NewStopper()
// ch := make(chan string, 16)
raftStopper.RunWorker(func() {
// this goroutine proposes a "dequeue" command for group "test"
// every second through a no-op client session on shard 128.
cs := nh.GetNoOPSession(128)
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
cmd := fusenrender.Command{
Name: "dequeue",
Group: "test",
}
data, err := cmd.Encode()
if err != nil {
log.Println(err)
}
result, err := nh.SyncPropose(ctx, cs, data)
if err != nil {
log.Println(err)
}
log.Println(string(result.Data))
cancel()
case <-raftStopper.ShouldStop():
return
}
}
})
raftStopper.Wait()
}