diff --git a/autostore.go b/autostore.go index e3c6976..28dcca1 100644 --- a/autostore.go +++ b/autostore.go @@ -34,9 +34,10 @@ type Table struct { name string setting interface{} - updatesql string - selectsql string - insertsql string + updatesql string + selectsql string + insertsql string + duplicatesql string } func NewStore(uri string) *Store { @@ -54,6 +55,7 @@ func (store *Store) Table(name string) *Table { table.name = name table.insertsql = `INSERT INTO ` + table.name + `(%s) values(%s)` + table.duplicatesql = `INSERT INTO ` + table.name + `(%s) values(%s) ON DUPLICATE KEY UPDATE %s` table.updatesql = `UPDATE ` + table.name + ` SET %s WHERE %s = ?` table.selectsql = `SELECT %s FROM ` + table.name + ` WHERE %s ` return table @@ -226,6 +228,83 @@ func (t *Table) Insert(obj interface{}) error { return err } +// DUpdate ON DUPLICATE KEY UPDATE struct. Field对应的tag field字段 +type DUpdate struct { + Field string // selected 字段 + Value interface{} +} + +// InsertOrUpdate nil 不插入. 不支持嵌套. 必须是Ptr类型 +func (t *Table) InsertOrUpdate(obj interface{}, updates ...DUpdate) error { + ov := reflect.ValueOf(obj).Elem() + ot := reflect.TypeOf(obj) + + fieldsql := "" + argssql := "" + + var SourceUpdate []*DUpdate + var OtherUpdate []*DUpdate + for _, u := range updates { + if u.Value == nil { + SourceUpdate = append(SourceUpdate, &u) + } else { + OtherUpdate = append(OtherUpdate, &u) + } + } + + var args []interface{} + for i := 0; i < ov.NumField(); i++ { + field := ov.Field(i) + ftype := ot.Elem().Field(i) + + if fname, ok := ftype.Tag.Lookup("field"); ok { + // if flag, ok := ftype.Tag.Lookup("uid"); ok { + // if flag == "auto" { + // continue + // } + // } + + k := ftype.Type.Kind() + if k == reflect.Ptr || k == reflect.Interface { + if !field.IsNil() { + felem := field.Elem() + args = append(args, felem.Interface()) + fieldsql += fname + "," + argssql += "?," + } + } else { + args = append(args, field.Interface()) + fieldsql += fname + "," + argssql += "?," + } + + for _, u := range SourceUpdate { + if u.Field == fname { + u.Value = args[len(args)-1] + break + } + } + + } + + } + + var duplicateSet string = "" + for _, u := range SourceUpdate { + duplicateSet += u.Field + " = ?," + args = append(args, u.Value) + } + + for _, u := range OtherUpdate { + duplicateSet += u.Field + " = ?," + args = append(args, u.Value) + } + + ssql := fmt.Sprintf(t.duplicatesql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1], duplicateSet[:len(duplicateSet)-1]) + _, err := t.store.db.Exec(ssql, args...) + return err +} + // InsertRetAutoID nil 不插入. 不支持嵌套. 并返回auto uid func (t *Table) InsertRetAutoID(obj interface{}) (int64, error) { ov := reflect.ValueOf(obj).Elem() diff --git a/autostore_test.go b/autostore_test.go index e5a452b..e242fc0 100644 --- a/autostore_test.go +++ b/autostore_test.go @@ -5,26 +5,24 @@ import ( "encoding/json" "testing" "time" - - "github.com/davecgh/go-spew/spew" ) -func estAutoStore(t *testing.T) { +func TestAutoStore(t *testing.T) { uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci" store := NewStore(uri) - queue := store.Table("streamer").Queue(TSreamer{}, CondWhere{Condition: "operator = 0"}) - re, _ := queue.Pop() + // queue := store.Table("streamer").Queue(TSreamer{}, CondWhere{Condition: "operator = 0"}) + // re, _ := queue.Pop() - pstreamer := re.(*TSreamer) - m := make(map[string]interface{}) - json.Unmarshal(pstreamer.Iface.([]byte), &m) - spew.Println(re.(*TSreamer), m) + // pstreamer := re.(*TSreamer) + // m := make(map[string]interface{}) + // json.Unmarshal(pstreamer.Iface.([]byte), &m) + // spew.Println(re.(*TSreamer), m) streamer := &TSreamer{} - streamer.Uid = 2 - streamer.UserID = &sql.NullString{String: "hehe", Valid: true} + streamer.Uid = 1 + streamer.UserID = &sql.NullString{String: "xixi", Valid: true} streamer.Name = "streamer" streamer.Operator = 0 streamer.Bit = 0b11 @@ -41,7 +39,7 @@ func estAutoStore(t *testing.T) { now := time.Now() streamer.UpdateTime = &now - err = store.Table("streamer").Insert(streamer) + err = store.Table("streamer").InsertOrUpdate(streamer, DUpdate{Field: "userid"}) if err != nil { t.Error(err) } diff --git a/extractor/twitch_extractor/tiwtch_extractor.go b/extractor/twitch_extractor/tiwtch_extractor.go index 2ae0c23..ce65608 100644 --- a/extractor/twitch_extractor/tiwtch_extractor.go +++ b/extractor/twitch_extractor/tiwtch_extractor.go @@ -3,6 +3,7 @@ package main import ( "database/sql" "encoding/json" + "fmt" "intimate" "log" "regexp" @@ -12,134 +13,288 @@ import ( "github.com/tebeka/selenium" ) -// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) +// // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) -// estore 解析存储连接实例 -var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() +// // estore 解析存储连接实例 +// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() func main() { - wd := intimate.GetChromeDriver(3030) + wd := intimate.GetChromeDriver(3040) ps := intimate.NewPerfectShutdown() + queue := intimate.TStreamerList.Queue(intimate.StreamerList{}, intimate.ConditionDefault(intimate.Ptwitch)) var count = 0 var countlimt = 200 - var lasterr error = nil + // var lasterr error = nil // var err error - for !ps.IsClose() { - streamer, err := estore.Pop(intimate.Ptwitch) - if streamer == nil || err != nil { - if err != lasterr { - log.Println(err, lasterr) - lasterr = err - } - time.Sleep(time.Second * 2) - continue + + // sourceChannel, err := sstore.Pop(intimate.TTwitchChannel) + isl, err := queue.Pop() + if err != nil { + panic(err) } + streamerlist := isl.(*intimate.StreamerList) - var updateUrl map[string]string - json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) - liveUrl := updateUrl["live"] - liveUrl = strings.Replace(liveUrl, "/watchparty", "", -1) - log.Println(liveUrl) - - // err = wd.Get("https://www.twitch.tv/zoe_0601" + "/about") - err = wd.Get(liveUrl + "/about") + weburl := streamerlist.Url + "?sort=VIEWER_COUNT" + err = wd.Get(weburl) if err != nil { log.Println(err) - estore.UpdateError(streamer, err) - time.Sleep(time.Second * 5) + // sstore.UpdateError(sourceChannel, err) + intimate.TStreamerList.UpdateError(streamerlist, err) + time.Sleep(time.Second * 10) continue } - streamer.LiveUrl = sql.NullString{String: liveUrl, Valid: true} - clog := &intimate.CollectLog{} - clog.UserId = streamer.UserId - clog.Gratuity = sql.NullInt64{Int64: 0, Valid: false} - - time.Sleep(time.Millisecond * 500) - err = extractUserName(wd, streamer) - if err != nil { - _, err = wd.FindElement(selenium.ByXPATH, "//a[@data-a-target='browse-channels-button']") - if err == nil { - log.Println(streamer.UserId, "may be cancell") - streamer.Operator = 5 - streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} - estore.UpdateStreamer(streamer) + wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) { + _, err := wd.FindElement(selenium.ByXPATH, "(//div/p[@class=''])[last()]") + if err != nil { + return false, err } - continue - } - err = extractFollowers(wd, clog) - if err != nil { - continue - } + return true, nil + }, time.Second*10) - err = extractViews(wd, clog) // views + tags + gratuity + btn, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") if err != nil { - // 不直播时提取礼物 gratuity - wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { - channelchat, err := wd.FindElement(selenium.ByXPATH, `//a[@data-a-target="channel-home-tab-Chat"]`) - btn, _ := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`) - if (err == nil && channelchat != nil) || btn != nil { - if channelchat != nil { - channelchat.Click() - } - time.Sleep(time.Second) - extractGratuity(wd, clog) - return true, nil + log.Println(err) + continue + } + btn.Click() + + var elements []selenium.WebElement + var liveurls = 0 + var delayerror = 2 + for i := 0; i < 200 && !ps.IsClose(); i++ { + elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]") + if err != nil { + log.Println(err) + break + } + time.Sleep(time.Millisecond * 200) + wd.KeyDown(selenium.EndKey) + time.Sleep(time.Millisecond * 200) + wd.KeyUp(selenium.EndKey) + time.Sleep(time.Millisecond * 2000) + if len(elements) == liveurls { + delayerror-- + if delayerror <= 0 { + break } - return false, nil - - }, time.Second*4) + } else { + delayerror = 2 + } + liveurls = len(elements) + } + articles, err := wd.FindElements(selenium.ByXPATH, "//article") + if err != nil { + log.Println(err) + continue } - streamer.Platform = intimate.Ptwitch - clog.Platform = streamer.Platform - clog.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} - lastClogId := estore.InsertClog(clog) + var streamers []*intimate.Streamer + for _, article := range articles { - streamer.Operator = 10 - streamer.LatestLogUid = lastClogId - if clog.Tags != nil { - streamer.Tags = clog.Tags + e, err := article.FindElement(selenium.ByXPATH, ".//a[@data-a-target='preview-card-title-link' and @href]") + if err != nil { + log.Println(err) + continue + } + + href, err := e.GetAttribute("href") + if err != nil { + log.Println(err) + continue + } + + btns, err := article.FindElements(selenium.ByXPATH, ".//div[@class='tw-full-width tw-inline-block']//button") + if err != nil { + log.Println(err) + continue + } + + var tags []string + for _, btn := range btns { + tag, err := btn.GetAttribute("data-a-target") + if err == nil { + tags = append(tags, tag) + } + } + + streamer := &intimate.Streamer{} + + matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href) + if len(matches) == 2 { + mc := matches[1] + streamer.UserId = &mc + } else { + log.Println(href) + continue + } + + jtags, err := json.Marshal(tags) + if err != nil { + log.Println(err) + } else { + streamer.Tags = jtags + } + + streamer.Platform = intimate.Ptwitch + streamer.LiveUrl = &sql.NullString{String: href, Valid: true} + streamer.Operator = 0 + + streamers = append(streamers, streamer) + + // if estore.InsertStreamer(streamer) { + // // log.Println("streamer update tags", streamer.Uid, tags) + // if streamer.Tags != nil { + // estore.Update(streamer, "Tags", streamer.Tags) + // } + // } } - switch fl := clog.Followers.Int64; { - case fl > 100000: - streamer.UpdateInterval = 120 - case fl > 10000: - streamer.UpdateInterval = 240 * 2 - case fl > 1000: - streamer.UpdateInterval = 360 * 2 - case fl > 100: - streamer.UpdateInterval = 720 * 2 - case fl > 0: - streamer.UpdateInterval = 1440 * 4 + for _, streamer := range streamers { + Extractor(wd, streamer) + if err = intimate.TStreamer.InsertOrUpdate(streamer, + intimate.DUpdate{Field: "tags"}, + intimate.DUpdate{Field: "update_time"}, + ); err != nil { + log.Println(err) + } + } + + log.Println("streamer find", len(articles)) + if len(articles) == 0 { + intimate.TStreamerList.UpdateError(streamerlist, fmt.Errorf("")) } - streamer.UpdateTime = clog.UpdateTime - estore.UpdateStreamer(streamer) count++ if count >= countlimt { count = 0 - // wd.Quit() - wd = intimate.GetChromeDriver(3030) + wd = intimate.GetChromeDriver(3031) } + } wd.Close() wd.Quit() } +func Extractor(wd selenium.WebDriver, streamer *intimate.Streamer) { + // streamer, err := estore.Pop(intimate.Ptwitch) + // if streamer == nil || err != nil { + // if err != lasterr { + // log.Println(err, lasterr) + // lasterr = err + // } + // time.Sleep(time.Second * 2) + // continue + // } + + // var updateUrl map[string]string + // json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) + liveUrl := streamer.LiveUrl.String + + liveUrl = strings.Replace(liveUrl, "/watchparty", "", -1) + log.Println(liveUrl) + + // err = wd.Get("https://www.twitch.tv/zoe_0601" + "/about") + err := wd.Get(liveUrl + "/about") + if err != nil { + log.Println(err) + intimate.TStreamer.UpdateError(streamer, err) + time.Sleep(time.Second * 5) + return + } + + streamer.LiveUrl = &sql.NullString{String: liveUrl, Valid: true} + clog := &intimate.CollectLog{} + clog.UserId = *streamer.UserId + clog.Gratuity = &sql.NullInt64{Int64: 0, Valid: false} + + time.Sleep(time.Millisecond * 500) + err = extractUserName(wd, streamer) + if err != nil { + _, err = wd.FindElement(selenium.ByXPATH, "//a[@data-a-target='browse-channels-button']") + if err == nil { + log.Println(streamer.UserId, "may be cancell") + streamer.Operator = 5 + streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} + intimate.TStreamer.UpdateError(streamer, fmt.Errorf("")) + } + return + } + err = extractFollowers(wd, clog) + if err != nil { + return + } + + err = extractViews(wd, clog) // views + tags + gratuity + if err != nil { + // 不直播时提取礼物 gratuity + wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + channelchat, err := wd.FindElement(selenium.ByXPATH, `//a[@data-a-target="channel-home-tab-Chat"]`) + btn, _ := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`) + if (err == nil && channelchat != nil) || btn != nil { + if channelchat != nil { + channelchat.Click() + } + time.Sleep(time.Second) + extractGratuity(wd, clog) + return true, nil + } + return false, nil + + }, time.Second*4) + } + + streamer.Platform = intimate.Ptwitch + clog.Platform = streamer.Platform + clog.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} + // clog.StreamerUid = streamer.Uid + lastClogId, err := intimate.TClog.InsertRetAutoID(clog) + if err != nil { + log.Println(err) + return + } + + streamer.Operator = 10 + streamer.LatestLogUid = lastClogId + if clog.Tags != nil { + streamer.Tags = clog.Tags + } + + switch fl := clog.Followers.Int64; { + case fl > 100000: + streamer.UpdateInterval = 120 + case fl > 10000: + streamer.UpdateInterval = 240 * 2 + case fl > 1000: + streamer.UpdateInterval = 360 * 2 + case fl > 100: + streamer.UpdateInterval = 720 * 2 + case fl > 0: + streamer.UpdateInterval = 1440 * 4 + } + + streamer.UpdateTime = clog.UpdateTime + // intimate.TStreamer.InsertOrUpdate(streamer) + // count++ + // if count >= countlimt { + // count = 0 + // // wd.Quit() + // wd = intimate.GetChromeDriver(3030) + // } +} + func extractUserName(wd selenium.WebDriver, streamer *intimate.Streamer) error { return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { label, err := web.FindElement(selenium.ByXPATH, "//a[@class='tw-interactive']//h1") if err == nil { if ltxt, err := label.Text(); err == nil && ltxt != "" { // log.Println("label:", ltxt) - streamer.UserName = sql.NullString{String: ltxt, Valid: true} + streamer.UserName = &sql.NullString{String: ltxt, Valid: true} return true, nil } } @@ -159,7 +314,7 @@ func extractFollowers(wd selenium.WebDriver, clog *intimate.CollectLog) error { } followers = regexp.MustCompile(`[\d,]+`).FindString(followers) fint, _ := intimate.ParseNumber(followers) - clog.Followers = sql.NullInt64{Int64: int64(fint), Valid: true} + clog.Followers = &sql.NullInt64{Int64: int64(fint), Valid: true} // log.Println("followers: ", followers, fint) return true, nil }, 4*time.Second) @@ -172,7 +327,7 @@ func extractViews(wd selenium.WebDriver, clog *intimate.CollectLog) error { if txt, err := views.Text(); err == nil { vint, _ := intimate.ParseNumber(txt) - clog.Views = sql.NullInt64{Int64: vint, Valid: true} + clog.Views = &sql.NullInt64{Int64: vint, Valid: true} // log.Println("views:", txt) views.Click() @@ -192,7 +347,7 @@ func extractTitle(wd selenium.WebDriver, clog *intimate.CollectLog) error { title, err := web.FindElement(selenium.ByXPATH, `//h2[@data-a-target='stream-title']`) if err == nil { if txt, err := title.Text(); err == nil { - clog.LiveTitle = sql.NullString{String: txt, Valid: true} + clog.LiveTitle = &sql.NullString{String: txt, Valid: true} return true, nil } } @@ -244,7 +399,7 @@ func extractGratuity(wd selenium.WebDriver, clog *intimate.CollectLog) error { log.Println(err) } } - clog.Gratuity = sql.NullInt64{Int64: gratuity, Valid: true} + clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true} } return true, nil } diff --git a/extractor_field.go b/extractor_field.go index 3107771..4e1b691 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -9,9 +9,9 @@ type GetSet struct { } type StreamerList struct { - UrlHash string `field:"urlhash" ` // - Platform string `field:"platform" ` // - Url string `field:"url" ` // + UrlHash string `field:"urlhash" uid:"true"` // + Platform string `field:"platform" ` // + Url string `field:"url" ` // Label *sql.NullString `field:"label" ` //