package fusenrender import ( "context" "github.com/474420502/execute/triggered" "github.com/474420502/passer" sm "github.com/lni/dragonboat/v4/statemachine" ) // 结构体异步传递后, 执行的注册函数, 实际上就是update的handler var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { fsPasser := passer.NewPasser[sm.Result]() fsPasser.RegisterPasser(&CmdEnqueue{}, func(cxt context.Context, obj any) (sm.Result, error) { var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) var dequeueHandler = cxt.Value(ctxDequeueHandler{}).(*triggered.EventExecute[bool]) cmd := obj.(*CmdEnqueue) key := cmd.Item.GetKey() smqueue.mu.Lock() var queue *PriorityQueue[QueueItem] var ok bool if queue, ok = smqueue.queues[cmd.Group]; !ok { queue = NewPriorityQueue[QueueItem]() smqueue.queues[cmd.Group] = queue } smqueue.mu.Unlock() queue.Push(&Slice[QueueItem]{ Key: key, Value: cmd.Item, }) countPutPop.Use(func(item *CountPutPop) { item.PutCount++ }) var result sm.Result dequeueHandler.Notify(dequeueHandler.NULL) // 通知可以执行update return result, nil }) fsPasser.RegisterPasser(&CmdDequeue{}, func(cxt context.Context, obj any) (sm.Result, error) { var smqueue = cxt.Value(ctxSMQueue{}).(*SMQueue) var e = cxt.Value(ctxEntry{}).(*sm.Entry) var queue *PriorityQueue[QueueItem] var ok bool cmd := obj.(*CmdDequeue) smqueue.mu.Lock() if queue, ok = smqueue.queues[cmd.Group]; !ok { queue = NewPriorityQueue[QueueItem]() smqueue.queues[cmd.Group] = queue } smqueue.mu.Unlock() var result sm.Result if queue.Empty() { return result, nil } item := queue.Pop() if item == nil { result.Value = 0 return result, nil } if item != nil { d, err := item.Encode() if err != nil { return result, err } e.Result.Data = d result.Data = d smqueue.counter.Notify(queue.Size()) // log.Println("queue remain:", queue.Size()) } return result, nil }) return fsPasser }()