package main

import (
	"database/sql"
	"encoding/json"
	"errors"
	"intimate"
	"log"
	"time"

	"github.com/474420502/extractor"
	"github.com/474420502/gcurl"
	"github.com/474420502/requests"
	"github.com/tidwall/gjson"
)

// UserInfo holds the fields extracted from a streamer's user page.
// The `exp` tags are XPath expressions evaluated by extractor's GetObjectByTag;
// `mth` presumably names a post-processing step applied to the matched text
// (e.g. number parsing) — confirm against the extractor package.
type UserInfo struct {
	UserName  string `exp:"//p[ contains(@class, 'c-global__user__profile__list__name__text')]"`
	Followers int64  `exp:"//p[@class='c-global__user__count__row__right js-userCountFollowers']" mth:"r:ParseNumber"`
	Views     int64  `exp:"//ul[@class='c-contents']//p[@class='c-thumbnailVideo__footer__liveCount']" mth:"r:ExtractNumber"`
}

// UserLive holds the fields extracted from a streamer's live page.
// Despite its name, LiveEndTime receives the raw itemprop="duration" value;
// Extractor converts it into an end time by adding it to LiveStartTime.
type UserLive struct {
	Title         string   `exp:"//h1[contains(@class,'MovieTitle__Title')]"`
	LiveStartTime string   `exp:"//meta[@itemprop='uploadDate']/@content"`
	LiveEndTime   string   `exp:"//meta[@itemprop='duration']/@content"`
	Tags          []string `exp:"//div[contains(@class,'MovieMetaContent__TagContainer')]//a[@role ='button']"`
}

// Execute runs the Openrec collection loop until a shutdown is requested.
//
// Each iteration pops a streamer from the queue and decodes its UpdateUrl JSON
// into a map holding the "user", "supporters" and "live" URLs. It then fetches
// the user page, every page of supporter JSON and the live page, and hands the
// results to Extractor. A failed fetch marks the streamer with
// intimate.TStreamer.UpdateError and moves on to the next streamer; malformed
// queue items are logged and skipped.
func Execute() {
	ps := intimate.NewPerfectShutdown()
	ses := requests.NewSession()
	squeue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Popenrec))

	var lasterr error
	for !ps.IsClose() {
		istreamer, err := squeue.Pop()
		if istreamer == nil || err != nil {
			// Only log when the error changes so an idle queue does not flood the log.
			if err != lasterr {
				log.Println(err, lasterr)
				lasterr = err
			}
			time.Sleep(time.Second * 2)
			continue
		}

		// Comma-ok assertions: an unexpected item must not crash the whole collector.
		streamer, ok := istreamer.(*intimate.Streamer)
		if !ok {
			log.Printf("unexpected queue item type %T", istreamer)
			continue
		}
		if streamer.UserId == nil {
			log.Println("streamer has no user id")
			continue
		}
		userId := *streamer.UserId

		rawUpdateURL, ok := streamer.UpdateUrl.([]byte)
		if !ok {
			log.Printf("unexpected update_url type %T", streamer.UpdateUrl)
			continue
		}
		// update_url stores the URLs that need to be collected for this streamer.
		var updateUrl map[string]string
		if err := json.Unmarshal(rawUpdateURL, &updateUrl); err != nil {
			log.Println(err)
			continue
		}

		// User page.
		userUrl := updateUrl["user"]
		log.Println(userUrl)
		tp := ses.Get(userUrl)
		resp, err := tp.Execute()
		streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
		if err != nil {
			log.Println(err)
			intimate.TStreamer.UpdateError(streamer, err)
			continue
		}
		htmlUser := string(resp.Content())

		// Supporters (tippers): the endpoint needs the login state carried by the
		// cookies set while fetching the user page.
		cookies := ses.GetCookies(tp.GetParsedURL())
		curl := gcurl.Parse(updateUrl["supporters"])
		supportersSession := curl.CreateSession()
		temporary := curl.CreateTemporary(supportersSession)
		supportersSession.SetCookies(temporary.GetParsedURL(), cookies)

		var jsonSupporters []string
		var supportersErr error
		for {
			// Copy the uuid/token/random cookies into the query parameters the API expects.
			query := temporary.GetQuery()
			for _, cookie := range cookies {
				switch cookie.Name {
				case "uuid":
					query.Set("Uuid", cookie.Value)
				case "token":
					query.Set("Token", cookie.Value)
				case "random":
					query.Set("Random", cookie.Value)
				}
			}
			query.Set("identify_id", userId)
			temporary.SetQuery(query)

			sresp, err := temporary.Execute()
			if err != nil {
				// The response is unusable and re-requesting the same page would
				// never advance, so stop paging and report the failure.
				supportersErr = err
				break
			}
			content := sresp.Content()
			// A missing or null "data" field ends pagination.
			if gjson.ParseBytes(content).Get("data").Type == gjson.Null {
				break
			}
			jsonSupporters = append(jsonSupporters, string(content))
			temporary.QueryParam("page_number").IntAdd(1)
		}
		if supportersErr != nil {
			log.Println(supportersErr)
			intimate.TStreamer.UpdateError(streamer, supportersErr)
			continue
		}

		// Live page.
		tp = ses.Get(updateUrl["live"])
		resp, err = tp.Execute()
		if err != nil {
			log.Println(err)
			intimate.TStreamer.UpdateError(streamer, err)
			continue
		}
		htmlLive := string(resp.Content())

		streamer.UpdateInterval = 120
		streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
		streamer.Operator = 0
		Extractor(streamer, userId, htmlUser, htmlLive, jsonSupporters)
	}
}

