修复到合理

This commit is contained in:
eson 2023-08-06 04:26:57 +08:00
parent 354824bc68
commit 4195de3818
8 changed files with 178 additions and 147 deletions

7
go.mod
View File

@ -5,10 +5,8 @@ go 1.20
require (
github.com/474420502/execute v0.2.2
github.com/474420502/passer v0.0.1
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0
github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4
)
require (
@ -23,8 +21,10 @@ require (
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/getsentry/sentry-go v0.12.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.5.3 // indirect
@ -34,6 +34,7 @@ require (
github.com/hashicorp/memberlist v0.3.1 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 // indirect
github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376 // indirect
github.com/miekg/dns v1.1.26 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
@ -49,6 +50,7 @@ require (
github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 // indirect
google.golang.org/protobuf v1.26.0 // indirect
)
require (
@ -57,6 +59,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.13.5 // indirect
github.com/lni/dragonboat v2.1.7+incompatible
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect

4
go.sum
View File

@ -121,6 +121,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
@ -216,6 +217,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g=
github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awStJ6ArI7Y=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/lni/dragonboat v2.1.7+incompatible h1:U04ZmiQKXcdRWNE6jccSiYJK7k+ECwN7XIw1xpTiJeE=
github.com/lni/dragonboat v2.1.7+incompatible/go.mod h1:eI3naIUzh2+DKqtOFwdZ2t0Z/BjlGEjWZgS07x7x8oU=
github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be h1:0K1suLIEoHH6f2OnWF9IIlVAjR8OhFt+VuC9ZKpEJCk=
github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be/go.mod h1:DbE6sDHHvYPZvJPgP5K82+HHn6OMTgbAWy/ISa42VEk=
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 h1:6gzI38ZJmbzZ7oZebXz6jII0uVK+MZ3+ds+7mIt1ioI=
@ -494,6 +497,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

60
main.go
View File

@ -1,7 +1,6 @@
package fusenrender
import (
"context"
"flag"
"fmt"
"log"
@ -10,7 +9,6 @@ import (
"path/filepath"
"runtime"
"syscall"
"time"
"github.com/474420502/execute/triggered"
"github.com/lni/dragonboat/v4"
@ -18,66 +16,39 @@ import (
"github.com/lni/dragonboat/v4/logger"
)
func main() {
}
var shardID uint64 = 128
var DequeueHandler = triggered.RegisterExecute(func(params *triggered.Params[bool]) {
var nh *dragonboat.NodeHost
params.Shared.Value(func(v any) {
nh = v.(*dragonboat.NodeHost)
})
cs := nh.GetNoOPSession(128)
cs := stateClient.GetNoOPSession()
for i := 0; ; i++ {
cmd := &CmdDequeue{Command{Group: "test"}}
data, err := FsPasser.PackToBytes(cmd)
if err != nil {
log.Println(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
result, err := nh.SyncPropose(ctx, cs, data)
cancel()
item, err := stateClient.PopItem(cs, "test")
if err != nil {
log.Println(err)
break
} else {
if len(result.Data) == 0 {
// log.Println("wait 10 second")
var m runtime.MemStats
runtime.ReadMemStats(&m)
allocMB := float64(m.Alloc) / 1024 / 1024
totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024
sysMB := float64(m.Sys) / 1024 / 1024
fmt.Printf("dequeue count %d, Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", i, allocMB, totalAllocMB, sysMB)
break
} else {
var item QueueItem
err := item.Decode(result.Data)
if err != nil {
log.Println(err)
}
// log.Println(item)
if item == nil {
return
}
PopChannel <- []byte(item.Data.(string))
// log.Println(item)
}
}
}
})
})
var addresses []string = []string{
"localhost:5500",
"localhost:5501",
"localhost:5502",
}
func StartNode(cfg *ConfigServer) *dragonboat.NodeHost {
var exampleShardID uint64 = 128
func StartNode(cfg *ConfigServer) {
replicaID := cfg.ServerID
addr := cfg.Address()
// addr := "localhost"
@ -127,7 +98,7 @@ func StartNode(cfg *ConfigServer) *dragonboat.NodeHost {
rc := config.Config{
// ShardID and ReplicaID of the raft node
ReplicaID: uint64(replicaID),
ShardID: exampleShardID,
ShardID: shardID,
ElectionRTT: 10,
@ -162,6 +133,8 @@ func StartNode(cfg *ConfigServer) *dragonboat.NodeHost {
panic(err)
}
stateClient = &StateClient{nh: nh}
// 把引用计数设置为0
DequeueHandler.RefCountAdd(-1)
// 设置共享的参数
@ -172,8 +145,5 @@ func StartNode(cfg *ConfigServer) *dragonboat.NodeHost {
os.Exit(1)
}
wsPort := cfg.Port - 1000
HttpStart(nh, wsPort)
return nh
HttpListen(nh, cfg.Port-1000)
}

View File

@ -3,5 +3,5 @@ package fusenrender
import "testing"
func TestMain(t *testing.T) {
main()
}

89
sm.go
View File

@ -2,93 +2,16 @@ package fusenrender
import (
"context"
"fmt"
"io"
"log"
"runtime"
"sync"
"time"
"github.com/474420502/execute/triggered"
"github.com/474420502/passer"
sm "github.com/lni/dragonboat/v4/statemachine"
)
// 结构体异步传递后, 执行的注册函数
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
fsPasser := passer.NewPasser[sm.Result]()
fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) {
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
var consumer = cxt.Value(ctxDequeueHandler{}).(*triggered.EventExecute[bool])
cmd := obj.(*CmdEnqueue)
key := cmd.Item.GetKey()
smqueue.mu.Lock()
var queue *PriorityQueue[QueueItem]
var ok bool
if queue, ok = smqueue.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
smqueue.queues[cmd.Group] = queue
}
smqueue.mu.Unlock()
queue.Push(&Slice[QueueItem]{
Key: key,
Value: cmd.Item,
})
var result sm.Result
consumer.Notify(consumer.NULL) // 通知可以执行update
return result, nil
})
fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) {
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
var e = cxt.Value(ctxEntry{}).(*sm.Entry)
var queue *PriorityQueue[QueueItem]
var ok bool
cmd := obj.(*CmdDequeue)
smqueue.mu.Lock()
if queue, ok = smqueue.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
smqueue.queues[cmd.Group] = queue
}
smqueue.mu.Unlock()
var result sm.Result
if queue.Empty() {
return result, nil
}
item := queue.Pop()
if item == nil {
result.Value = 0
return result, nil
}
if item != nil {
d, err := item.Encode()
if err != nil {
return result, err
}
e.Result.Data = d
result.Data = d
queue.Empty()
size := queue.Size()
smqueue.counter.Notify(size)
// log.Println("queue remain:", queue.Size())
}
return result, nil
})
return fsPasser
}()
// SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列
type SMQueue struct {
// 所属的Shard ID
@ -116,7 +39,13 @@ func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) {
if params.Value != 0 {
log.Printf("queue remain: %d\n", params.Value)
var m runtime.MemStats
runtime.ReadMemStats(&m)
allocMB := float64(m.Alloc) / 1024 / 1024
totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024
sysMB := float64(m.Sys) / 1024 / 1024
fmt.Printf("queue remain: %d, Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", params.Value, allocMB, totalAllocMB, sysMB)
time.Sleep(time.Second * 5)
} else {
time.Sleep(time.Second * 15)

86
sm_upate_handler.go Normal file
View File

@ -0,0 +1,86 @@
package fusenrender
import (
"context"
"github.com/474420502/execute/triggered"
"github.com/474420502/passer"
sm "github.com/lni/dragonboat/v4/statemachine"
)
// 结构体异步传递后, 执行的注册函数, 实际上就是update的handler
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
fsPasser := passer.NewPasser[sm.Result]()
fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) {
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
var dequeueHandler = cxt.Value(ctxDequeueHandler{}).(*triggered.EventExecute[bool])
cmd := obj.(*CmdEnqueue)
key := cmd.Item.GetKey()
smqueue.mu.Lock()
var queue *PriorityQueue[QueueItem]
var ok bool
if queue, ok = smqueue.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
smqueue.queues[cmd.Group] = queue
}
smqueue.mu.Unlock()
queue.Push(&Slice[QueueItem]{
Key: key,
Value: cmd.Item,
})
var result sm.Result
dequeueHandler.Notify(dequeueHandler.NULL) // 通知可以执行update
return result, nil
})
fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) {
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
var e = cxt.Value(ctxEntry{}).(*sm.Entry)
var queue *PriorityQueue[QueueItem]
var ok bool
cmd := obj.(*CmdDequeue)
smqueue.mu.Lock()
if queue, ok = smqueue.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
smqueue.queues[cmd.Group] = queue
}
smqueue.mu.Unlock()
var result sm.Result
if queue.Empty() {
return result, nil
}
item := queue.Pop()
if item == nil {
result.Value = 0
return result, nil
}
if item != nil {
d, err := item.Encode()
if err != nil {
return result, err
}
e.Result.Data = d
result.Data = d
queue.Empty()
size := queue.Size()
smqueue.counter.Notify(size)
// log.Println("queue remain:", queue.Size())
}
return result, nil
})
return fsPasser
}()

