This commit is contained in:
eson 2023-08-07 19:03:22 +08:00
parent f9c68c748c
commit 8b0cdeabb3
7 changed files with 26 additions and 33 deletions

3
go.mod
View File

@ -21,7 +21,6 @@ require (
github.com/cockroachdb/redact v1.1.3 // indirect github.com/cockroachdb/redact v1.1.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/getsentry/sentry-go v0.12.0 // indirect github.com/getsentry/sentry-go v0.12.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.0 // indirect github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.5.9 // indirect github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect github.com/google/uuid v1.3.0 // indirect
@ -50,7 +49,6 @@ require (
github.com/valyala/histogram v1.2.0 // indirect github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/crypto v0.11.0 // indirect golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 // indirect golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 // indirect
google.golang.org/protobuf v1.26.0 // indirect
) )
require ( require (
@ -59,7 +57,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.13.5 // indirect github.com/klauspost/compress v1.13.5 // indirect
github.com/lni/dragonboat v2.1.7+incompatible
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
golang.org/x/net v0.12.0 // indirect golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect golang.org/x/sys v0.10.0 // indirect

4
go.sum
View File

@ -121,7 +121,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
@ -217,8 +216,6 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g=
github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awStJ6ArI7Y= github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awStJ6ArI7Y=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/lni/dragonboat v2.1.7+incompatible h1:U04ZmiQKXcdRWNE6jccSiYJK7k+ECwN7XIw1xpTiJeE=
github.com/lni/dragonboat v2.1.7+incompatible/go.mod h1:eI3naIUzh2+DKqtOFwdZ2t0Z/BjlGEjWZgS07x7x8oU=
github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be h1:0K1suLIEoHH6f2OnWF9IIlVAjR8OhFt+VuC9ZKpEJCk= github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be h1:0K1suLIEoHH6f2OnWF9IIlVAjR8OhFt+VuC9ZKpEJCk=
github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be/go.mod h1:DbE6sDHHvYPZvJPgP5K82+HHn6OMTgbAWy/ISa42VEk= github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be/go.mod h1:DbE6sDHHvYPZvJPgP5K82+HHn6OMTgbAWy/ISa42VEk=
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 h1:6gzI38ZJmbzZ7oZebXz6jII0uVK+MZ3+ds+7mIt1ioI= github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 h1:6gzI38ZJmbzZ7oZebXz6jII0uVK+MZ3+ds+7mIt1ioI=
@ -497,7 +494,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -34,8 +34,7 @@ var DequeueHandler = triggered.RegisterExecute(func(params *triggered.Params[boo
return return
} }
PopChannel <- []byte(item.Data.(string)) PopChannel <- item
} }
}) })
@ -134,7 +133,7 @@ func StartNode(cfg *ConfigServer) {
// 设置共享的参数 // 设置共享的参数
DequeueHandler.WithShared(nh) DequeueHandler.WithShared(nh)
if err := nh.StartReplica(initialMembers, true, NewSMQueue, rc); err != nil { if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil {
fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
os.Exit(1) os.Exit(1)
} }

17
sm.go
View File

@ -2,6 +2,7 @@ package fusenrender
import ( import (
"context" "context"
"encoding/gob"
"fmt" "fmt"
"io" "io"
"runtime" "runtime"
@ -85,11 +86,11 @@ func (s *SMQueue) SaveSnapshot(w io.Writer,
// there is no external file in this IStateMachine example, we thus leave // there is no external file in this IStateMachine example, we thus leave
// the fc untouched // the fc untouched
// s.mu.Lock() s.mu.Lock()
// defer s.mu.Unlock() defer s.mu.Unlock()
// return gob.NewEncoder(w).Encode(&s.queues) return gob.NewEncoder(w).Encode(&s.queues)
return nil // return nil
} }
// RecoverFromSnapshot recovers the state using the provided snapshot. // RecoverFromSnapshot recovers the state using the provided snapshot.
@ -99,10 +100,10 @@ func (s *SMQueue) RecoverFromSnapshot(r io.Reader,
// restore the Count variable, that is the only state we maintain in this // restore the Count variable, that is the only state we maintain in this
// example, the input files is expected to be empty // example, the input files is expected to be empty
// err := gob.NewDecoder(r).Decode(&s.queues) err := gob.NewDecoder(r).Decode(&s.queues)
// if err != nil { if err != nil {
// return err return err
// } }
return nil return nil
} }

View File

@ -72,10 +72,12 @@ var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
} }
e.Result.Data = d e.Result.Data = d
result.Data = d result.Data = d
queue.Empty() if !queue.Empty() {
size := queue.Size() size := queue.Size()
smqueue.counter.Notify(size) smqueue.counter.Notify(size)
}
// log.Println("queue remain:", queue.Size()) // log.Println("queue remain:", queue.Size())
} }

View File

@ -15,7 +15,7 @@ import (
) )
var UidCreater = NewUniqueId(1) var UidCreater = NewUniqueId(1)
var PopChannel chan []byte = make(chan []byte, 3) // chan *QueueItem = make(chan *QueueItem, 1) var PopChannel chan *QueueItem = make(chan *QueueItem, 3) // chan *QueueItem = make(chan *QueueItem, 1)
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
ReadBufferSize: 1024, ReadBufferSize: 1024,
@ -53,24 +53,22 @@ func queueHandler(w http.ResponseWriter, r *http.Request) {
// return // return
// } // }
itemdata := <-PopChannel item := <-PopChannel
if item == nil {
continue
}
// 写回消息 // 写回消息
err = conn.WriteMessage(websocket.BinaryMessage, itemdata) err = conn.WriteMessage(websocket.BinaryMessage, []byte(item.Data.(string)))
if err != nil { if err != nil {
log.Println(err) log.Println(err)
var item QueueItem
err = item.Decode(itemdata)
if err != nil {
log.Println(err)
return
}
log.Println("重新回队") log.Println("重新回队")
stateClient.PushItem(nil, &item) stateClient.PushItem(nil, item)
return return
} }
// 打印消息 // 打印消息
fmt.Printf("%s len=%d sent: \n ", conn.RemoteAddr(), len(itemdata)) fmt.Printf("%s 处理完成. len\n", conn.RemoteAddr())
// 读取消息 // 读取消息
// msgType, msg, err := conn.ReadMessage() // msgType, msg, err := conn.ReadMessage()

View File

@ -86,7 +86,7 @@ func TestWebsocketCient(t *testing.T) {
} }
func TestCallback(t *testing.T) { func TestCallback(t *testing.T) {
http.HandleFunc("/api/websocket/render_notify", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/api/render/render_notify", func(w http.ResponseWriter, r *http.Request) {
data, _ := io.ReadAll(r.Body) data, _ := io.ReadAll(r.Body)
log.Println(string(data)) log.Println(string(data))
}) })