// Extractor turns the pages fetched by Execute into a CollectLog row and
// updates the streamer.
//
// htmlUser and htmlLive are the raw user and live pages; jsonSupporters holds
// one JSON document per supporters page. If no UserInfo can be extracted from
// htmlUser, the streamer is marked with an error and nothing is inserted.
// Otherwise the log is inserted through intimate.TClog. If the insert
// succeeds, the streamer is updated with its LiveUrl, LatestLogUid and (when
// extracted) Tags.
func Extractor(streamer *intimate.Streamer, userId string, htmlUser, htmlLive string, jsonSupporters []string) {
	userEtor := extractor.ExtractHtmlString(htmlUser)
	ui, ok1 := userEtor.GetObjectByTag(UserInfo{}).(*UserInfo)

	liveEtor := extractor.ExtractHtmlString(htmlLive)
	ul, ok2 := liveEtor.GetObjectByTag(UserLive{}).(*UserLive)

	if !ok1 {
		log.Println("UserInfo may be not exists")
		intimate.TStreamer.UpdateError(streamer, errors.New("UserInfo may be not exists"))
		return
	}

	clog := &intimate.CollectLog{}
	clog.Followers = &sql.NullInt64{Int64: ui.Followers, Valid: true}
	clog.Views = &sql.NullInt64{Int64: ui.Views, Valid: true}
	// A non-zero live view count is treated as "currently streaming".
	if ui.Views != 0 {
		clog.IsLiveStreaming = true
	}
	streamer.UserName = &sql.NullString{String: ui.UserName, Valid: true}

	// Gather every supporter item across all pages and sum their total_yells.
	var givers []interface{}
	var gratuity int64
	for _, page := range jsonSupporters {
		for _, item := range gjson.Parse(page).Get("data.items").Array() {
			// NOTE(review): gjson.Result does not appear to implement json.Marshaler,
			// so marshalling item.Map() likely stores Result's internal fields rather
			// than the original item JSON; item.Value() may be what was intended —
			// confirm against consumers of CollectLog.Giver before changing.
			givers = append(givers, item.Map())
			gratuity += item.Get("total_yells").Int()
		}
	}
	if giversBytes, err := json.Marshal(givers); err != nil {
		log.Println(err)
		clog.ErrorMsg = &sql.NullString{String: err.Error(), Valid: true}
	} else {
		clog.Giver = giversBytes
	}
	clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true}

	if ok2 {
		clog.LiveTitle = &sql.NullString{String: ul.Title, Valid: true}
		startTime, err := time.ParseInLocation(time.RFC3339, ul.LiveStartTime, time.Local)
		if err != nil {
			log.Println(err)
		} else {
			clog.LiveStartTime = &sql.NullTime{Time: startTime.Local(), Valid: true}
			// LiveEndTime carries a duration; the end time is start + duration.
			duration, err := intimate.ParseDuration(ul.LiveEndTime)
			if err != nil {
				log.Println(err)
			} else {
				endTime := startTime.Add(duration)
				clog.LiveEndTime = &sql.NullTime{Time: endTime.Local(), Valid: true}
			}
		}
		if tags, err := json.Marshal(ul.Tags); err == nil {
			clog.Tags = tags
		} else {
			log.Println("json error", ul.Tags, err)
		}
	}

	if clog.Tags != nil {
		streamer.Tags = clog.Tags
	}
	clog.Platform = intimate.Popenrec
	clog.UserId = userId
	clog.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
	clog.StreamerUid = streamer.Uid

	logUid, err := intimate.TClog.InsertRetAutoID(clog)
	if err != nil {
		log.Println(err)
		return
	}

	liveURL := "https://www.openrec.tv/live/" + userId
	streamer.LiveUrl = &sql.NullString{String: liveURL, Valid: true}
	streamer.LatestLogUid = logUid
	intimate.TStreamer.Update(streamer)
}