View File

@ -11,6 +11,7 @@ import (
"github.com/gorilla/websocket"
"github.com/lni/dragonboat/v4"
"github.com/lni/dragonboat/v4/client"
)
var UidCreater = NewUniqueId(1)
@ -21,7 +22,7 @@ var upgrader = websocket.Upgrader{
WriteBufferSize: 1024,
}
func HttpStart(ns *dragonboat.NodeHost, port int) {
func HttpListen(ns *dragonboat.NodeHost, port int) {
http.HandleFunc("/api/queue/push", pushRenderTaskHandler)
http.HandleFunc("/ws/pop/queue", queueHandler)
http.HandleFunc("/ws/callback", callbackHandler)
@ -65,7 +66,7 @@ func queueHandler(w http.ResponseWriter, r *http.Request) {
return
}
log.Println("重新回队")
stateClient.PushItem(&item)
stateClient.PushItem(nil, &item)
return
}
// 打印消息
@ -120,7 +121,7 @@ func pushRenderTaskHandler(w http.ResponseWriter, r *http.Request) {
panic(err)
}
stateClient.PushItem(&item)
stateClient.PushItem(nil, &item)
}
type StateClient struct {
@ -129,8 +130,15 @@ type StateClient struct {
var stateClient *StateClient
func (cli *StateClient) PushItem(item *QueueItem) {
cs := cli.nh.GetNoOPSession(128)
func (cli *StateClient) GetNoOPSession() *client.Session {
return cli.nh.GetNoOPSession(shardID)
}
func (cli *StateClient) PushItem(cs *client.Session, item *QueueItem) {
if cs == nil {
cs = cli.GetNoOPSession()
}
cmd := &CmdEnqueue{Command: Command{Group: "test"}}
cmd.Item = item
data, err := FsPasser.PackToBytes(cmd)
@ -148,3 +156,34 @@ func (cli *StateClient) PushItem(item *QueueItem) {
// log.Println("enqueue", len(result.Data))
cancel()
}
func (cli *StateClient) PopItem(cs *client.Session, group string) (*QueueItem, error) {
if cs == nil {
cs = cli.GetNoOPSession()
}
cmd := &CmdDequeue{Command{Group: "test"}}
data, err := FsPasser.PackToBytes(cmd)
if err != nil {
log.Println(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
result, err := cli.nh.SyncPropose(ctx, cs, data)
cancel()
if err != nil {
return nil, err
}
if len(result.Data) == 0 {
return nil, nil
}
var item QueueItem
err = item.Decode(result.Data)
if err != nil {
log.Println(err)
}
return &item, nil
}

View File

@ -18,8 +18,8 @@ func TestWebsocketA(t *testing.T) {
panic(err)
}
nh := fusenrender.StartNode(svc)
fusenrender.HttpStart(nh, svc.Port-1000)
fusenrender.StartNode(svc)
}
func TestWebsocketB(t *testing.T) {
@ -28,8 +28,8 @@ func TestWebsocketB(t *testing.T) {
panic(err)
}
nh := fusenrender.StartNode(svc)
fusenrender.HttpStart(nh, svc.Port-1000)
fusenrender.StartNode(svc)
}
func TestWebsocketC(t *testing.T) {
@ -38,8 +38,8 @@ func TestWebsocketC(t *testing.T) {
panic(err)
}
nh := fusenrender.StartNode(svc)
fusenrender.HttpStart(nh, svc.Port-1000)
fusenrender.StartNode(svc)
}
var addr = "localhost:4052"