fusen-render/queue.go

100 lines
1.8 KiB
Go
Raw Normal View History

2023-07-28 11:04:21 +00:00
package fusenrender
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/dgraph-io/badger/v3"
)
type ApplyTask struct {
Name string `json:"name"`
Data any `json:"data"`
}
type QueueItem struct {
Group string `json:"group"` // 组名
Priority uint32 `json:"priority"` // 处理的优先级
CreateAt time.Time `json:"create_at"` // 创建时间 统一utc
Data any `json:"data"` // 操作的数据结构
}
func (item *QueueItem) Encode() ([]byte, error) {
val, err := json.Marshal(item)
if err != nil {
return nil, err
}
return val, nil
}
type Queue struct {
db *badger.DB
}
func NewQueue(datapath string) (*Queue, error) {
opts := badger.DefaultOptions(datapath)
db, err := badger.Open(opts)
if err != nil {
return nil, err
}
return &Queue{
db: db,
}, nil
}
func (q *Queue) GetKey(item *QueueItem) []byte {
return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix()))
}
func (q *Queue) Enqueue(item *QueueItem) error {
// Badger存储
val, err := json.Marshal(item)
if err != nil {
return err
}
err = q.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(q.GetKey(item)), val)
})
return err
}
func (q *Queue) Dequeue(group string) (*QueueItem, error) {
var item *QueueItem
prefix := []byte(fmt.Sprintf("%s_", group))
err := q.db.Update(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
it.Seek(prefix)
if !it.ValidForPrefix(prefix) {
return nil
}
itemKey := it.Item().Key()
err := it.Item().Value(func(val []byte) error {
item := &QueueItem{}
return json.Unmarshal(val, item)
})
if err != nil {
log.Println(err)
}
return txn.Delete(itemKey)
})
return item, err
}