From c3dc022b721e6718934880f88ad1c5cf0c6e1b67 Mon Sep 17 00:00:00 2001 From: eson <9673575+githubcontent@user.noreply.gitee.com> Date: Fri, 4 Aug 2023 18:59:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E7=BE=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 16 ++++++---- go.sum | 25 ++++++++++++---- main.go | 4 ++- sm.go | 16 +++++----- websocket.go | 40 +++++++++++++++++++++---- websocket_test.go | 75 ++++++++++++++++++++++++++++++++++------------- 6 files changed, 130 insertions(+), 46 deletions(-) diff --git a/go.mod b/go.mod index c2bbb83..b471861 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,19 @@ module fusenrender go 1.20 require ( - github.com/474420502/execute v0.1.1 + github.com/474420502/execute v0.2.2 + github.com/474420502/passer v0.0.1 github.com/google/uuid v1.3.0 + github.com/gorilla/websocket v1.5.0 github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 ) require ( - github.com/474420502/passer v0.0.1 // indirect github.com/DataDog/zstd v1.4.5 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/VictoriaMetrics/metrics v1.18.1 // indirect + github.com/andybalholm/brotli v1.0.5 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect github.com/cockroachdb/errors v1.9.0 // indirect github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect @@ -23,7 +25,6 @@ require ( github.com/getsentry/sentry-go v0.12.0 // indirect github.com/google/btree v1.0.0 // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-immutable-radix v1.0.0 // indirect github.com/hashicorp/go-msgpack v0.5.3 // indirect @@ -38,19 +39,24 @@ require ( github.com/pierrec/lz4/v4 v4.1.14 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rogpeppe/go-internal v1.8.1 // indirect + github.com/schollz/progressbar v1.0.0 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/stretchr/testify v1.8.3 // indirect + github.com/tidwall/gjson v1.12.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect golang.org/x/crypto v0.11.0 // indirect - golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect + golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 // indirect ) require ( + github.com/474420502/requests v1.41.0 github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/klauspost/compress v1.12.3 // indirect + github.com/klauspost/compress v1.13.5 // indirect github.com/pkg/errors v0.9.1 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect diff --git a/go.sum b/go.sum index 493e021..716c885 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,12 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/474420502/execute v0.1.1 h1:lMG/f/NOSScD10Yyqkazd2uAgW8Ogj0ZLG/Pm7lsYE8= -github.com/474420502/execute v0.1.1/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc= +github.com/474420502/execute v0.2.2 h1:Hzzb/HFa/urRvi3xWe+p/DnJgRyxXFVyEBEXbbITy/E= +github.com/474420502/execute v0.2.2/go.mod h1:wkKeKBIXYp7T844eU1YS2nPvFj8lra4VRcQYnWyXej4= github.com/474420502/passer v0.0.1 h1:ZWnt7hpFzsYDV7LHSEyLvLUvW5mRxrnDmgFdIl17q3w= github.com/474420502/passer v0.0.1/go.mod h1:MmnnrF9d51sPkFzdRq2pQtxQKqyjburVM1LjMbOCezE= +github.com/474420502/random v0.4.1 h1:HUUyLXRWMijVb7CJoEC16f0aFQOW25Lkr80Mut6PoKU= +github.com/474420502/requests v1.41.0 h1:c5OfhVHCvETlwEhODO2OAdPOH7v//kXJkGGUTKOkLOU= +github.com/474420502/requests v1.41.0/go.mod h1:2SCVzim0ONFYG09g/GrM7RTeJIC6qTyZfnohsjnG5C8= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -22,6 +25,8 @@ github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= +github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -65,6 +70,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= +github.com/elazarl/goproxy v0.0.0-20210801061803-8e322dfb79c4 h1:lS3P5Nw3oPO05Lk2gFiYUOL3QPaH+fRoI1wFOc4G1UY= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -197,8 +203,8 @@ github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0 github.com/klauspost/compress v1.9.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= -github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4= +github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -275,6 +281,8 @@ github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4 github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= +github.com/schollz/progressbar v1.0.0 h1:gbyFReLHDkZo8mxy/dLWMr+Mpb1MokGJ1FqCiqacjZM= +github.com/schollz/progressbar v1.0.0/go.mod h1:/l9I7PC3L3erOuz54ghIRKUEFcosiWfLvJv+Eq26UMs= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= @@ -297,6 +305,12 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tidwall/gjson v1.12.0 h1:61wEp/qfvFnqKH/WCI3M8HuRut+mHT6Mr82QrFmM2SY= +github.com/tidwall/gjson v1.12.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= @@ -340,8 +354,9 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= -golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc= golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= +golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 h1:/yRP+0AN7mf5DkD3BAI6TOFnd51gEoDEb8o35jIFtgw= +golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/main.go b/main.go index dd36e64..bd05447 100644 --- a/main.go +++ b/main.go @@ -131,7 +131,7 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat HeartbeatRTT: 1, CheckQuorum: true, - SnapshotEntries: 100, + SnapshotEntries: 10, CompactionOverhead: 5, } @@ -159,6 +159,8 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat panic(err) } + // 把引用计数设置为0 + Consumption.RefCountAdd(-1) // 设置共享的参数 Consumption.WithShared(nh) diff --git a/sm.go b/sm.go index 476f8c2..7a03461 100644 --- a/sm.go +++ b/sm.go @@ -2,7 +2,6 @@ package fusenrender import ( "context" - "encoding/gob" "io" "log" "sync" @@ -152,10 +151,11 @@ func (s *SMQueue) SaveSnapshot(w io.Writer, // there is no external file in this IStateMachine example, we thus leave // the fc untouched - s.mu.Lock() - defer s.mu.Unlock() + // s.mu.Lock() + // defer s.mu.Unlock() - return gob.NewEncoder(w).Encode(&s.queues) + // return gob.NewEncoder(w).Encode(&s.queues) + return nil } // RecoverFromSnapshot recovers the state using the provided snapshot. @@ -165,10 +165,10 @@ func (s *SMQueue) RecoverFromSnapshot(r io.Reader, // restore the Count variable, that is the only state we maintain in this // example, the input files is expected to be empty - err := gob.NewDecoder(r).Decode(&s.queues) - if err != nil { - return err - } + // err := gob.NewDecoder(r).Decode(&s.queues) + // if err != nil { + // return err + // } return nil } diff --git a/websocket.go b/websocket.go index 219622c..b0a4bad 100644 --- a/websocket.go +++ b/websocket.go @@ -22,15 +22,42 @@ var upgrader = websocket.Upgrader{ } func queueHandler(w http.ResponseWriter, r *http.Request) { - conn, _ := upgrader.Upgrade(w, r, nil) var err error + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + defer conn.Close() + + log.Println("建立连接", Consumption.RefCountAdd(1)) + defer func() { + log.Println("退出连接", Consumption.RefCountAdd(-1)) + }() + for { + // msgType, msg, err := conn.ReadMessage() + + // if err != nil { + // log.Println(msgType, msg, err) + // return + // } + itemdata := <-PopChannel // 写回消息 err = conn.WriteMessage(websocket.BinaryMessage, itemdata) if err != nil { + log.Println(err) + var item QueueItem + err = item.Decode(itemdata) + if err != nil { + log.Println(err) + return + } + log.Println("重新回队") + PushItem(wns, &item) return } // 打印消息 @@ -38,9 +65,12 @@ func queueHandler(w http.ResponseWriter, r *http.Request) { // 读取消息 // msgType, msg, err := conn.ReadMessage() + // if err != nil { + // log.Println(msgType, msg, err) // return // } + } } @@ -74,17 +104,15 @@ func queuePushHandler(w http.ResponseWriter, r *http.Request) { // 2. 定义结构体 - item := &QueueItem{ - Group: "test", - CreateAt: time.Now(), - Data: "saddas", - } + item := &QueueItem{} // 3. 解析JSON到结构体 err = json.Unmarshal(body, item) if err != nil { panic(err) } + + PushItem(wns, item) } var wns *dragonboat.NodeHost diff --git a/websocket_test.go b/websocket_test.go index 75ec059..656a053 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -3,11 +3,13 @@ package fusenrender_test import ( "fmt" "fusenrender" + "io" "log" - "net/url" + "net/http" "testing" + "time" - "github.com/gorilla/websocket" + "github.com/474420502/requests" ) func TestWebsocketA(t *testing.T) { @@ -16,8 +18,8 @@ func TestWebsocketA(t *testing.T) { panic(err) } - fusenrender.StartNode(svc.ServerID, 128, svc.Address()) - fusenrender.HttpStart(svc.Port - 1000) + nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + fusenrender.HttpStart(nh, svc.Port-1000) } func TestWebsocketB(t *testing.T) { @@ -26,8 +28,8 @@ func TestWebsocketB(t *testing.T) { panic(err) } - fusenrender.StartNode(svc.ServerID, 128, svc.Address()) - fusenrender.HttpStart(svc.Port - 1000) + nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + fusenrender.HttpStart(nh, svc.Port-1000) } func TestWebsocketC(t *testing.T) { @@ -36,26 +38,57 @@ func TestWebsocketC(t *testing.T) { panic(err) } - fusenrender.StartNode(svc.ServerID, 128, svc.Address()) - fusenrender.HttpStart(svc.Port - 1000) + nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + fusenrender.HttpStart(nh, svc.Port-1000) } -var addr = "localhost:4051" +var addr = "localhost:4052" func TestWebsocketCient(t *testing.T) { - u := url.URL{Scheme: "ws", Host: addr, Path: "/ws"} - conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) - log.Println(err) - defer conn.Close() + // u := url.URL{Scheme: "ws", Host: addr, Path: "/ws/pop/queue"} + // conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + // log.Println(err) + // defer conn.Close() - for { - err = conn.WriteMessage(websocket.BinaryMessage, []byte("123")) - if err != nil { - return + go func() { + C := time.NewTicker(time.Second * 5) + for range C.C { + item := fusenrender.QueueItem{ + Group: "test", + Wid: fusenrender.UidCreater.Get(), + CreateAt: time.Now(), + } + r, err := requests.Post(fmt.Sprintf("http://%s/api/queue/push", addr)).SetBodyJson(item).Execute() + log.Println(r, err) } + }() - mt, msg, err := conn.ReadMessage() - fmt.Sprintln(mt, err) - fmt.Printf("Received: %s\n", string(msg)) - } + select {} + + // for { + + // // err = conn.WriteMessage(websocket.PingMessage, nil) + // // log.Println("write ping") + // // if err != nil { + // // log.Println(err) + // // return + // // } + + // mt, msg, err := conn.ReadMessage() + // fmt.Println(mt, err) + // if mt == websocket.CloseMessage { + // return + // } + // fmt.Printf("Received: %s\n", string(msg)) + + // } +} + +func TestCallback(t *testing.T) { + http.HandleFunc("/api/websocket/render_notify", func(w http.ResponseWriter, r *http.Request) { + data, _ := io.ReadAll(r.Body) + log.Println(string(data)) + }) + log.Println(fmt.Sprintf(":%d", 4444)) + http.ListenAndServe(fmt.Sprintf(":%d", 4444), nil) }