From 128a3df07c794e6f4956c6667db4a136433b548a Mon Sep 17 00:00:00 2001 From: eson <9673575+githubcontent@user.noreply.gitee.com> Date: Tue, 1 Aug 2023 11:14:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- go.sum | 4 ++-- main.go | 23 +++++++++++++++-------- sm.go | 19 ++++++++++++------- start_test.go | 7 +++---- 5 files changed, 33 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index dfbb839..b63d143 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.20 require ( github.com/474420502/batchexecute v0.0.2 - github.com/474420502/execute v0.0.2 + github.com/474420502/execute v0.0.3 github.com/google/uuid v1.3.0 github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 diff --git a/go.sum b/go.sum index 5ae943c..724417b 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/474420502/batchexecute v0.0.2 h1:gTMyUh2x6HpV7/+nOLj4qFRO/l5/B/eg7dXsHxLJpbg= github.com/474420502/batchexecute v0.0.2/go.mod h1:IWazO1QTaB5LyWwMxSqIX/6g/UXwwpnqk0AVM5j24J0= -github.com/474420502/execute v0.0.2 h1:CEhTCBa7UemBWzbfQ6Szj0/GJfx3/JJA0s6r807NPtw= -github.com/474420502/execute v0.0.2/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc= +github.com/474420502/execute v0.0.3 h1:kmVaUG/LQis0vXiLsr0WS0pCpHjtsqb1gy9fbuH8oTk= +github.com/474420502/execute v0.0.3/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= diff --git a/main.go b/main.go index ddb37e5..844bbdd 100644 --- a/main.go +++ b/main.go @@ -22,9 +22,12 @@ func main() { } -var BatchQueueExecute = triggered.NewEventTriggeredExecute[bool]() -var Consumption triggered.Event = BatchQueueExecute.RegisterExecute(func(params *triggered.Params[bool]) { - nh := params.Shared.(*dragonboat.NodeHost) +var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) { + var nh *dragonboat.NodeHost + params.Shared.Value(func(v any) { + nh = v.(*dragonboat.NodeHost) + }) + cs := nh.GetNoOPSession(128) for { @@ -64,7 +67,7 @@ var Consumption triggered.Event = BatchQueueExecute.RegisterExecute(func(params if err != nil { log.Println(err) } - log.Println(item) + // log.Println(item) } } @@ -114,10 +117,13 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr) // change the log verbosity + logger.GetLogger("dragonboat").SetLevel(logger.ERROR) logger.GetLogger("raft").SetLevel(logger.ERROR) - logger.GetLogger("rsm").SetLevel(logger.WARNING) - logger.GetLogger("transport").SetLevel(logger.WARNING) - logger.GetLogger("grpc").SetLevel(logger.WARNING) + logger.GetLogger("raftpb").SetLevel(logger.ERROR) + logger.GetLogger("logdb").SetLevel(logger.ERROR) + logger.GetLogger("rsm").SetLevel(logger.ERROR) + logger.GetLogger("transport").SetLevel(logger.ERROR) + logger.GetLogger("grpc").SetLevel(logger.ERROR) // config for raft node // See GoDoc for all available options rc := config.Config{ @@ -158,7 +164,8 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat panic(err) } - BatchQueueExecute.SetShared(Consumption, nh) + // 设置共享的参数 + Consumption.SetShared(nh) if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil { fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) diff --git a/sm.go b/sm.go index 05cea8a..fdeab75 100644 --- a/sm.go +++ b/sm.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/474420502/batchexecute" + "github.com/474420502/execute/triggered" sm "github.com/lni/dragonboat/v4/statemachine" ) @@ -24,6 +24,8 @@ type SMQueue struct { mu *sync.Mutex // 组名到队列的映射 queues map[string]*PriorityQueue[QueueItem] + + counter *triggered.EventExecute[int64] } type Command struct { @@ -79,12 +81,14 @@ func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { return &SMQueue{ shardID: shardID, replicaID: replicaID, - counter: batchexecute.NewBatchExecute[int64](func(params *int64) { - log.Printf("queue remain: %d\n", *params) - time.Sleep(time.Second * 5) - }), + mu: mu, queues: make(map[string]*PriorityQueue[QueueItem]), + + counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) { + log.Printf("queue remain: %d\n", *params.Value) + time.Sleep(time.Second * 5) + }, nil), } } @@ -129,7 +133,7 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { Value: cmd.Item, }) result.Value = uint64(len(d)) - BatchQueueExecute.Notify(Consumption, nil) // 通知可以执行update + Consumption.Notify(nil) // 通知可以执行update return result, err case "dequeue": var queue *PriorityQueue[QueueItem] @@ -161,7 +165,8 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { result.Data = d queue.Empty() size := queue.Size() - go s.counter.Execute(&size) + + s.counter.Notify(&size) // log.Println("queue remain:", queue.Size()) } diff --git a/start_test.go b/start_test.go index ea90179..969e211 100644 --- a/start_test.go +++ b/start_test.go @@ -32,7 +32,7 @@ func TestStartNodeA(t *testing.T) { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. cs := nh.GetNoOPSession(128) - ticker := time.NewTicker(100 * time.Millisecond) + ticker := time.NewTicker(10 * time.Millisecond) for { select { case <-ticker.C: @@ -71,7 +71,6 @@ func TestStartNodeA(t *testing.T) { }) raftStopper.Wait() - } func TestStartNodeB(t *testing.T) { @@ -90,7 +89,7 @@ func TestStartNodeB(t *testing.T) { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. cs := nh.GetNoOPSession(128) - ticker := time.NewTicker(100 * time.Millisecond) + ticker := time.NewTicker(10 * time.Millisecond) for { select { case <-ticker.C: @@ -147,7 +146,7 @@ func TestStartNodeC(t *testing.T) { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. // cs := nh.GetNoOPSession(128) - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(2 * time.Second) for { select { case <-ticker.C: