package main

import (
	"database/sql"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"strconv"
	"sync/atomic"
	"syscall"
	"time"

	"intimate"

	"github.com/474420502/gcurl"
	"github.com/474420502/hunter"
	"github.com/tidwall/gjson"
)

var oer *OpenrecExtratorRanking

// sstore is the source store instance used to persist raw source data.
// See sql/intimate_source.sql for the table layout.
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec))

// estore is the extractor store instance used to pop streamers and record
// their processing state. See sql/intimate_extractor.sql for the table layout.
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()

func init() {
	oer = &OpenrecExtratorRanking{}
}

// OpenrecExtratorRanking fetches user information for Openrec streamers.
type OpenrecExtratorRanking struct {
	// Store *intimate.Store
}

// Execute runs the collection loop until a termination signal is received.
//
// Each iteration pops one streamer row from estore, downloads the pages named
// in its update_url JSON (keys "user", "supporters" and "live"), bundles the
// raw responses into a JSON document and inserts it into sstore. A streamer
// whose user, supporters or live request fails is reported through
// estore.UpdateError and skipped.
func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) {
	var loop int32 = 1

	go func() {
		// The channel must be buffered: package signal does not block when
		// delivering, so a signal arriving before the receive below is ready
		// would otherwise be dropped. SIGKILL and SIGSTOP are not registered
		// because they can never be caught by a process.
		signalchan := make(chan os.Signal, 1)
		signal.Notify(signalchan, syscall.SIGQUIT, syscall.SIGTERM)
		log.Println("accept stop command:", <-signalchan)
		atomic.StoreInt32(&loop, 0)
	}()

	// lasterr remembers the previous Pop error so that a repeating error is
	// logged only once.
	var lasterr error

	for atomic.LoadInt32(&loop) > 0 {
		// Pop one streamer row from the queue for processing.
		streamer, err := estore.Pop(intimate.Popenrec)
		if streamer == nil || err != nil {
			if err != lasterr {
				log.Println(err, lasterr)
				lasterr = err
			}
			time.Sleep(time.Second * 2)
			continue
		}

		userID := streamer.UserId

		// update_url holds the JSON-encoded map of URLs to collect.
		rawURLs, ok := streamer.UpdateUrl.([]byte)
		if !ok {
			// Skip the row instead of panicking on an unexpected column value.
			log.Printf("unexpected UpdateUrl type %T", streamer.UpdateUrl)
			continue
		}
		var updateURL map[string]string
		if err = json.Unmarshal(rawURLs, &updateURL); err != nil {
			log.Println(err)
			continue
		}

		// Fetch the user page.
		tp := cxt.Session().Get(updateURL["user"])
		resp, err := tp.Execute()
		streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
		if err != nil {
			log.Println(err)
			estore.UpdateError(streamer, err)
			continue
		}

		cookies := cxt.Session().GetCookies(tp.GetParsedURL())

		// Build the supporters request from the stored curl command and give
		// it the cookies obtained from the user page.
		curl := gcurl.ParseRawCURL(updateURL["supporters"])
		supportersSession := curl.CreateSession()
		temporary := curl.CreateTemporary(supportersSession)
		supportersSession.SetCookies(temporary.GetParsedURL(), cookies)

		var supporters []string
		var supportersErr error
		for {
			// The supporters endpoint needs login information: copy the uuid,
			// token and random cookie values into the query string.
			supportersQuery := temporary.GetQuery()
			for _, cookie := range cookies {
				switch cookie.Name {
				case "uuid":
					supportersQuery.Set("Uuid", cookie.Value)
				case "token":
					supportersQuery.Set("Token", cookie.Value)
				case "random":
					supportersQuery.Set("Random", cookie.Value)
				}
			}
			supportersQuery.Set("identify_id", userID)
			temporary.SetQuery(supportersQuery)

			pageResp, err := temporary.Execute()
			if err != nil {
				// Do not touch pageResp after a failed request; report the
				// failure after the loop like the other fetch errors.
				supportersErr = err
				break
			}

			// A page without a "data" value marks the end of the listing.
			body := pageResp.Content()
			if gjson.ParseBytes(body).Get("data").Type == gjson.Null {
				break
			}
			supporters = append(supporters, string(body))

			// Advance page_number by one for the next request.
			pageint, err := strconv.Atoi(supportersQuery.Get("page_number"))
			if err != nil {
				log.Println(err)
				break
			}
			supportersQuery.Set("page_number", strconv.Itoa(pageint+1))
			temporary.SetQuery(supportersQuery)
		}
		if supportersErr != nil {
			log.Println(supportersErr)
			estore.UpdateError(streamer, supportersErr)
			continue
		}

		ext := make(map[string]interface{})
		ext["json_supporters"] = supporters
		ext["html_user"] = string(resp.Content())

		// Fetch the live page.
		tp = cxt.Session().Get(updateURL["live"])
		resp, err = tp.Execute()
		if err != nil {
			log.Println(err)
			estore.UpdateError(streamer, err)
			continue
		}
		ext["html_live"] = string(resp.Content())
		ext["var_user_id"] = userID

		extJSONBytes, err := json.Marshal(ext)
		if err != nil {
			log.Println(err)
			estore.UpdateError(streamer, err)
			continue
		}

		// Store the collected raw data and mark the streamer as processed.
		streamer.Operator = int32(intimate.OperatorOK)

		source := &intimate.Source{}
		source.Target = intimate.TOpenrecUser
		source.Ext = string(extJSONBytes)
		source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true}
		sstore.Insert(source)

		estore.UpdateOperator(streamer)
	}
}