更新最新版本

This commit is contained in:
eson 2023-08-02 18:16:05 +08:00
parent da8af186c8
commit 1af5dba6e5
6 changed files with 173 additions and 141 deletions

66
command.go Normal file
View File

@ -0,0 +1,66 @@
package fusenrender
import (
"encoding/json"
"fmt"
"time"
)
type CmdEnqueue struct {
Command
Item *QueueItem `json:"item"`
}
type CmdDequeue struct {
Command
}
type Command struct {
Group string `json:"group"`
}
// func (cmd *Command) Encode() ([]byte, error) {
// val, err := json.Marshal(cmd)
// if err != nil {
// return nil, err
// }
// return val, nil
// }
// func (cmd *Command) Decode(data []byte) error {
// err := json.Unmarshal(data, cmd)
// return err
// }
// QueueItem表示队列中的一个项
type QueueItem struct {
// 队列组名
Group string `json:"group"` // 组名
// 优先级
Priority uint32 `json:"priority"` // 处理的优先级
// 创建时间
CreateAt time.Time `json:"create_at"` // 创建时间 统一utc
// 要排队的数据
Data any `json:"data"` // 操作的数据结构
}
func (item *QueueItem) Encode() ([]byte, error) {
val, err := json.Marshal(item)
if err != nil {
return nil, err
}
return val, nil
}
func (item *QueueItem) Decode(buf []byte) error {
err := json.Unmarshal(buf, item)
if err != nil {
return err
}
return nil
}
func (item *QueueItem) GetKey() []byte {
return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix()))
}

1
go.mod
View File

