新版本
This commit is contained in:
parent
fe90dab5a7
commit
128a3df07c
2
go.mod
2
go.mod
|
@ -4,7 +4,7 @@ go 1.20
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/474420502/batchexecute v0.0.2
|
github.com/474420502/batchexecute v0.0.2
|
||||||
github.com/474420502/execute v0.0.2
|
github.com/474420502/execute v0.0.3
|
||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be
|
github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be
|
||||||
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4
|
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -2,8 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
|
||||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||||
github.com/474420502/batchexecute v0.0.2 h1:gTMyUh2x6HpV7/+nOLj4qFRO/l5/B/eg7dXsHxLJpbg=
|
github.com/474420502/batchexecute v0.0.2 h1:gTMyUh2x6HpV7/+nOLj4qFRO/l5/B/eg7dXsHxLJpbg=
|
||||||
github.com/474420502/batchexecute v0.0.2/go.mod h1:IWazO1QTaB5LyWwMxSqIX/6g/UXwwpnqk0AVM5j24J0=
|
github.com/474420502/batchexecute v0.0.2/go.mod h1:IWazO1QTaB5LyWwMxSqIX/6g/UXwwpnqk0AVM5j24J0=
|
||||||
github.com/474420502/execute v0.0.2 h1:CEhTCBa7UemBWzbfQ6Szj0/GJfx3/JJA0s6r807NPtw=
|
github.com/474420502/execute v0.0.3 h1:kmVaUG/LQis0vXiLsr0WS0pCpHjtsqb1gy9fbuH8oTk=
|
||||||
github.com/474420502/execute v0.0.2/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc=
|
github.com/474420502/execute v0.0.3/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc=
|
||||||
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
|
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||||
|
|
23
main.go
23
main.go
|
@ -22,9 +22,12 @@ func main() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var BatchQueueExecute = triggered.NewEventTriggeredExecute[bool]()
|
var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) {
|
||||||
var Consumption triggered.Event = BatchQueueExecute.RegisterExecute(func(params *triggered.Params[bool]) {
|
var nh *dragonboat.NodeHost
|
||||||
nh := params.Shared.(*dragonboat.NodeHost)
|
params.Shared.Value(func(v any) {
|
||||||
|
nh = v.(*dragonboat.NodeHost)
|
||||||
|
})
|
||||||
|
|
||||||
cs := nh.GetNoOPSession(128)
|
cs := nh.GetNoOPSession(128)
|
||||||
for {
|
for {
|
||||||
|
|
||||||
|
@ -64,7 +67,7 @@ var Consumption triggered.Event = BatchQueueExecute.RegisterExecute(func(params
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
log.Println(item)
|
// log.Println(item)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -114,10 +117,13 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat
|
||||||
|
|
||||||
fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)
|
fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)
|
||||||
// change the log verbosity
|
// change the log verbosity
|
||||||
|
logger.GetLogger("dragonboat").SetLevel(logger.ERROR)
|
||||||
logger.GetLogger("raft").SetLevel(logger.ERROR)
|
logger.GetLogger("raft").SetLevel(logger.ERROR)
|
||||||
logger.GetLogger("rsm").SetLevel(logger.WARNING)
|
logger.GetLogger("raftpb").SetLevel(logger.ERROR)
|
||||||
logger.GetLogger("transport").SetLevel(logger.WARNING)
|
logger.GetLogger("logdb").SetLevel(logger.ERROR)
|
||||||
logger.GetLogger("grpc").SetLevel(logger.WARNING)
|
logger.GetLogger("rsm").SetLevel(logger.ERROR)
|
||||||
|
logger.GetLogger("transport").SetLevel(logger.ERROR)
|
||||||
|
logger.GetLogger("grpc").SetLevel(logger.ERROR)
|
||||||
// config for raft node
|
// config for raft node
|
||||||
// See GoDoc for all available options
|
// See GoDoc for all available options
|
||||||
rc := config.Config{
|
rc := config.Config{
|
||||||
|
@ -158,7 +164,8 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
BatchQueueExecute.SetShared(Consumption, nh)
|
// 设置共享的参数
|
||||||
|
Consumption.SetShared(nh)
|
||||||
|
|
||||||
if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil {
|
if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
|
fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
|
||||||
|
|
19
sm.go
19
sm.go
|
@ -9,7 +9,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/474420502/batchexecute"
|
"github.com/474420502/execute/triggered"
|
||||||
sm "github.com/lni/dragonboat/v4/statemachine"
|
sm "github.com/lni/dragonboat/v4/statemachine"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -24,6 +24,8 @@ type SMQueue struct {
|
||||||
mu *sync.Mutex
|
mu *sync.Mutex
|
||||||
// 组名到队列的映射
|
// 组名到队列的映射
|
||||||
queues map[string]*PriorityQueue[QueueItem]
|
queues map[string]*PriorityQueue[QueueItem]
|
||||||
|
|
||||||
|
counter *triggered.EventExecute[int64]
|
||||||
}
|
}
|
||||||
|
|
||||||
type Command struct {
|
type Command struct {
|
||||||
|
@ -79,12 +81,14 @@ func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
|
||||||
return &SMQueue{
|
return &SMQueue{
|
||||||
shardID: shardID,
|
shardID: shardID,
|
||||||
replicaID: replicaID,
|
replicaID: replicaID,
|
||||||
counter: batchexecute.NewBatchExecute[int64](func(params *int64) {
|
|
||||||
log.Printf("queue remain: %d\n", *params)
|
|
||||||
time.Sleep(time.Second * 5)
|
|
||||||
}),
|
|
||||||
mu: mu,
|
mu: mu,
|
||||||
queues: make(map[string]*PriorityQueue[QueueItem]),
|
queues: make(map[string]*PriorityQueue[QueueItem]),
|
||||||
|
|
||||||
|
counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) {
|
||||||
|
log.Printf("queue remain: %d\n", *params.Value)
|
||||||
|
time.Sleep(time.Second * 5)
|
||||||
|
}, nil),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,7 +133,7 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
|
||||||
Value: cmd.Item,
|
Value: cmd.Item,
|
||||||
})
|
})
|
||||||
result.Value = uint64(len(d))
|
result.Value = uint64(len(d))
|
||||||
BatchQueueExecute.Notify(Consumption, nil) // 通知可以执行update
|
Consumption.Notify(nil) // 通知可以执行update
|
||||||
return result, err
|
return result, err
|
||||||
case "dequeue":
|
case "dequeue":
|
||||||
var queue *PriorityQueue[QueueItem]
|
var queue *PriorityQueue[QueueItem]
|
||||||
|
@ -161,7 +165,8 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
|
||||||
result.Data = d
|
result.Data = d
|
||||||
queue.Empty()
|
queue.Empty()
|
||||||
size := queue.Size()
|
size := queue.Size()
|
||||||
go s.counter.Execute(&size)
|
|
||||||
|
s.counter.Notify(&size)
|
||||||
// log.Println("queue remain:", queue.Size())
|
// log.Println("queue remain:", queue.Size())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@ func TestStartNodeA(t *testing.T) {
|
||||||
// this goroutine makes a linearizable read every 10 second. it returns the
|
// this goroutine makes a linearizable read every 10 second. it returns the
|
||||||
// Count value maintained in IStateMachine. see datastore.go for details.
|
// Count value maintained in IStateMachine. see datastore.go for details.
|
||||||
cs := nh.GetNoOPSession(128)
|
cs := nh.GetNoOPSession(128)
|
||||||
ticker := time.NewTicker(100 * time.Millisecond)
|
ticker := time.NewTicker(10 * time.Millisecond)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
@ -71,7 +71,6 @@ func TestStartNodeA(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
raftStopper.Wait()
|
raftStopper.Wait()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStartNodeB(t *testing.T) {
|
func TestStartNodeB(t *testing.T) {
|
||||||
|
@ -90,7 +89,7 @@ func TestStartNodeB(t *testing.T) {
|
||||||
// this goroutine makes a linearizable read every 10 second. it returns the
|
// this goroutine makes a linearizable read every 10 second. it returns the
|
||||||
// Count value maintained in IStateMachine. see datastore.go for details.
|
// Count value maintained in IStateMachine. see datastore.go for details.
|
||||||
cs := nh.GetNoOPSession(128)
|
cs := nh.GetNoOPSession(128)
|
||||||
ticker := time.NewTicker(100 * time.Millisecond)
|
ticker := time.NewTicker(10 * time.Millisecond)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
@ -147,7 +146,7 @@ func TestStartNodeC(t *testing.T) {
|
||||||
// this goroutine makes a linearizable read every 10 second. it returns the
|
// this goroutine makes a linearizable read every 10 second. it returns the
|
||||||
// Count value maintained in IStateMachine. see datastore.go for details.
|
// Count value maintained in IStateMachine. see datastore.go for details.
|
||||||
// cs := nh.GetNoOPSession(128)
|
// cs := nh.GetNoOPSession(128)
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
ticker := time.NewTicker(2 * time.Second)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
|
Loading…
Reference in New Issue
Block a user