From 7182da2cbe079cb5a8dd5dadd905feff05195c70 Mon Sep 17 00:00:00 2001 From: eson Date: Fri, 17 Jul 2020 19:20:08 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=97=B6=E5=8C=BA=E9=97=AE=E9=A2=98.=20?= =?UTF-8?q?sqluri=E5=AF=BC=E8=87=B4=E8=A7=A3=E6=9E=90=E9=94=99=E8=AF=AF.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yaml | 4 +-- .../openrec_extractor/openrec_extractor.go | 5 ++-- store.go | 28 +++++++++++-------- tasks/openrec/openrec_task1/task_openrec.go | 5 ++-- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/config.yaml b/config.yaml index 5a5df4a..1e031f8 100644 --- a/config.yaml +++ b/config.yaml @@ -1,3 +1,3 @@ database: - source_uri: "root:@tcp(127.0.0.1:4000)/intimate_source?parseTime=true" - extractor_uri: "root:@tcp(127.0.0.1:4000)/intimate_extractor?parseTime=true" \ No newline at end of file + source_uri: "root:@tcp(127.0.0.1:4000)/intimate_source?parseTime=true&loc=Local" + extractor_uri: "root:@tcp(127.0.0.1:4000)/intimate_extractor?parseTime=true&loc=Local" \ No newline at end of file diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 1fbfa81..838eaba 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -79,7 +79,7 @@ func (oe *OpenrecExtractor) Execute() { // log.Println(anchorId) oe.extractFollowers(clog) - oe.extractAnchorName(streamer) + oe.extractUserName(streamer) oe.extractViewsAndLiveStreaming(clog) oe.extractGiversAndGratuity(clog) oe.extractLive(clog) @@ -138,7 +138,7 @@ func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) { clog.Set("Followers", sql.NullInt64{Int64: followersInt, Valid: true}) } -func (oe *OpenrecExtractor) extractAnchorName(ai intimate.ISet) { +func (oe *OpenrecExtractor) extractUserName(ai intimate.ISet) { extractor := oe.user.GetExtractor() xp, err := extractor.XPathResult("//p[@class='c-global__user__profile__list__name__text official-icon--after']/text()") if err != nil { @@ -214,6 +214,7 @@ func (oe *OpenrecExtractor) extractLive(clog intimate.ISet) { if err != nil { log.Println(err) } + log.Println(iter.Node().NodeValue(), tm.Local()) clog.Set("LiveStartTime", sql.NullTime{Time: tm.Local(), Valid: true}) duration, err := extractor.XPathResult("//meta[@itemprop='duration']/@content") diff --git a/store.go b/store.go index ab181e7..3e761c1 100644 --- a/store.go +++ b/store.go @@ -210,19 +210,33 @@ func (store *ExtractorStore) UpdateStreamerLogUid(logUid, streamerUid int64) err // InsertStreamer Streamer表, 插入数据 func (store *ExtractorStore) InsertStreamer(isource IGet) (Uid int64, err error) { - // select uid from table where platform = ? and anchor_id = ? - selectSQL := "select uid from " + StreamerTable + " where platform = ? and anchor_id = ?" + // select uid from table where platform = ? and user_id = ? + selectSQL := "select uid from " + StreamerTable + " where platform = ? and user_id = ?" tx, err := store.db.Begin() if err != nil { log.Println(err) return 0, err } + defer func() { + err = tx.Commit() + if err != nil { + log.Println(err) + err = tx.Rollback() + if err != nil { + log.Println(err) + } + Uid = 0 + } + }() + row := tx.QueryRow(selectSQL+` limit 1 for update`, isource.Get("Platform"), isource.Get("UserId")) var uid int64 if err = row.Scan(&uid); err == nil { return uid, nil + } else { + log.Println(err) } result, err := tx.Exec("insert into "+StreamerTable+"(platform, user_id, user_name, live_url, channel, latest_log_uid, ext) values(?,?,?,?,?,?,?);", isource.Get("Platform"), isource.Get("UserId"), isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext")) @@ -232,16 +246,6 @@ func (store *ExtractorStore) InsertStreamer(isource IGet) (Uid int64, err error) return 0, nil } - err = tx.Commit() - if err != nil { - log.Println(err) - err = tx.Rollback() - if err != nil { - log.Println(err) - } - return 0, err - } - return result.LastInsertId() } diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index a6c33a2..08ac70c 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -60,7 +60,8 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { resp, err := cxt.Hunt() if err != nil { log.Println(err) - break + time.Sleep(time.Second * 2) + continue } tp := cxt.Temporary() @@ -69,7 +70,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { if len(content) <= 200 { // 末页退出 finishpoint := time.Now() log.Println("任务结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*30)) - for time.Now().Sub(finishpoint) < time.Minute*30 { + for time.Now().Sub(finishpoint) < time.Minute*60 { time.Sleep(time.Second) if atomic.LoadInt32(&loop) > 0 { return