intimate/extractor/openrec_extractor/openrec_extractor.go
2020-09-17 18:55:01 +08:00

262 lines
7.2 KiB
Go

package main
import (
"database/sql"
"encoding/json"
"errors"
"intimate"
"log"
"time"
"github.com/474420502/extractor"
"github.com/474420502/gcurl"
"github.com/474420502/requests"
"github.com/tidwall/gjson"
)
//UserInfo 提取信息的结构体
type UserInfo struct {
UserName string `exp:"//p[ contains(@class, 'c-global__user__profile__list__name__text')]"`
Followers int64 `exp:"//p[@class='c-global__user__count__row__right js-userCountFollowers']" mth:"r:ParseNumber"`
PCU int64 `exp:"//ul[@class='c-contents']//p[@class='c-thumbnailVideo__footer__liveCount']" mth:"r:ExtractNumber"`
}
//UserLive 提取信息的结构体
type UserLive struct {
Title string `exp:"//h1[contains(@class,'MovieTitle__Title')]"`
LiveStartTime string `exp:"//meta[@itemprop='uploadDate']/@content"`
LiveEndTime string `exp:"//meta[@itemprop='duration']/@content"`
Tags []string `exp:"//div[contains(@class,'MovieMetaContent__TagContainer')]//a[@role ='button']"`
Views int64 `exp:"//meta[@itemprop='interactionCount']/@content"`
}
// Execute 执行
func Execute() {
ps := intimate.NewPerfectShutdown()
ses := requests.NewSession()
squeue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Popenrec))
var lasterr error = nil
for !ps.IsClose() {
istreamer, err := squeue.Pop()
// streamer, err := estore.Pop(intimate.Popenrec) //队列里弹出一个streamer行. 进行解析
if istreamer == nil || err != nil {
if err != lasterr {
log.Println(err, lasterr)
lasterr = err
}
time.Sleep(time.Second * 2)
continue
}
streamer := istreamer.(*intimate.Streamer)
userId := *streamer.UserId
var updateUrl map[string]string
err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // 反序列化update_url, 里面存了需要采集的url
if err != nil {
log.Println(err)
continue
}
// Check Userid
userUrl := updateUrl["user"]
log.Println(userUrl)
tp := ses.Get(userUrl) // 获取user url页面数据
resp, err := tp.Execute()
streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
if err != nil {
log.Println(err)
intimate.TStreamer.UpdateError(streamer, err)
continue
}
cookies := ses.GetCookies(tp.GetParsedURL())
scurl := updateUrl["supporters"] //获取打赏者的数据
curl := gcurl.Parse(scurl)
supportersSession := curl.CreateSession()
temporary := curl.CreateTemporary(supportersSession)
supportersSession.SetCookies(temporary.GetParsedURL(), cookies)
var supporters []string
for { // supporters 数据需要登录信息. 下面为赋值 supporters链接获取的uid token random码
supportersQuery := temporary.GetQuery()
for _, cookie := range cookies {
if cookie.Name == "uuid" {
supportersQuery.Set("Uuid", cookie.Value)
continue
}
if cookie.Name == "token" {
supportersQuery.Set("Token", cookie.Value)
continue
}
if cookie.Name == "random" {
supportersQuery.Set("Random", cookie.Value)
continue
}
}
supportersQuery.Set("identify_id", userId)
temporary.SetQuery(supportersQuery)
resp, err := temporary.Execute()
if err != nil {
log.Println(err)
}
supporterjson := gjson.ParseBytes(resp.Content())
supporterdata := supporterjson.Get("data") //解析supporters获取的json数据
if supporterdata.Type == gjson.Null {
break
}
supporters = append(supporters, string(resp.Content()))
temporary.QueryParam("page_number").IntAdd(1)
}
// cookies := cxt.Session().GetCookies(wf.GetParsedURL())
// ext := make(map[string]interface{})
jsonSupporters := supporters
htmlUser := string(resp.Content())
liveUrl := updateUrl["live"]
tp = ses.Get(liveUrl)
resp, err = tp.Execute()
if err != nil {
log.Println(err)
intimate.TStreamer.UpdateError(streamer, err)
continue
}
htmlLive := string(resp.Content())
// ext["var_user_id"] = userId
// streamer.Platform = intimate.Popenrec
streamer.UpdateInterval = 120
streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
streamer.Operator = 0
Extractor(streamer, userId, htmlUser, htmlLive, jsonSupporters)
}
}
func Extractor(streamer *intimate.Streamer, userId string, htmlUser, htmlLive string, jsonSupporters []string) {
// sdata := source.Ext.([]byte)
// datamap := gjson.ParseBytes(sdata).Map()
// userId := datamap["var_user_id"].String()
// streamer := &intimate.Streamer{}
// streamer.UserId = &userId
// streamer.Platform = intimate.Popenrec 不需要更新字段
// htmlUser := datamap["html_user"]
userEtor := extractor.ExtractHtmlString(htmlUser)
ui, ok1 := userEtor.GetObjectByTag(UserInfo{}).(*UserInfo)
// htmlLive := datamap["html_live"]
liveEtor := extractor.ExtractHtmlString(htmlLive)
ul, ok2 := liveEtor.GetObjectByTag(UserLive{}).(*UserLive)
// jsonSupporters := datamap["json_supporters"]
clog := &intimate.CollectLog{}
if ok1 {
clog.Followers = &sql.NullInt64{Int64: ui.Followers, Valid: true}
clog.PCU = &sql.NullInt64{Int64: ui.PCU, Valid: true}
streamer.UserName = &sql.NullString{String: ui.UserName, Valid: true}
// giverjson := jsonSupporters
var givers []interface{}
var gratuity int64 = 0
for _, v := range jsonSupporters {
giverSource := gjson.Parse(v)
for _, item := range giverSource.Get("data.items").Array() {
givers = append(givers, item.Map())
gratuity += item.Get("total_yells").Int()
}
}
giversbytes, err := json.Marshal(givers)
if err != nil {
log.Println(err)
clog.ErrorMsg = &sql.NullString{String: err.Error(), Valid: true}
} else {
clog.Giver = giversbytes
}
clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true}
} else {
log.Println("UserInfo may be not exists")
intimate.TStreamer.UpdateError(streamer, errors.New("UserInfo may be not exists"))
return
}
//log.Println(ul)
if ok2 {
clog.Views = &sql.NullInt64{Int64: ul.Views, Valid: true}
clog.LiveTitle = &sql.NullString{String: ul.Title, Valid: true}
startTime, err := time.ParseInLocation("2006-01-02T15:04:05Z07:00", ul.LiveStartTime, time.Local)
if err != nil {
log.Println(err)
} else {
clog.LiveStartTime = &sql.NullTime{Time: startTime.Local(), Valid: true}
duration, err := intimate.ParseDuration(ul.LiveEndTime)
if err != nil {
log.Println(err)
} else {
endTime := startTime.Add(duration)
clog.LiveEndTime = &sql.NullTime{Time: endTime.Local(), Valid: true}
}
}
if tags, err := json.Marshal(ul.Tags); err == nil {
clog.Tags = tags
} else {
log.Println("json error", ul.Tags, clog.Tags)
}
}
// streamer.Uid = source.StreamerId.Int64
// streamer.UpdateTime = &source.UpdateTime
if clog.Tags != nil {
streamer.Tags = clog.Tags
}
clog.Platform = intimate.Popenrec
clog.UserId = userId
clog.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
clog.StreamerUid = streamer.Uid
logUid, err := intimate.TClog.InsertRetAutoID(clog)
if err != nil {
log.Println(err)
return
}
LiveUrl := "https://www.openrec.tv/live/" + userId
streamer.LiveUrl = &sql.NullString{String: LiveUrl, Valid: true}
streamer.LatestLogUid = logUid
// streamer.Operator = 0
// log.Println(*streamer.UserId)
intimate.TStreamer.Update(streamer)
// source.Operator = int32(intimate.OperatorExtractorOK)
// sstore.UpdateOperator(source)
}