From 8b0cdeabb30faa1011b99c874396c7b8ded7cb6f Mon Sep 17 00:00:00 2001 From: eson <9673575+githubcontent@user.noreply.gitee.com> Date: Mon, 7 Aug 2023 19:03:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 3 --- go.sum | 4 ---- main.go | 5 ++--- sm.go | 17 +++++++++-------- sm_upate_handler.go | 8 +++++--- websocket.go | 20 +++++++++----------- websocket_test.go | 2 +- 7 files changed, 26 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index 9b1b465..15411a0 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,6 @@ require ( github.com/cockroachdb/redact v1.1.3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/getsentry/sentry-go v0.12.0 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/google/btree v1.0.0 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/uuid v1.3.0 // indirect @@ -50,7 +49,6 @@ require ( github.com/valyala/histogram v1.2.0 // indirect golang.org/x/crypto v0.11.0 // indirect golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 // indirect - google.golang.org/protobuf v1.26.0 // indirect ) require ( @@ -59,7 +57,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/klauspost/compress v1.13.5 // indirect - github.com/lni/dragonboat v2.1.7+incompatible github.com/pkg/errors v0.9.1 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect diff --git a/go.sum b/go.sum index 4746832..716c885 100644 --- a/go.sum +++ b/go.sum @@ -121,7 +121,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -217,8 +216,6 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awStJ6ArI7Y= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= -github.com/lni/dragonboat v2.1.7+incompatible h1:U04ZmiQKXcdRWNE6jccSiYJK7k+ECwN7XIw1xpTiJeE= -github.com/lni/dragonboat v2.1.7+incompatible/go.mod h1:eI3naIUzh2+DKqtOFwdZ2t0Z/BjlGEjWZgS07x7x8oU= github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be h1:0K1suLIEoHH6f2OnWF9IIlVAjR8OhFt+VuC9ZKpEJCk= github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be/go.mod h1:DbE6sDHHvYPZvJPgP5K82+HHn6OMTgbAWy/ISa42VEk= github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 h1:6gzI38ZJmbzZ7oZebXz6jII0uVK+MZ3+ds+7mIt1ioI= @@ -497,7 +494,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/main.go b/main.go index eb6e59d..285eae8 100644 --- a/main.go +++ b/main.go @@ -34,8 +34,7 @@ var DequeueHandler = triggered.RegisterExecute(func(params *triggered.Params[boo return } - PopChannel <- []byte(item.Data.(string)) - + PopChannel <- item } }) @@ -134,7 +133,7 @@ func StartNode(cfg *ConfigServer) { // 设置共享的参数 DequeueHandler.WithShared(nh) - if err := nh.StartReplica(initialMembers, true, NewSMQueue, rc); err != nil { + if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil { fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) os.Exit(1) } diff --git a/sm.go b/sm.go index 862a59a..2579091 100644 --- a/sm.go +++ b/sm.go @@ -2,6 +2,7 @@ package fusenrender import ( "context" + "encoding/gob" "fmt" "io" "runtime" @@ -85,11 +86,11 @@ func (s *SMQueue) SaveSnapshot(w io.Writer, // there is no external file in this IStateMachine example, we thus leave // the fc untouched - // s.mu.Lock() - // defer s.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() - // return gob.NewEncoder(w).Encode(&s.queues) - return nil + return gob.NewEncoder(w).Encode(&s.queues) + // return nil } // RecoverFromSnapshot recovers the state using the provided snapshot. @@ -99,10 +100,10 @@ func (s *SMQueue) RecoverFromSnapshot(r io.Reader, // restore the Count variable, that is the only state we maintain in this // example, the input files is expected to be empty - // err := gob.NewDecoder(r).Decode(&s.queues) - // if err != nil { - // return err - // } + err := gob.NewDecoder(r).Decode(&s.queues) + if err != nil { + return err + } return nil } diff --git a/sm_upate_handler.go b/sm_upate_handler.go index c13b28d..0850d19 100644 --- a/sm_upate_handler.go +++ b/sm_upate_handler.go @@ -72,10 +72,12 @@ var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { } e.Result.Data = d result.Data = d - queue.Empty() - size := queue.Size() + if !queue.Empty() { + size := queue.Size() + + smqueue.counter.Notify(size) + } - smqueue.counter.Notify(size) // log.Println("queue remain:", queue.Size()) } diff --git a/websocket.go b/websocket.go index c1a988f..c96f5ec 100644 --- a/websocket.go +++ b/websocket.go @@ -15,7 +15,7 @@ import ( ) var UidCreater = NewUniqueId(1) -var PopChannel chan []byte = make(chan []byte, 3) // chan *QueueItem = make(chan *QueueItem, 1) +var PopChannel chan *QueueItem = make(chan *QueueItem, 3) // chan *QueueItem = make(chan *QueueItem, 1) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, @@ -53,24 +53,22 @@ func queueHandler(w http.ResponseWriter, r *http.Request) { // return // } - itemdata := <-PopChannel + item := <-PopChannel + + if item == nil { + continue + } // 写回消息 - err = conn.WriteMessage(websocket.BinaryMessage, itemdata) + err = conn.WriteMessage(websocket.BinaryMessage, []byte(item.Data.(string))) if err != nil { log.Println(err) - var item QueueItem - err = item.Decode(itemdata) - if err != nil { - log.Println(err) - return - } log.Println("重新回队") - stateClient.PushItem(nil, &item) + stateClient.PushItem(nil, item) return } // 打印消息 - fmt.Printf("%s len=%d sent: \n ", conn.RemoteAddr(), len(itemdata)) + fmt.Printf("%s 处理完成. len\n", conn.RemoteAddr()) // 读取消息 // msgType, msg, err := conn.ReadMessage() diff --git a/websocket_test.go b/websocket_test.go index 52ef977..b3bcd7a 100644 --- a/websocket_test.go +++ b/websocket_test.go @@ -86,7 +86,7 @@ func TestWebsocketCient(t *testing.T) { } func TestCallback(t *testing.T) { - http.HandleFunc("/api/websocket/render_notify", func(w http.ResponseWriter, r *http.Request) { + http.HandleFunc("/api/render/render_notify", func(w http.ResponseWriter, r *http.Request) { data, _ := io.ReadAll(r.Body) log.Println(string(data)) })