package main import ( "database/sql" "encoding/json" "fmt" "intimate" "log" "regexp" "time" "github.com/tebeka/selenium" ) // // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql // var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) // // estore 解析存储连接实例 // var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() func main() { adriver := intimate.GetChromeDriver() ps := intimate.NewPerfectShutdown() slqueue := intimate.TStreamerList.Queue(intimate.StreamerList{}, intimate.ConditionDefault(intimate.Ptwitch)) squeue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Ptwitch)) var count = 0 var countlimt = 200 var recreate = time.Now() var lasterr error = nil // var err error for !ps.IsClose() { wd := adriver.Webdriver // sourceChannel, err := sstore.Pop(intimate.TTwitchChannel) isl, err := slqueue.Pop() if err != nil { if lasterr != err { lasterr = err log.Println(err) } istreamer, err := squeue.Pop() if err != nil { if lasterr != err { lasterr = err log.Println(err) ps.Wait(time.Minute) continue } } streamer := istreamer.(*intimate.Streamer) Extractor(wd, streamer) if err = intimate.TStreamer.Update(streamer); err != nil { log.Println(err) } count++ if count >= countlimt || time.Now().Sub(recreate) >= time.Minute*120 { count = 0 adriver.Close() adriver = intimate.GetChromeDriver() recreate = time.Now() } continue } streamerlist := isl.(*intimate.StreamerList) weburl := streamerlist.Url + "?sort=VIEWER_COUNT" err = wd.Get(weburl) if err != nil { log.Println(err) // sstore.UpdateError(sourceChannel, err) intimate.TStreamerList.UpdateError(streamerlist, err) time.Sleep(time.Second * 10) continue } wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) { _, err := wd.FindElement(selenium.ByXPATH, "(//div/p[@class=''])[last()]") if err != nil { return false, err } return true, nil }, time.Second*10) btn, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") if err != nil { log.Println(err) continue } btn.Click() var elements []selenium.WebElement var liveurls = 0 var delayerror = 2 for i := 0; i < 200 && !ps.IsClose(); i++ { elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]") if err != nil { log.Println(err) break } time.Sleep(time.Millisecond * 200) wd.KeyDown(selenium.EndKey) time.Sleep(time.Millisecond * 200) wd.KeyUp(selenium.EndKey) time.Sleep(time.Millisecond * 2000) if len(elements) == liveurls { delayerror-- if delayerror <= 0 { break } } else { delayerror = 2 } liveurls = len(elements) } articles, err := wd.FindElements(selenium.ByXPATH, "//article") if err != nil { log.Println(err) continue } var streamers []*intimate.Streamer for _, article := range articles { e, err := article.FindElement(selenium.ByXPATH, ".//a[@data-a-target='preview-card-title-link' and @href]") if err != nil { log.Println(err) continue } href, err := e.GetAttribute("href") if err != nil { log.Println(err) continue } btns, err := article.FindElements(selenium.ByXPATH, ".//div[@class='tw-full-width tw-inline-block']//button") if err != nil { log.Println(err) continue } var tags []string for _, btn := range btns { tag, err := btn.GetAttribute("data-a-target") if err == nil { tags = append(tags, tag) } } streamer := &intimate.Streamer{} matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href) if len(matches) == 2 { mc := matches[1] streamer.UserId = &mc } else { log.Println(href) continue } jtags, err := json.Marshal(tags) if err != nil { log.Println(err) } else { streamer.Tags = jtags } streamer.Platform = intimate.Ptwitch streamer.LiveUrl = &sql.NullString{String: href, Valid: true} streamer.Operator = 0 streamers = append(streamers, streamer) // if estore.InsertStreamer(streamer) { // // log.Println("streamer update tags", streamer.Uid, tags) // if streamer.Tags != nil { // estore.Update(streamer, "Tags", streamer.Tags) // } // } } for _, streamer := range streamers { Extractor(wd, streamer) streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} if err = intimate.TStreamer.InsertOrUpdate(streamer, intimate.DUpdate{Field: "tags"}, intimate.DUpdate{Field: "update_time"}, intimate.DUpdate{Field: "update_interval"}, ); err != nil { log.Println(err) } } log.Println("streamer find", len(articles)) if len(articles) == 0 { intimate.TStreamerList.UpdateError(streamerlist, fmt.Errorf("")) } count++ if count >= countlimt || time.Now().Sub(recreate) >= time.Minute*120 { count = 0 adriver.Close() adriver = intimate.GetChromeDriver() recreate = time.Now() } } adriver.Close() } func Extractor(wd selenium.WebDriver, streamer *intimate.Streamer) { // streamer, err := estore.Pop(intimate.Ptwitch) // if streamer == nil || err != nil { // if err != lasterr { // log.Println(err, lasterr) // lasterr = err // } // time.Sleep(time.Second * 2) // continue // } // var updateUrl map[string]string // json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) liveUrl := "https://www.twitch.tv/" + (*streamer.UserId) // liveUrl = strings.Replace(liveUrl, "/watchparty", "", -1) log.Println(liveUrl) // err = wd.Get("https://www.twitch.tv/zoe_0601" + "/about") err := wd.Get(liveUrl + "/about") if err != nil { errstr := fmt.Errorf("%s: %s", err.Error(), liveUrl+"/about") log.Println(errstr) intimate.TStreamer.UpdateError(streamer, errstr) time.Sleep(time.Second * 5) return } streamer.LiveUrl = &sql.NullString{String: liveUrl, Valid: true} clog := &intimate.CollectLog{} clog.UserId = *streamer.UserId clog.Gratuity = &sql.NullInt64{Int64: 0, Valid: false} time.Sleep(time.Millisecond * 500) err = extractUserName(wd, streamer) if err != nil { _, err = wd.FindElement(selenium.ByXPATH, "//a[@data-a-target='browse-channels-button']") if err == nil { log.Println(*streamer.UserId, "may be cancell") streamer.Operator = 5 intimate.TStreamer.UpdateError(streamer, fmt.Errorf(*streamer.UserId, "may be cancell")) } return } err = extractFollowers(wd, clog) if err != nil { // log.Println(err) streamer.UpdateInterval += 30 return } err = extractViews(wd, clog) // views + tags + gratuity if err != nil { // 不直播时提取礼物 gratuity wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { channelchat, err := wd.FindElement(selenium.ByXPATH, `//a[@data-a-target="channel-home-tab-Chat"]`) btn, _ := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`) if (err == nil && channelchat != nil) || btn != nil { if channelchat != nil { channelchat.Click() } time.Sleep(time.Second) extractGratuity(wd, clog) return true, nil } return false, nil }, time.Second*4) } streamer.Platform = intimate.Ptwitch clog.Platform = streamer.Platform clog.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} // clog.StreamerUid = streamer.Uid lastClogId, err := intimate.TClog.InsertRetAutoID(clog) if err != nil { log.Println(err) return } streamer.LatestLogUid = lastClogId if clog.Tags != nil { streamer.Tags = clog.Tags } switch fl := clog.Followers.Int64; { case fl > 100000: streamer.UpdateInterval = 120 case fl > 10000: streamer.UpdateInterval = 240 * 2 case fl > 1000: streamer.UpdateInterval = 360 * 2 case fl > 100: streamer.UpdateInterval = 720 * 2 case fl > 0: streamer.UpdateInterval = 1440 * 4 } streamer.UpdateTime = clog.UpdateTime // intimate.TStreamer.InsertOrUpdate(streamer) // count++ // if count >= countlimt { // count = 0 // // wd.Quit() // wd = intimate.GetChromeDriver(3030) // } } func extractUserName(wd selenium.WebDriver, streamer *intimate.Streamer) error { return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { label, err := web.FindElement(selenium.ByXPATH, "//a[@class='tw-interactive']//h1") if err == nil { if ltxt, err := label.Text(); err == nil && ltxt != "" { // log.Println("label:", ltxt) streamer.UserName = &sql.NullString{String: ltxt, Valid: true} return true, nil } } return false, err }, 15*time.Second) } func extractFollowers(wd selenium.WebDriver, clog *intimate.CollectLog) error { return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { efollowers, err := web.FindElement(selenium.ByXPATH, "//div[@data-a-target='about-panel']//div[@class='tw-align-center']") if err != nil { return false, err } followers, err := efollowers.Text() if err != nil || followers == "" { return false, err } followers = regexp.MustCompile(`[\d,]+`).FindString(followers) fint, _ := intimate.ParseNumber(followers) clog.Followers = &sql.NullInt64{Int64: int64(fint), Valid: true} // log.Println("followers: ", followers, fint) return true, nil }, 4*time.Second) } func extractViews(wd selenium.WebDriver, clog *intimate.CollectLog) error { return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { views, err := web.FindElement(selenium.ByXPATH, "//a[@data-a-target='home-live-overlay-button']/span") if views != nil { if txt, err := views.Text(); err == nil { vint, _ := intimate.ParseNumber(txt) clog.Views = &sql.NullInt64{Int64: vint, Valid: true} // log.Println("views:", txt) views.Click() extractTags(wd, clog) extractTitle(wd, clog) extractGratuity(wd, clog) return true, nil } } return false, err }, time.Second*4) } func extractTitle(wd selenium.WebDriver, clog *intimate.CollectLog) error { return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { title, err := web.FindElement(selenium.ByXPATH, `//h2[@data-a-target='stream-title']`) if err == nil { if txt, err := title.Text(); err == nil { clog.LiveTitle = &sql.NullString{String: txt, Valid: true} return true, nil } } return false, err }, time.Second*4) } func extractTags(wd selenium.WebDriver, clog *intimate.CollectLog) error { return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { tags, err := web.FindElements(selenium.ByXPATH, "//a[@aria-label and @data-a-target and @href]/div[@class and text()]") if len(tags) == 0 { return false, err } var stags []string for _, tag := range tags { if txt, err := tag.Text(); err == nil { stags = append(stags, txt) } else { log.Println(err) } } if len(stags) > 0 { if tagbuf, err := json.Marshal(stags); err == nil { clog.Tags = tagbuf } else { log.Println(err) } } return true, nil }, time.Second*4) } func extractGratuity(wd selenium.WebDriver, clog *intimate.CollectLog) error { return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { btn, err := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`) if err == nil { btn.Click() time.Sleep(time.Second) gifcount, err := web.FindElements(selenium.ByXPATH, `//div[@class="sub-gift-count tw-flex"]/p`) if err == nil { var gratuity int64 = 0 for _, gc := range gifcount { if gtxt, err := gc.Text(); err == nil { gint, _ := intimate.ParseNumber(gtxt) gratuity += gint } else { log.Println(err) } } clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true} } return true, nil } return false, err }, time.Second*4) }