From ef7b59ce3dd8ccc2f3938ca02e4036597beb4af9 Mon Sep 17 00:00:00 2001 From: eson Date: Fri, 11 Sep 2020 18:52:04 +0800 Subject: [PATCH] TODO: streamerlist --- autostore.go | 4 +- extractor/nimo_extractor/nimo_extractor.go | 9 ++- .../twitch_extractor/tiwtch_extractor.go | 60 ++++++++++++------- tasks/twitch/twitch_task1/task_twitch.go | 24 ++++---- utils.go | 37 ++++++++++-- 5 files changed, 87 insertions(+), 47 deletions(-) diff --git a/autostore.go b/autostore.go index 28dcca1..a7c73c8 100644 --- a/autostore.go +++ b/autostore.go @@ -427,10 +427,10 @@ func (t *Table) UpdateError(obj interface{}, err error) { } } - _, dberr := t.store.db.Exec("update "+t.name+" set operator = ?, error_msg = ? where ? = ?", 10000, sql.NullString{String: err.Error(), Valid: true}, uidname, uidvalue) + _, dberr := t.store.db.Exec("update "+t.name+" set operator = ?, error_msg = ? where "+uidname+" = ?", 10000, sql.NullString{String: err.Error(), Valid: true}, uidvalue) if dberr != nil { // email tell owner to deal with - panic(err) + panic(dberr) } } diff --git a/extractor/nimo_extractor/nimo_extractor.go b/extractor/nimo_extractor/nimo_extractor.go index 79c9cfd..b94b6fb 100644 --- a/extractor/nimo_extractor/nimo_extractor.go +++ b/extractor/nimo_extractor/nimo_extractor.go @@ -28,10 +28,10 @@ type LiveInfo struct { } func Execute() { - wd := intimate.GetChromeDriver() + adriver := intimate.GetChromeDriver() count := 0 countlimit := 200 - + wd := adriver.Webdriver waitfor := intimate.NewWaitFor(wd) ps := intimate.NewPerfectShutdown() @@ -118,9 +118,8 @@ func Execute() { count++ if count >= countlimit { count = 0 - wd.Close() - wd.Quit() - wd = intimate.GetChromeDriver() + adriver.Close() + adriver = intimate.GetChromeDriver() } } } diff --git a/extractor/twitch_extractor/tiwtch_extractor.go b/extractor/twitch_extractor/tiwtch_extractor.go index 6aa0ab4..a1eedc9 100644 --- a/extractor/twitch_extractor/tiwtch_extractor.go +++ b/extractor/twitch_extractor/tiwtch_extractor.go @@ -7,7 +7,6 @@ import ( "intimate" "log" "regexp" - "strings" "time" "github.com/tebeka/selenium" @@ -23,15 +22,13 @@ func main() { adriver := intimate.GetChromeDriver() - defer func() { - adriver.Close() - }() - ps := intimate.NewPerfectShutdown() - queue := intimate.TStreamerList.Queue(intimate.StreamerList{}, intimate.ConditionDefault(intimate.Ptwitch)) + slqueue := intimate.TStreamerList.Queue(intimate.StreamerList{}, intimate.ConditionDefault(intimate.Ptwitch)) + squeue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Ptwitch)) + var count = 0 - var countlimt = 1 + var countlimt = 200 var recreate = time.Now() var lasterr error = nil @@ -40,13 +37,36 @@ func main() { wd := adriver.Webdriver // sourceChannel, err := sstore.Pop(intimate.TTwitchChannel) - isl, err := queue.Pop() + isl, err := slqueue.Pop() if err != nil { if lasterr != err { lasterr = err log.Println(err) } - time.Sleep(time.Minute) + + istreamer, err := squeue.Pop() + if err != nil { + if lasterr != err { + lasterr = err + log.Println(err) + ps.Wait(time.Minute) + continue + } + } + + streamer := istreamer.(*intimate.Streamer) + Extractor(wd, streamer) + if err = intimate.TStreamer.Update(streamer); err != nil { + log.Println(err) + } + + count++ + if count >= countlimt || time.Now().Sub(recreate) >= time.Minute*120 { + count = 0 + adriver.Close() + adriver = intimate.GetChromeDriver() + recreate = time.Now() + } continue } @@ -185,17 +205,14 @@ func main() { count++ if count >= countlimt || time.Now().Sub(recreate) >= time.Minute*120 { count = 0 - countlimt = 3 + adriver.Close() adriver = intimate.GetChromeDriver() recreate = time.Now() } - if count >= 2 { - break - } - } + adriver.Close() } func Extractor(wd selenium.WebDriver, streamer *intimate.Streamer) { @@ -211,16 +228,16 @@ func Extractor(wd selenium.WebDriver, streamer *intimate.Streamer) { // var updateUrl map[string]string // json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) - liveUrl := streamer.LiveUrl.String - - liveUrl = strings.Replace(liveUrl, "/watchparty", "", -1) + liveUrl := "https://www.twitch.tv/" + (*streamer.UserId) + // liveUrl = strings.Replace(liveUrl, "/watchparty", "", -1) log.Println(liveUrl) // err = wd.Get("https://www.twitch.tv/zoe_0601" + "/about") err := wd.Get(liveUrl + "/about") if err != nil { - log.Println(err) - intimate.TStreamer.UpdateError(streamer, err) + errstr := fmt.Errorf("%s: %s", err.Error(), liveUrl+"/about") + log.Println(errstr) + intimate.TStreamer.UpdateError(streamer, errstr) time.Sleep(time.Second * 5) return } @@ -235,10 +252,10 @@ func Extractor(wd selenium.WebDriver, streamer *intimate.Streamer) { if err != nil { _, err = wd.FindElement(selenium.ByXPATH, "//a[@data-a-target='browse-channels-button']") if err == nil { - log.Println(streamer.UserId, "may be cancell") + log.Println(*streamer.UserId, "may be cancell") streamer.Operator = 5 streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} - intimate.TStreamer.UpdateError(streamer, fmt.Errorf("")) + intimate.TStreamer.UpdateError(streamer, fmt.Errorf(*streamer.UserId, "may be cancell")) } return } @@ -276,7 +293,6 @@ func Extractor(wd selenium.WebDriver, streamer *intimate.Streamer) { return } - streamer.Operator = 10 streamer.LatestLogUid = lastClogId if clog.Tags != nil { streamer.Tags = clog.Tags diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go index 32c531b..57e8618 100644 --- a/tasks/twitch/twitch_task1/task_twitch.go +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -20,11 +20,12 @@ import ( func Execute() { ps := intimate.NewPerfectShutdown() + var adriver *intimate.AutoCloseDriver for !ps.IsClose() { var err error - adriver := intimate.GetChromeDriver() + adriver = intimate.GetChromeDriver() wd := adriver.Webdriver weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT" @@ -109,20 +110,17 @@ func Execute() { if err != nil { log.Println(err) } - - // TODO: Save href - // source := &intimate.Source{} - // source.Source = sql.NullString{String: href, Valid: true} - // source.Operator = 0 - // source.Target = intimate.TTwitchChannel - // source.Url = weburl - // sstore.Insert(source) } log.Println("hrefs len:", len(hrefs)) - // sstore.Deduplicate(intimate.TTwitchChannel, "source") - // wd.Close() - // wd.Quit() - time.Sleep(time.Minute * 30) + adriver.Close() + ps.Wait(time.Minute * 5) + } + + func merge(nums1 []int, m int, nums2 []int, n int) { + for i := 0 ; i < n ; i++ { + nums1[m+i] = n[i] + } + sort } } diff --git a/utils.go b/utils.go index 5f3683f..aa14bbe 100644 --- a/utils.go +++ b/utils.go @@ -100,14 +100,29 @@ type AutoCloseDriver struct { } func (adriver *AutoCloseDriver) Close() { - killshell := fmt.Sprintf("pkill -P `pgrep -f 'port=%d '` && pkill -f 'port=%d '", adriver.Port, adriver.Port) - log.Println(killshell) - // log.Printf(fmt.Sprintf("kill -9 $(lsof -t -i:%d)", port)) - cmd := exec.Command("sh", "-c", killshell) - err := cmd.Run() + data, err := exec.Command("/bin/bash", "-c", fmt.Sprintf(`pgrep -f "port=%d"`, adriver.Port)).Output() if err != nil { log.Println(err) + log.Println(string(data)) + return + } + // log.Println(string(data)) + + killshell := fmt.Sprintf("pkill -P %s", data) + // log.Println(killshell) + // pkill -f \"port=%d\" + // log.Printf(fmt.Sprintf("kill -9 $(lsof -t -i:%d)", port)) + err = exec.Command("/bin/bash", "-c", killshell).Run() + if err != nil { + log.Println(err) + return + } + + err = exec.Command("/bin/bash", "-c", fmt.Sprintf("kill %s", data)).Run() + if err != nil { + log.Println(err) + return } } @@ -217,6 +232,18 @@ func (ps *PerfectShutdown) IsClose() bool { return atomic.LoadInt32(&ps.loop) == 0 } +// Wait 判断是否要关闭 +func (ps *PerfectShutdown) Wait(tm time.Duration) bool { + now := time.Now() + for time.Now().Sub(now) <= tm { + if ps.IsClose() { + return false + } + time.Sleep(time.Second) + } + return true +} + type Counter struct { dcount int count int