databoard-transform/collect.go

80 lines
1.9 KiB
Go
Raw Permalink Normal View History

package main
import (
"context"
2020-12-13 17:28:05 +00:00
"database/sql"
"encoding/json"
2020-12-13 17:28:05 +00:00
"fmt"
"log"
"time"
2020-12-11 06:40:31 +00:00
mongodb "git.nonolive.co/eson.hsm/databoard-collect/database"
2020-12-10 09:50:42 +00:00
"github.com/go-sql-driver/mysql"
"go.mongodb.org/mongo-driver/bson"
2020-12-09 10:11:52 +00:00
"go.mongodb.org/mongo-driver/mongo"
)
// collectCopyCountLiveAnchors 从mongodb里复制需要增量的值
func collectCopyCountLiveAnchors(cxt *WorkerContext) {
2020-12-09 10:18:09 +00:00
var err error
2020-12-10 11:00:35 +00:00
for !ps.IsClose() {
2020-12-13 17:28:05 +00:00
db.Do(func(db *sql.DB) {
2020-12-11 06:40:31 +00:00
liveanchor := &CountLiveAnchors{}
2020-12-13 17:28:05 +00:00
// T.CountLiveAnchors.Order("create_at desc").Limit(1).Find(liveanchor)
selectsql := fmt.Sprintf("select uid, create_at from %s order by create_at desc limit 1", Tables.CountLiveAnchors)
row := db.QueryRow(selectsql)
if row.Err() == nil {
2020-12-11 06:40:31 +00:00
2020-12-13 17:28:05 +00:00
row.Scan(&liveanchor.UID, &liveanchor.CreateAt)
log.Println("last: ", liveanchor.CreateAt, liveanchor.UID)
2020-12-11 06:40:31 +00:00
2020-12-13 17:28:05 +00:00
var cur *mongo.Cursor
cur, err = mdb.C.CountLiveAnchors.Find(context.TODO(), bson.M{"create_at": bson.M{"$gt": liveanchor.CreateAt}})
2020-12-11 06:40:31 +00:00
if err != nil {
log.Println(err)
ps.Wait(time.Second * 5)
return
}
2020-12-09 10:18:09 +00:00
2020-12-11 06:40:31 +00:00
for cur.Next(context.TODO()) && !ps.IsClose() {
2020-12-11 06:40:31 +00:00
la := &mongodb.LiveAnchorsCountPointObjectID{}
err = cur.Decode(la)
if err != nil {
panic(err)
}
uid := la.ObjectID.Hex()
2020-12-10 09:50:42 +00:00
2020-12-11 06:40:31 +00:00
c := &CountLiveAnchors{}
c.UID = uid
c.IsCounted = 0
c.CreateAt = la.CreateAt
2020-12-11 06:40:31 +00:00
data, err := json.Marshal(la.LiveAnchorDict)
if err != nil {
panic(err)
}
c.CountMap = string(data)
2020-12-15 08:58:38 +00:00
insertsql := fmt.Sprintf("insert ignore into %s(uid, is_counted, count_map, create_at) values(?,?,?,?)", Tables.CountLiveAnchors)
_, err = db.Exec(insertsql, c.UID, c.IsCounted, c.CountMap, c.CreateAt)
if err != nil {
switch merr := err.(*mysql.MySQLError); merr.Number {
case 1062:
2020-12-15 08:58:38 +00:00
default:
log.Println(merr)
}
2020-12-10 09:50:42 +00:00
}
}
}
2020-12-11 06:40:31 +00:00
})
2020-12-10 09:50:42 +00:00
ps.Wait(time.Second * 5)
}
}