@ -10,6 +10,7 @@ require (
)
require (
github.com/474420502/passer v0.0.1 // indirect
github.com/DataDog/zstd v1.4.5 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/VictoriaMetrics/metrics v1.18.1 // indirect

2
go.sum
View File

@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/474420502/execute v0.1.1 h1:lMG/f/NOSScD10Yyqkazd2uAgW8Ogj0ZLG/Pm7lsYE8=
github.com/474420502/execute v0.1.1/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc=
github.com/474420502/passer v0.0.1 h1:ZWnt7hpFzsYDV7LHSEyLvLUvW5mRxrnDmgFdIl17q3w=
github.com/474420502/passer v0.0.1/go.mod h1:MmnnrF9d51sPkFzdRq2pQtxQKqyjburVM1LjMbOCezE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=

19
main.go
View File

@ -22,21 +22,22 @@ func main() {
}
func init() {
}
var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) {
var nh *dragonboat.NodeHost
params.Shared.Value(func(v any) {
nh = v.(*dragonboat.NodeHost)
})
cs := nh.GetNoOPSession(128)
for {
for i := 0; ; i++ {
cmd := Command{
Name: "dequeue",
Group: "test",
}
data, err := cmd.Encode()
cmd := &CmdDequeue{Command: Command{Group: "test"}}
data, err := FsPasser.PackToBytes(cmd)
if err != nil {
log.Println(err)
}
@ -49,7 +50,7 @@ var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool])
break
} else {
if len(result.Data) == 0 {
log.Println("wait 10 second")
// log.Println("wait 10 second")
var m runtime.MemStats
runtime.ReadMemStats(&m)
@ -58,7 +59,7 @@ var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool])
totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024
sysMB := float64(m.Sys) / 1024 / 1024
fmt.Printf("Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", allocMB, totalAllocMB, sysMB)
fmt.Printf("dequeue count %d, Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", i, allocMB, totalAllocMB, sysMB)
time.Sleep(time.Second * 10)
break
} else {

207
sm.go
View File

@ -1,18 +1,94 @@
package fusenrender
import (
"context"
"encoding/gob"
"encoding/json"
"fmt"
"io"
"log"
"sync"
"time"
"github.com/474420502/execute/triggered"
"github.com/474420502/passer"
sm "github.com/lni/dragonboat/v4/statemachine"
)
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
fsPasser := passer.NewPasser[sm.Result]()
fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) {
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
var consumer = cxt.Value(ctxConsumption{}).(*triggered.EventExecute[bool])
cmd := obj.(*CmdEnqueue)
key := cmd.Item.GetKey()
smqueue.mu.Lock()
var queue *PriorityQueue[QueueItem]
var ok bool
if queue, ok = smqueue.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
smqueue.queues[cmd.Group] = queue
}
smqueue.mu.Unlock()
queue.Push(&Slice[QueueItem]{
Key: key,
Value: cmd.Item,
})
var result sm.Result
consumer.Notify(consumer.NULL) // 通知可以执行update
return result, nil
})
fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) {
var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue)
var e = cxt.Value(ctxEntry{}).(*sm.Entry)
var queue *PriorityQueue[QueueItem]
var ok bool
cmd := obj.(*CmdDequeue)
smqueue.mu.Lock()
if queue, ok = smqueue.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
smqueue.queues[cmd.Group] = queue
}
smqueue.mu.Unlock()
var result sm.Result
if queue.Empty() {
return result, nil
}
item := queue.Pop()
if item == nil {
result.Value = 0
return result, nil
}
if item != nil {
d, err := item.Encode()
if err != nil {
return result, err
}
e.Result.Data = d
result.Data = d
queue.Empty()
size := queue.Size()
smqueue.counter.Notify(size)
// log.Println("queue remain:", queue.Size())
}
return result, nil
})
return fsPasser
}()
// SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列
type SMQueue struct {
// 所属的Shard ID
@ -28,53 +104,6 @@ type SMQueue struct {
counter *triggered.EventExecute[int64]
}
type Command struct {
Name string `json:"name"`
Group string `json:"group"`
Item *QueueItem `json:"item"`
}
func (cmd *Command) Encode() ([]byte, error) {
val, err := json.Marshal(cmd)
if err != nil {
return nil, err
}
return val, nil
}
// QueueItem表示队列中的一个项
type QueueItem struct {
// 队列组名
Group string `json:"group"` // 组名
// 优先级
Priority uint32 `json:"priority"` // 处理的优先级
// 创建时间
CreateAt time.Time `json:"create_at"` // 创建时间 统一utc
// 要排队的数据
Data any `json:"data"` // 操作的数据结构
}
func (item *QueueItem) Encode() ([]byte, error) {
val, err := json.Marshal(item)
if err != nil {
return nil, err
}
return val, nil
}
func (item *QueueItem) Decode(buf []byte) error {
err := json.Unmarshal(buf, item)
if err != nil {
return err
}
return nil
}
func (item *QueueItem) GetKey() []byte {
return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix()))
}
// // NewSMQueue creates and return a new ExampleStateMachine object.
func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
mu := &sync.Mutex{}
@ -100,81 +129,19 @@ func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) {
return item, nil
}
type ctxEntry struct{}
type ctxSMQueue struct{}
type ctxConsumption struct{}
// Update处理Entry中的更新命令
// Update updates the object using the specified committed raft entry.
func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
ctx := context.TODO()
var cmd Command
err = json.Unmarshal(e.Cmd, &cmd)
if err != nil {
return result, err
}
switch cmd.Name {
case "enqueue":
d, err := cmd.Item.Encode()
if err != nil {
return result, err
}
key := cmd.Item.GetKey()
s.mu.Lock()
var queue *PriorityQueue[QueueItem]
var ok bool
if queue, ok = s.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
s.queues[cmd.Group] = queue
}
s.mu.Unlock()
queue.Push(&Slice[QueueItem]{
Key: key,
Value: cmd.Item,
})
result.Value = uint64(len(d))
Consumption.Notify(Consumption.NULL) // 通知可以执行update
return result, err
case "dequeue":
var queue *PriorityQueue[QueueItem]
var ok bool
s.mu.Lock()
if queue, ok = s.queues[cmd.Group]; !ok {
queue = NewPriorityQueue[QueueItem]()
s.queues[cmd.Group] = queue
}
s.mu.Unlock()
if queue.Empty() {
return result, nil
}
item := queue.Pop()
if item == nil {
result.Value = 0
return result, nil
}
if item != nil {
d, err := item.Encode()
if err != nil {
return result, err
}
e.Result.Data = d
result.Data = d
queue.Empty()
size := queue.Size()
s.counter.Notify(size)
// log.Println("queue remain:", queue.Size())
}
return result, err
default:
return result, fmt.Errorf("unknonw cmd type: %s", cmd.Name)
}
ctx = context.WithValue(ctx, ctxEntry{}, &e)
ctx = context.WithValue(ctx, ctxSMQueue{}, s)
ctx = context.WithValue(ctx, ctxConsumption{}, Consumption)
return FsPasser.ExecuteWithBytes(ctx, e.Cmd)
}
// SaveSnapshot saves the current IStateMachine state into a snapshot using the

View File

@ -44,13 +44,10 @@ func TestStartNodeA(t *testing.T) {
Data: uuid.New().String(),
}
cmd := fusenrender.Command{
Name: "enqueue",
Group: "test",
Item: item,
}
cmd := &fusenrender.CmdEnqueue{Command: fusenrender.Command{Group: "test"}}
cmd.Item = item
data, err := fusenrender.FsPasser.PackToBytes(cmd)
data, err := cmd.Encode()
if err != nil {
log.Println(err)
}
@ -101,13 +98,11 @@ func TestStartNodeB(t *testing.T) {
CreateAt: time.Now(),
Data: uuid.New().String(),
}
cmd := fusenrender.Command{
Name: "enqueue",
Group: "test",
Item: item,
}
data, err := cmd.Encode()
cmd := fusenrender.CmdEnqueue{Command: fusenrender.Command{Group: "test"}}
cmd.Item = item
data, err := fusenrender.FsPasser.PackToBytes(&cmd)
if err != nil {
log.Println(err)
}