package main import ( "database/sql" "encoding/json" "intimate" "log" "strconv" "strings" "time" "github.com/474420502/extractor" "github.com/474420502/requests" ) // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) // estore 解析存储连接实例 var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() type LiveData struct { UserName string `exp:"//span[@class='tw-live-author__info-username']" method:"Text"` Follower int64 `exp:"(//span[@class='tw-user-nav-list-count'])[2]" method:"r:ExtractNumber"` MaxViews int64 `exp:"//span[@id='max_viewer_count']/text()" method:"r:ExtractNumber"` LiveTitle string `exp:"//meta[@property='og:title']" method:"AttributeValue,content"` LiveStart string `exp:"//time[@data-kind='relative']" method:"AttributeValue,datetime"` LiveDuration string `exp:"//span[@id='updatetimer']" method:"AttributeValue,data-duration"` Tags []string `exp:"//div[@class='tw-live-author__commandbox--tags']//a[@class='tag tag-info']" method:"Text"` } func main() { ps := intimate.NewPerfectShutdown() ses := requests.NewSession() streamerQueue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Ptwitcasting)) var lasterr error for !ps.IsClose() { // streamer, err := estore.Pop(intimate.Ptwitcasting) isteamer, err := streamerQueue.Pop() if err != nil { if lasterr != err { lasterr = err log.Println(err) } time.Sleep(time.Minute) continue } streamer := isteamer.(*intimate.Streamer) streamer.LiveUrl = &sql.NullString{String: "https://twitcasting.tv/" + *streamer.UserId, Valid: true} resp, err := ses.Get(streamer.LiveUrl.String).Execute() if err != nil { intimate.TStreamer.UpdateError(streamer, err) log.Println(err, *streamer.UserId) continue } var ldata *LiveData // f, _ := os.OpenFile("./twistcasting.html", os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm) // f.Write(resp.Content()) etor := extractor.ExtractHtml(resp.Content()) ildata := etor.GetObjectByTag(LiveData{}) if ildata == nil { log.Println(streamer.LiveUrl.String) continue } ldata = ildata.(*LiveData) // ldata.MaxViews = regexp.MustCompile("\\d+").FindString(ldata.MaxViews) coincount := 0 for i := 0; ; i++ { giverurl := streamer.LiveUrl.String + "/backers/" + strconv.Itoa(i) resp, err = ses.Get(giverurl).Execute() if err != nil { intimate.TStreamer.UpdateError(streamer, err) log.Panic(err) } etor := extractor.ExtractHtml(resp.Content()) xp, err := etor.XPaths("//td[@class='tw-memorial-table-recent-point']") if err != nil { intimate.TStreamer.UpdateError(streamer, err) log.Panic(err) } coins := xp.GetTexts() for _, cointxt := range coins { scointxt := strings.Split(cointxt, "/") if len(scointxt) == 2 { coin := strings.Trim(scointxt[1], " ") c, err := strconv.Atoi(coin) if err == nil { coincount += c } // log.Println(coin, coincount) } else { log.Println("coin error: ", cointxt) } } if len(coins) < 20 { break } } var tags []byte tags, err = json.Marshal(ldata.Tags) if err != nil { log.Println(err, streamer.UserId) } streamer.Platform = intimate.Ptwitcasting streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} streamer.UserName = &sql.NullString{String: ldata.UserName, Valid: true} streamer.Operator = 0 streamer.Tags = tags // streamer.UpdateInterval = 60 clog := &intimate.CollectLog{} clog.UserId = *streamer.UserId clog.Gratuity = &sql.NullInt64{Int64: int64(coincount), Valid: true} clog.Platform = streamer.Platform clog.UpdateTime = streamer.UpdateTime clog.LiveTitle = &sql.NullString{String: ldata.LiveTitle, Valid: true} clog.Tags = tags clog.Followers = &sql.NullInt64{Int64: int64(ldata.Follower), Valid: true} switch { case ldata.Follower <= 100: streamer.UpdateInterval = 720 case ldata.Follower <= 1000: streamer.UpdateInterval = 320 case ldata.Follower <= 10000: streamer.UpdateInterval = 240 default: streamer.UpdateInterval = 120 } clog.Views = &sql.NullInt64{Int64: ldata.MaxViews, Valid: true} if ldata.LiveStart != "" { st, err := time.Parse("Mon, 02 Jan 2006 15:04:05 -0700", ldata.LiveStart) if err == nil { startTime := st clog.LiveStartTime = &sql.NullTime{Time: startTime, Valid: true} dt, err := strconv.Atoi(ldata.LiveDuration) liveduration := time.Now().Sub(startTime) switch { case liveduration >= time.Hour*24*240: streamer.Operator = 5 case liveduration >= time.Hour*24*60: streamer.UpdateInterval = 60 * 24 * 30 case liveduration >= time.Hour*24*30: streamer.UpdateInterval = 60 * 24 * 15 case liveduration >= time.Hour*24*15: streamer.UpdateInterval = 60 * 24 * 7 case liveduration >= time.Hour*24*7: streamer.UpdateInterval = 60 * 24 * 3 } if err == nil { endTime := startTime.Add((time.Duration)(dt) * time.Millisecond) clog.LiveEndTime = &sql.NullTime{Time: endTime, Valid: true} } else { log.Println(err, streamer.UserId) } } else { log.Println(err, streamer.UserId) } } clog.StreamerUid = streamer.Uid uid, err := intimate.TClog.InsertRetAutoID(clog) if err != nil { log.Println(err) continue } streamer.LatestLogUid = uid intimate.TStreamer.Update(streamer) // estore.UpdateStreamer(streamer) log.Println(*streamer.UserId) } }