144 lines
3.6 KiB
Go
144 lines
3.6 KiB
Go
package fusenrender
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"runtime"
|
|
"syscall"
|
|
|
|
"github.com/474420502/execute/triggered"
|
|
"github.com/lni/dragonboat/v4"
|
|
"github.com/lni/dragonboat/v4/config"
|
|
"github.com/lni/dragonboat/v4/logger"
|
|
)
|
|
|
|
var shardID uint64 = 128
|
|
|
|
var DequeueHandler = triggered.RegisterExecute(func(params *triggered.Params[bool]) {
|
|
|
|
params.Shared.Value(func(v any) {
|
|
|
|
cs := stateClient.GetNoOPSession()
|
|
for i := 0; ; i++ {
|
|
|
|
item, err := stateClient.PopItem(cs, "test")
|
|
if err != nil {
|
|
log.Println(err)
|
|
break
|
|
}
|
|
if item == nil {
|
|
return
|
|
}
|
|
|
|
PopChannel <- []byte(item.Data.(string))
|
|
|
|
}
|
|
})
|
|
|
|
})
|
|
|
|
func StartNode(cfg *ConfigServer) {
|
|
|
|
replicaID := cfg.ServerID
|
|
addr := cfg.Address()
|
|
// addr := "localhost"
|
|
|
|
// addr = fmt.Sprintf("%s:%d", addr, port)
|
|
|
|
flag.Parse()
|
|
if len(addr) == 0 && replicaID != 1 && replicaID != 2 && replicaID != 3 {
|
|
fmt.Fprintf(os.Stderr, "node id must be 1, 2 or 3 when address is not specified\n")
|
|
os.Exit(1)
|
|
}
|
|
// https://github.com/golang/go/issues/17393
|
|
if runtime.GOOS == "darwin" {
|
|
signal.Ignore(syscall.Signal(0xd))
|
|
}
|
|
initialMembers := make(map[uint64]string)
|
|
|
|
// when joining a new node which is not an initial members, the initialMembers
|
|
// map should be empty.
|
|
// when restarting a node that is not a member of the initial nodes, you can
|
|
// leave the initialMembers to be empty. we still populate the initialMembers
|
|
// here for simplicity.
|
|
|
|
for idx, v := range cfg.Cluster {
|
|
// key is the ReplicaID, ReplicaID is not allowed to be 0
|
|
// value is the raft address
|
|
initialMembers[uint64(idx+1)] = v
|
|
}
|
|
|
|
// for simplicity, in this example program, addresses of all those 3 initial
|
|
// raft members are hard coded. when address is not specified on the command
|
|
// line, we assume the node being launched is an initial raft member.
|
|
|
|
var nodeAddr = initialMembers[uint64(replicaID)]
|
|
|
|
fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)
|
|
// change the log verbosity
|
|
logger.GetLogger("dragonboat").SetLevel(logger.ERROR)
|
|
logger.GetLogger("raft").SetLevel(logger.ERROR)
|
|
logger.GetLogger("raftpb").SetLevel(logger.ERROR)
|
|
logger.GetLogger("logdb").SetLevel(logger.ERROR)
|
|
logger.GetLogger("rsm").SetLevel(logger.ERROR)
|
|
logger.GetLogger("transport").SetLevel(logger.ERROR)
|
|
logger.GetLogger("grpc").SetLevel(logger.ERROR)
|
|
// config for raft node
|
|
// See GoDoc for all available options
|
|
rc := config.Config{
|
|
// ShardID and ReplicaID of the raft node
|
|
ReplicaID: uint64(replicaID),
|
|
ShardID: shardID,
|
|
|
|
ElectionRTT: 10,
|
|
|
|
HeartbeatRTT: 1,
|
|
CheckQuorum: true,
|
|
|
|
SnapshotEntries: 10,
|
|
|
|
CompactionOverhead: 5,
|
|
}
|
|
datadir := filepath.Join(
|
|
"example-data",
|
|
"queue-data",
|
|
fmt.Sprintf("node%d", replicaID))
|
|
|
|
nhc := config.NodeHostConfig{
|
|
|
|
WALDir: datadir,
|
|
// NodeHostDir is where everything else is stored.
|
|
NodeHostDir: datadir,
|
|
// RTTMillisecond is the average round trip time between NodeHosts (usually
|
|
// on two machines/vms), it is in millisecond. Such RTT includes the
|
|
// processing delays caused by NodeHosts, not just the network delay between
|
|
// two NodeHost instances.
|
|
RTTMillisecond: 200,
|
|
// RaftAddress is used to identify the NodeHost instance
|
|
RaftAddress: nodeAddr,
|
|
}
|
|
|
|
nh, err := dragonboat.NewNodeHost(nhc)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
stateClient = &StateClient{nh: nh}
|
|
|
|
// 把引用计数设置为0
|
|
DequeueHandler.RefCountAdd(-1)
|
|
// 设置共享的参数
|
|
DequeueHandler.WithShared(nh)
|
|
|
|
if err := nh.StartReplica(initialMembers, true, NewSMQueue, rc); err != nil {
|
|
fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
HttpListen(nh, cfg.Port-1000)
|
|
}
|