diff --git a/collect.go b/collect.go index 63863c8..a36ed83 100644 --- a/collect.go +++ b/collect.go @@ -9,6 +9,7 @@ import ( "git.nonolive.co/eson.hsm/databoard-collect/database" "github.com/google/uuid" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" ) // collectCopyCountLiveAnchors 从mongodb里复制需要增量的值 @@ -18,10 +19,17 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) { for !ps.IsClose() { liveanchor := &CountLiveAnchors{} - if ok, err := db.T.CountLiveAnchors.OrderBy("create_at desc").Limit(1).Get(liveanchor); ok { - last := liveanchor.CreateAt - log.Println("last: ", last) - cur, err := mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}}) + ok, err := db.T.CountLiveAnchors.OrderBy("create_at desc").Limit(1).Get(liveanchor) + if ok || (ok == false && err == nil) { + + var cur *mongo.Cursor + if liveanchor == nil { + cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": "2020-05-30"}}) + } else { + last := liveanchor.CreateAt + log.Println("last: ", last) + cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}}) + } if err != nil { log.Println(err) ps.Wait(time.Second * 5) @@ -50,9 +58,6 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) { log.Println(err) } } - } else { - log.Println(err) - ps.Wait(time.Second * 5) } } } diff --git a/worker.go b/worker.go index 431fd68..d3f24e2 100644 --- a/worker.go +++ b/worker.go @@ -17,26 +17,25 @@ func (cxt *WorkerContext) Done() { // Worker 主进程 type Worker struct { - wg *sync.WaitGroup cxt *WorkerContext } var worker = func() *Worker { w := &Worker{} w.cxt = &WorkerContext{} - w.wg = &sync.WaitGroup{} + w.cxt.wg = &sync.WaitGroup{} return w }() // Handler 处理方法 func (w *Worker) Handler(handleFunc func(cxt *WorkerContext)) { - w.wg.Add(1) + w.cxt.wg.Add(1) go handleFunc(w.cxt) } // Run 运行 func (w *Worker) Run() { log.Println("worker running") - w.wg.Wait() + w.cxt.wg.Wait() log.Println("worker stop") }