diff --git a/extractor/nimo_extractor/nimo_extractor.go b/extractor/nimo_extractor/nimo_extractor.go index d823de5..79c9cfd 100644 --- a/extractor/nimo_extractor/nimo_extractor.go +++ b/extractor/nimo_extractor/nimo_extractor.go @@ -28,7 +28,7 @@ type LiveInfo struct { } func Execute() { - wd := intimate.GetChromeDriver(3030) + wd := intimate.GetChromeDriver() count := 0 countlimit := 200 @@ -120,7 +120,7 @@ func Execute() { count = 0 wd.Close() wd.Quit() - wd = intimate.GetChromeDriver(3030) + wd = intimate.GetChromeDriver() } } } diff --git a/extractor/twitcasting_extractor/twitcasting_extractor.go b/extractor/twitcasting_extractor/twitcasting_extractor.go index 0fe20a6..da8c17d 100644 --- a/extractor/twitcasting_extractor/twitcasting_extractor.go +++ b/extractor/twitcasting_extractor/twitcasting_extractor.go @@ -34,13 +34,17 @@ func main() { ps := intimate.NewPerfectShutdown() ses := requests.NewSession() streamerQueue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Ptwitcasting)) - + var lasterr error for !ps.IsClose() { // streamer, err := estore.Pop(intimate.Ptwitcasting) isteamer, err := streamerQueue.Pop() if err != nil { - log.Println(err, isteamer) + if lasterr != err { + lasterr = err + log.Println(err) + } + time.Sleep(time.Minute) continue } diff --git a/extractor/twitch_extractor/tiwtch_extractor.go b/extractor/twitch_extractor/tiwtch_extractor.go index 8a00fcd..6aa0ab4 100644 --- a/extractor/twitch_extractor/tiwtch_extractor.go +++ b/extractor/twitch_extractor/tiwtch_extractor.go @@ -20,22 +20,36 @@ import ( // var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() func main() { - wd := intimate.GetChromeDriver(3040) + + adriver := intimate.GetChromeDriver() + + defer func() { + adriver.Close() + }() + ps := intimate.NewPerfectShutdown() queue := intimate.TStreamerList.Queue(intimate.StreamerList{}, intimate.ConditionDefault(intimate.Ptwitch)) var count = 0 - var countlimt = 200 + var countlimt = 1 + var recreate = time.Now() - // var lasterr error = nil + var lasterr error = nil // var err error for !ps.IsClose() { + wd := adriver.Webdriver // sourceChannel, err := sstore.Pop(intimate.TTwitchChannel) isl, err := queue.Pop() if err != nil { - panic(err) + if lasterr != err { + lasterr = err + log.Println(err) + } + time.Sleep(time.Minute) + continue } + streamerlist := isl.(*intimate.StreamerList) weburl := streamerlist.Url + "?sort=VIEWER_COUNT" @@ -169,15 +183,19 @@ func main() { } count++ - if count >= countlimt { + if count >= countlimt || time.Now().Sub(recreate) >= time.Minute*120 { count = 0 - wd = intimate.GetChromeDriver(3031) + countlimt = 3 + adriver = intimate.GetChromeDriver() + recreate = time.Now() + } + + if count >= 2 { + break } } - wd.Close() - wd.Quit() } func Extractor(wd selenium.WebDriver, streamer *intimate.Streamer) { diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go index 8a6fc0d..32c531b 100644 --- a/tasks/twitch/twitch_task1/task_twitch.go +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -22,8 +22,10 @@ func Execute() { ps := intimate.NewPerfectShutdown() for !ps.IsClose() { + var err error - wd := intimate.GetChromeDriver(3030) + adriver := intimate.GetChromeDriver() + wd := adriver.Webdriver weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT" err = wd.Get(weburl) @@ -119,9 +121,8 @@ func Execute() { log.Println("hrefs len:", len(hrefs)) // sstore.Deduplicate(intimate.TTwitchChannel, "source") - - wd.Close() - wd.Quit() + // wd.Close() + // wd.Quit() time.Sleep(time.Minute * 30) } } diff --git a/utils.go b/utils.go index 1a699b7..5f3683f 100644 --- a/utils.go +++ b/utils.go @@ -5,9 +5,10 @@ import ( "database/sql" "fmt" "log" + "net" "os" + "os/exec" "os/signal" - "runtime" "strconv" "strings" "sync/atomic" @@ -93,7 +94,27 @@ func ParseDuration(dt string) (time.Duration, error) { return tdt.Sub(zeroTime), nil } -func GetChromeDriver(port int) selenium.WebDriver { +type AutoCloseDriver struct { + Webdriver selenium.WebDriver + Port int +} + +func (adriver *AutoCloseDriver) Close() { + killshell := fmt.Sprintf("pkill -P `pgrep -f 'port=%d '` && pkill -f 'port=%d '", adriver.Port, adriver.Port) + log.Println(killshell) + + // log.Printf(fmt.Sprintf("kill -9 $(lsof -t -i:%d)", port)) + cmd := exec.Command("sh", "-c", killshell) + err := cmd.Run() + if err != nil { + log.Println(err) + } +} + +func GetChromeDriver() *AutoCloseDriver { + + port := GetFreePort() + var err error caps := selenium.Capabilities{"browserName": "chrome"} @@ -131,28 +152,44 @@ func GetChromeDriver(port int) selenium.WebDriver { chromecaps.ExcludeSwitches = append(chromecaps.ExcludeSwitches, "enable-automation") caps.AddChrome(chromecaps) + _, err = selenium.NewChromeDriverService("/usr/bin/chromedriver", port) if err != nil { panic(err) } + wd, err := selenium.NewRemote(caps, fmt.Sprintf("http://localhost:%d/wd/hub", port)) if err != nil { panic(err) } - runtime.SetFinalizer(wd, func(obj interface{}) { - if err := obj.(selenium.WebDriver).Close(); err != nil { - log.Println(err) - } - if err := obj.(selenium.WebDriver).Quit(); err != nil { - log.Println(err) - } - }) + + adriver := &AutoCloseDriver{} + adriver.Port = port + adriver.Webdriver = wd + + // runtime.SetFinalizer(adriver, func(obj interface{}) { + + // adriver := obj.(*AutoCloseDriver) + // adriver.Webdriver.Close() + // adriver.Webdriver.Quit() + + // killshell := fmt.Sprintf("pkill -P `pgrep -f 'port=%d '` && pkill -f 'port=%d '", port, port) + // log.Println(killshell) + + // // log.Printf(fmt.Sprintf("kill -9 $(lsof -t -i:%d)", port)) + // // cmd := exec.Command("sh", "-c", killshell) + // // err = cmd.Run() + // // if err != nil { + // // log.Println(err) + // // } + // }) + wd.ExecuteScript("windows.navigator.webdriver = undefined", nil) if err != nil { panic(err) } - return wd + return adriver } // PerfectShutdown 完美关闭程序 @@ -305,3 +342,17 @@ func (wf *WaitFor) WaitWithTimeout(xpath string, timeout time.Duration, do func( }, timeout) } + +func GetFreePort() int { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + panic(err) + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + panic(err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +}