fix duplicate error log

This commit is contained in:
eson 2020-12-10 17:50:42 +08:00
parent c93b4380ec
commit e9862571e2
3 changed files with 28 additions and 12 deletions

View File

@ -7,16 +7,17 @@ import (
"time" "time"
"git.nonolive.co/eson.hsm/databoard-collect/database" "git.nonolive.co/eson.hsm/databoard-collect/database"
"github.com/go-sql-driver/mysql"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
) )
// collectCopyCountLiveAnchors 从mongodb里复制需要增量的值 // collectCopyCountLiveAnchors 从mongodb里复制需要增量的值
func collectCopyCountLiveAnchors(cxt *WorkerContext) { func collectCopyCountLiveAnchors(cxt *WorkerContext) {
defer cxt.Done()
var err error var err error
var ok bool var ok bool
var lastLiveAnchor *CountLiveAnchors
for !ps.IsClose() { for !ps.IsClose() {
liveanchor := &CountLiveAnchors{} liveanchor := &CountLiveAnchors{}
@ -32,8 +33,17 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) {
} }
cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}}) cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}})
} else { } else {
if lastLiveAnchor != nil {
if liveanchor.UID == liveanchor.UID {
ps.Wait(time.Second * 2)
continue
}
}
last = liveanchor.CreateAt last = liveanchor.CreateAt
log.Println("last: ", last) log.Println("last: ", last, liveanchor.UID)
lastLiveAnchor = liveanchor
cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}}) cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": last}})
} }
@ -43,8 +53,9 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) {
continue continue
} }
var isNoData bool
for cur.Next(context.TODO()) && !ps.IsClose() { for cur.Next(context.TODO()) && !ps.IsClose() {
// la := &database.LiveAnchorsCountPoint{}
la := &database.LiveAnchorsCountPointObjectID{} la := &database.LiveAnchorsCountPointObjectID{}
err = cur.Decode(la) err = cur.Decode(la)
if err != nil { if err != nil {
@ -63,10 +74,18 @@ func collectCopyCountLiveAnchors(cxt *WorkerContext) {
c.CountMap = string(data) c.CountMap = string(data)
_, err = db.T.CountLiveAnchors.Insert(c) _, err = db.T.CountLiveAnchors.Insert(c)
if err != nil { if err != nil {
log.Println(err) switch err.(*mysql.MySQLError).Number {
case 1062: // duplicate
isNoData = true
default:
log.Println(err.(*mysql.MySQLError).Number, err)
}
}
}
if isNoData {
ps.Wait(time.Second * 2) ps.Wait(time.Second * 2)
} }
} }
} }
}
} }

1
go.mod
View File

@ -7,6 +7,5 @@ require (
github.com/474420502/perfectshutdown v0.1.0 github.com/474420502/perfectshutdown v0.1.0
github.com/go-sql-driver/mysql v1.5.0 github.com/go-sql-driver/mysql v1.5.0
github.com/go-xorm/xorm v0.7.9 github.com/go-xorm/xorm v0.7.9
github.com/google/uuid v1.1.2
go.mongodb.org/mongo-driver v1.4.3 go.mongodb.org/mongo-driver v1.4.3
) )

View File

@ -10,11 +10,6 @@ type WorkerContext struct {
wg *sync.WaitGroup wg *sync.WaitGroup
} }
// Done 必须在Handler里结束时处理. 默认defer cxt.Done()
func (cxt *WorkerContext) Done() {
cxt.wg.Done()
}
// Worker 主进程 // Worker 主进程
type Worker struct { type Worker struct {
cxt *WorkerContext cxt *WorkerContext
@ -30,7 +25,10 @@ var worker = func() *Worker {
// Handler 处理方法 // Handler 处理方法
func (w *Worker) Handler(handleFunc func(cxt *WorkerContext)) { func (w *Worker) Handler(handleFunc func(cxt *WorkerContext)) {
w.cxt.wg.Add(1) w.cxt.wg.Add(1)
go handleFunc(w.cxt) go func() {
defer w.cxt.wg.Done()
handleFunc(w.cxt)
}()
} }
// Run 运行 // Run 运行