添加详细的日志信息输出
This commit is contained in:
parent
bc96550885
commit
af8568c8cb
2
sm.go
2
sm.go
|
@ -37,12 +37,14 @@ func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
|
||||||
queues: make(map[string]*PriorityQueue[QueueItem]),
|
queues: make(map[string]*PriorityQueue[QueueItem]),
|
||||||
|
|
||||||
counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) {
|
counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) {
|
||||||
|
if params.Value != 0 {
|
||||||
var m runtime.MemStats
|
var m runtime.MemStats
|
||||||
runtime.ReadMemStats(&m)
|
runtime.ReadMemStats(&m)
|
||||||
allocMB := float64(m.Alloc) / 1024 / 1024
|
allocMB := float64(m.Alloc) / 1024 / 1024
|
||||||
// totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024
|
// totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024
|
||||||
sysMB := float64(m.Sys) / 1024 / 1024
|
sysMB := float64(m.Sys) / 1024 / 1024
|
||||||
log.Printf("队列堆积数据: %d, Alloc = %.2f MB, Sys = %.2fMB\n", params.Value, allocMB, sysMB)
|
log.Printf("队列堆积数据: %d, Alloc = %.2f MB, Sys = %.2fMB\n", params.Value, allocMB, sysMB)
|
||||||
|
}
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,8 +8,6 @@ import (
|
||||||
sm "github.com/lni/dragonboat/v4/statemachine"
|
sm "github.com/lni/dragonboat/v4/statemachine"
|
||||||
)
|
)
|
||||||
|
|
||||||
var allPutCount int = 0
|
|
||||||
|
|
||||||
// 结构体异步传递后, 执行的注册函数, 实际上就是update的handler
|
// 结构体异步传递后, 执行的注册函数, 实际上就是update的handler
|
||||||
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
|
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
|
||||||
|
|
||||||
|
@ -37,7 +35,9 @@ var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
|
||||||
Value: cmd.Item,
|
Value: cmd.Item,
|
||||||
})
|
})
|
||||||
|
|
||||||
allPutCount++
|
countPutPop.Use(func(item *CountPutPop) {
|
||||||
|
item.PutCount++
|
||||||
|
})
|
||||||
|
|
||||||
var result sm.Result
|
var result sm.Result
|
||||||
dequeueHandler.Notify(dequeueHandler.NULL) // 通知可以执行update
|
dequeueHandler.Notify(dequeueHandler.NULL) // 通知可以执行update
|
||||||
|
|
45
websocket.go
45
websocket.go
|
@ -7,8 +7,10 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/474420502/execute/triggered"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/lni/dragonboat/v4"
|
"github.com/lni/dragonboat/v4"
|
||||||
"github.com/lni/dragonboat/v4/client"
|
"github.com/lni/dragonboat/v4/client"
|
||||||
|
@ -30,9 +32,34 @@ func HttpListen(ns *dragonboat.NodeHost, port int) {
|
||||||
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
|
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func queueHandler(w http.ResponseWriter, r *http.Request) {
|
type CountPutPop struct {
|
||||||
|
PutCount uint64
|
||||||
|
PopCount uint64
|
||||||
|
SelfCount map[string]uint64
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
var count int
|
func (pp *CountPutPop) Use(do func(item *CountPutPop)) {
|
||||||
|
pp.mu.Lock()
|
||||||
|
defer pp.mu.Unlock()
|
||||||
|
do(pp)
|
||||||
|
}
|
||||||
|
|
||||||
|
var countPutPop *CountPutPop = &CountPutPop{
|
||||||
|
SelfCount: map[string]uint64{},
|
||||||
|
}
|
||||||
|
|
||||||
|
var logIntervalTimeHandler = triggered.RegisterExecute[*CountPutPop](func(params *triggered.Params[*CountPutPop]) {
|
||||||
|
params.Value.Use(func(item *CountPutPop) {
|
||||||
|
log.Printf("all pop: %d all put: %d\n", item.PutCount, item.PopCount)
|
||||||
|
for k, v := range item.SelfCount {
|
||||||
|
log.Printf("%s pop: %d\n", k, v)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
time.Sleep(time.Second * 15)
|
||||||
|
})
|
||||||
|
|
||||||
|
func queueHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
conn, err := upgrader.Upgrade(w, r, nil)
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
@ -49,6 +76,10 @@ func queueHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
log.Printf("%s 退出连接 当前unity处理机器数量 %d\n", raddr, DequeueHandler.RefCountAdd(-1))
|
log.Printf("%s 退出连接 当前unity处理机器数量 %d\n", raddr, DequeueHandler.RefCountAdd(-1))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
countPutPop.Use(func(item *CountPutPop) {
|
||||||
|
item.SelfCount[raddr.String()] = 0
|
||||||
|
})
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
||||||
item := <-PopChannel
|
item := <-PopChannel
|
||||||
|
@ -71,7 +102,7 @@ func queueHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = conn.SetWriteDeadline(time.Now().Add(time.Second * 4))
|
err = conn.SetWriteDeadline(time.Now().Add(time.Second * 3))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
log.Println("重新回队")
|
log.Println("重新回队")
|
||||||
|
@ -88,9 +119,11 @@ func queueHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
count++
|
countPutPop.Use(func(item *CountPutPop) {
|
||||||
log.Printf("count: %d all put: %d", count, allPutCount)
|
item.PopCount++
|
||||||
|
item.SelfCount[raddr.String()]++
|
||||||
|
})
|
||||||
|
logIntervalTimeHandler.Notify(countPutPop)
|
||||||
// 打印消息
|
// 打印消息
|
||||||
log.Printf("source: [%s] 数据 推送到unity [%s]\n", item.Source, raddr.String())
|
log.Printf("source: [%s] 数据 推送到unity [%s]\n", item.Source, raddr.String())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user