diff --git a/extractor/mirrativ_extractor/mirrativ_extractor.go b/extractor/mirrativ_extractor/mirrativ_extractor.go index 06ab7d0..33015b3 100644 --- a/extractor/mirrativ_extractor/mirrativ_extractor.go +++ b/extractor/mirrativ_extractor/mirrativ_extractor.go @@ -1 +1,159 @@ package main + +import ( + "database/sql" + "encoding/json" + "intimate" + "log" + "time" + + "github.com/474420502/gcurl" + "github.com/tidwall/gjson" +) + +func main() { + + sessionstr := ` + -H 'authority: www.mirrativ.com' + -H 'accept: application/json' + -H 'x-timezone: Asia/Shanghai' + -H 'x-csrf-token: F3Ojd6RBtApP6YAZzVn-9jWN1of159VxAqOQL1Zn' + -H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36' + -H 'content-type: application/json' + -H 'sec-fetch-site: same-origin' + -H 'sec-fetch-mode: cors' + -H 'sec-fetch-dest: empty' + -H 'referer: https://www.mirrativ.com/live/O5Ia4iX9c5CeZj7DFtg52Q' + -H 'accept-language: zh-CN,zh;q=0.9,ja;q=0.8' + -H 'cookie: f=A2D75F0E-D218-11EA-A042-452BF6D21CE8; _ga=GA1.2.689947597.1596081392; mr_id=kxb65LddGMZf5C28jkR_tGCZD_ZFOAepD5gfXO7eNjfPMB8EKYvU1Vg_Y29V1lsa; _gid=GA1.2.2116692650.1600139685; lang=ja'` + + ps := intimate.NewPerfectShutdown() + gprofile := gcurl.Parse(`curl 'https://www.mirrativ.com/api/user/profile?user_id=103383701'` + sessionstr) + tpProfile := gprofile.CreateTemporary(nil) + tpProfileUserID := tpProfile.QueryParam("user_id") + + g := gcurl.Parse(`culr 'https://www.mirrativ.com/api/live/live?live_id=O5Ia4iX9c5CeZj7DFtg52Q'` + sessionstr) + tpLive := g.CreateTemporary(nil) + tpLiveID := tpLive.QueryParam("live_id") + + var lasterr error + queue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.PMirrativ)) + for !ps.IsClose() { + istreamer, err := queue.Pop() + if err != nil { + if lasterr != err { + lasterr = err + log.Println(err) + } + ps.Wait(time.Second * 5) + continue + } + + now := &sql.NullTime{Time: time.Now(), Valid: true} + streamer := istreamer.(*intimate.Streamer) + streamer.UpdateTime = now + userid := *streamer.UserId + log.Println(userid) + + tpProfileUserID.StringSet(userid) + resp, err := tpProfile.Execute() + if err != nil { + log.Println(err) + time.Sleep(time.Second) + continue + } + + clog := &intimate.CollectLog{} + clog.Platform = intimate.PMirrativ + clog.UpdateTime = now + clog.UserId = userid + clog.StreamerUid = streamer.Uid + + profilejson := gjson.ParseBytes(resp.Content()) + if result := profilejson.Get("follower_num"); result.Exists() { + clog.Followers = &sql.NullInt64{Int64: result.Int(), Valid: true} + } + + if result := profilejson.Get("onlive.live_id"); result.Exists() { + liveID := result.String() + tpLiveID.StringSet(liveID) + resp, err = tpLive.Execute() + if err != nil { + log.Println(err) + continue + } + + livejson := gjson.ParseBytes(resp.Content()) + if result := livejson.Get("total_viewer_num"); result.Exists() { + clog.Views = &sql.NullInt64{Int64: result.Int(), Valid: true} + } + + if result := livejson.Get("title"); result.Exists() { + clog.LiveTitle = &sql.NullString{String: result.String(), Valid: true} + } + + if result := livejson.Get("started_at"); result.Exists() { + clog.LiveStartTime = &sql.NullTime{Time: time.Unix(result.Int(), 0), Valid: true} + } + + if result := livejson.Get("heartbeated_at"); result.Exists() { + clog.LiveEndTime = &sql.NullTime{Time: time.Unix(result.Int(), 0), Valid: true} + } + + if result := livejson.Get("app_id"); result.Exists() { + streamer.Channel = &sql.NullString{String: result.String(), Valid: true} + } + + if result := livejson.Get("timeline.#.app.short_title"); result.Exists() { + for _, tl := range result.Array() { + var tags []string = []string{tl.String()} + jtags, _ := json.Marshal(tags) + streamer.Tags = jtags + clog.Tags = jtags + break + } + } else { + log.Println(string(resp.Content())) + return + } + + if result := livejson.Get("gift_ranking_url"); result.Exists() { + // streamer.Channel = &sql.NullString{String: result.String(), Valid: true} + gifturl := "curl '" + result.String() + "&type=monthly&cursor='" + sessionstr + ggift := gcurl.Parse(gifturl) + tp := ggift.CreateTemporary(nil) + tp.SetURLRawPath("/api/gift/ranking") + pcursor := tp.QueryParam("cursor") + var gratuity int64 = 0 + + for { + giftdata, err := tp.Execute() + giftjson := gjson.ParseBytes(giftdata.Content()) + if err != nil { + log.Println(err) + } else { + for _, rpoint := range giftjson.Get("ranking.#.point").Array() { + gratuity += rpoint.Int() + } + } + ncursor := giftjson.Get("next_cursor").String() + if ncursor == "" { + break + } + pcursor.StringSet(ncursor) + } + // https://www.mirrativ.com/gift/ranking?live_id=O5Ia4iX9c5CeZj7DFtg52Q&obfuscated_user_id=PgIBEgc6jVc + clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true} + } + + cid, err := intimate.TClog.InsertRetAutoID(clog) + if err != nil { + log.Println(err) + } + streamer.LatestLogUid = cid + } + + intimate.TStreamer.Update(streamer) + time.Sleep(time.Second * 2) + } +} diff --git a/extractor/mirrativ_extractor/mirrativ_extractor_test.go b/extractor/mirrativ_extractor/mirrativ_extractor_test.go index acc2abd..b443cf2 100644 --- a/extractor/mirrativ_extractor/mirrativ_extractor_test.go +++ b/extractor/mirrativ_extractor/mirrativ_extractor_test.go @@ -1,159 +1,9 @@ package main import ( - "database/sql" - "encoding/json" - "intimate" - "log" "testing" - "time" - - "github.com/474420502/gcurl" - "github.com/tidwall/gjson" ) -func main() { - - sessionstr := ` - -H 'authority: www.mirrativ.com' - -H 'accept: application/json' - -H 'x-timezone: Asia/Shanghai' - -H 'x-csrf-token: F3Ojd6RBtApP6YAZzVn-9jWN1of159VxAqOQL1Zn' - -H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36' - -H 'content-type: application/json' - -H 'sec-fetch-site: same-origin' - -H 'sec-fetch-mode: cors' - -H 'sec-fetch-dest: empty' - -H 'referer: https://www.mirrativ.com/live/O5Ia4iX9c5CeZj7DFtg52Q' - -H 'accept-language: zh-CN,zh;q=0.9,ja;q=0.8' - -H 'cookie: f=A2D75F0E-D218-11EA-A042-452BF6D21CE8; _ga=GA1.2.689947597.1596081392; mr_id=kxb65LddGMZf5C28jkR_tGCZD_ZFOAepD5gfXO7eNjfPMB8EKYvU1Vg_Y29V1lsa; _gid=GA1.2.2116692650.1600139685; lang=ja'` - - ps := intimate.NewPerfectShutdown() - gprofile := gcurl.Parse(`curl 'https://www.mirrativ.com/api/user/profile?user_id=103383701'` + sessionstr) - tpProfile := gprofile.CreateTemporary(nil) - tpProfileUserID := tpProfile.QueryParam("user_id") - - g := gcurl.Parse(`culr 'https://www.mirrativ.com/api/live/live?live_id=O5Ia4iX9c5CeZj7DFtg52Q'` + sessionstr) - tpLive := g.CreateTemporary(nil) - tpLiveID := tpLive.QueryParam("live_id") - - queue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.PMirrativ)) - for !ps.IsClose() { - istreamer, err := queue.Pop() - if err != nil { - log.Println(err) - time.Sleep(time.Second * 2) - continue - } - now := &sql.NullTime{Time: time.Now(), Valid: true} - streamer := istreamer.(*intimate.Streamer) - streamer.UpdateTime = now - userid := *streamer.UserId - log.Println(userid) - - tpProfileUserID.StringSet(userid) - resp, err := tpProfile.Execute() - if err != nil { - log.Println(err) - time.Sleep(time.Second) - continue - } - - clog := &intimate.CollectLog{} - clog.Platform = intimate.PMirrativ - clog.UpdateTime = now - clog.UserId = userid - clog.StreamerUid = streamer.Uid - - profilejson := gjson.ParseBytes(resp.Content()) - if result := profilejson.Get("follower_num"); result.Exists() { - clog.Followers = &sql.NullInt64{Int64: result.Int(), Valid: true} - } - - if result := profilejson.Get("onlive.live_id"); result.Exists() { - liveID := result.String() - tpLiveID.StringSet(liveID) - resp, err = tpLive.Execute() - if err != nil { - log.Println(err) - continue - } - - livejson := gjson.ParseBytes(resp.Content()) - if result := livejson.Get("total_viewer_num"); result.Exists() { - clog.Views = &sql.NullInt64{Int64: result.Int(), Valid: true} - } - - if result := livejson.Get("title"); result.Exists() { - clog.LiveTitle = &sql.NullString{String: result.String(), Valid: true} - } - - if result := livejson.Get("started_at"); result.Exists() { - clog.LiveStartTime = &sql.NullTime{Time: time.Unix(result.Int(), 0), Valid: true} - } - - if result := livejson.Get("heartbeated_at"); result.Exists() { - clog.LiveEndTime = &sql.NullTime{Time: time.Unix(result.Int(), 0), Valid: true} - } - - if result := livejson.Get("app_id"); result.Exists() { - streamer.Channel = &sql.NullString{String: result.String(), Valid: true} - } - - if result := livejson.Get("timeline.#.app.short_title"); result.Exists() { - for _, tl := range result.Array() { - var tags []string = []string{tl.String()} - jtags, _ := json.Marshal(tags) - streamer.Tags = jtags - clog.Tags = jtags - break - } - } else { - log.Println(string(resp.Content())) - return - } - - if result := livejson.Get("gift_ranking_url"); result.Exists() { - // streamer.Channel = &sql.NullString{String: result.String(), Valid: true} - gifturl := "curl '" + result.String() + "&type=monthly&cursor='" + sessionstr - ggift := gcurl.Parse(gifturl) - tp := ggift.CreateTemporary(nil) - tp.SetURLRawPath("/api/gift/ranking") - pcursor := tp.QueryParam("cursor") - var gratuity int64 = 0 - - for { - giftdata, err := tp.Execute() - giftjson := gjson.ParseBytes(giftdata.Content()) - if err != nil { - log.Println(err) - } else { - for _, rpoint := range giftjson.Get("ranking.#.point").Array() { - gratuity += rpoint.Int() - } - } - ncursor := giftjson.Get("next_cursor").String() - if ncursor == "" { - break - } - pcursor.StringSet(ncursor) - } - // https://www.mirrativ.com/gift/ranking?live_id=O5Ia4iX9c5CeZj7DFtg52Q&obfuscated_user_id=PgIBEgc6jVc - clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true} - } - - cid, err := intimate.TClog.InsertRetAutoID(clog) - if err != nil { - log.Println(err) - } - streamer.LatestLogUid = cid - } - intimate.TStreamer.Update(streamer) - - time.Sleep(time.Second * 2) - } -} - func TestDo(t *testing.T) { main() } diff --git a/supervisor_conf/clear_log.sh b/supervisor_conf/clear_log.sh index 1cce37f..61892e8 100644 --- a/supervisor_conf/clear_log.sh +++ b/supervisor_conf/clear_log.sh @@ -1,6 +1,6 @@ CURPATH=`pwd` BINPATH=$(dirname "$CURPATH")/bin -find $BINPATH -type f -name 'log' -exec rm {} + +find $BINPATH -type f -name 'log' -exec truncate -s 0 {} + diff --git a/supervisor_conf/mirrativ_extractor.conf b/supervisor_conf/mirrativ_extractor.conf new file mode 100644 index 0000000..4036857 --- /dev/null +++ b/supervisor_conf/mirrativ_extractor.conf @@ -0,0 +1,10 @@ +[supervisord] +nodaemon=true + +[program:mirrativ_extractor] +directory = MYPATH/bin/mirrativ_extractor/ +command= MYPATH/bin/mirrativ_extractor/mirrativ_extractor +autorestart=true +stderr_logfile=MYPATH/bin/mirrativ_extractor/log +stderr_logfile_maxbytes=0 +stopsignal=QUIT diff --git a/supervisor_conf/mirrativ_task1.conf b/supervisor_conf/mirrativ_task1.conf new file mode 100644 index 0000000..f945ee6 --- /dev/null +++ b/supervisor_conf/mirrativ_task1.conf @@ -0,0 +1,13 @@ +[supervisord] +nodaemon=false + +[program:mirrativ_task1] +environment=DISPLAY=":99" +directory = MYPATH/bin/mirrativ_task1/ +command= MYPATH/bin/mirrativ_task1/mirrativ_task1 +# process_name=%(program_name)s_%(process_num)02d ;多进程名称 +# numprocs=1 ;启动多个进程 +autorestart=true +stderr_logfile=MYPATH/bin/mirrativ_task1/log +stderr_logfile_maxbytes=0 +stopsignal=QUIT diff --git a/tasks/mirrativ/mirrativ_task1/mirrativ_task1.go b/tasks/mirrativ/mirrativ_task1/mirrativ_task1.go index 5fd7146..84a756d 100644 --- a/tasks/mirrativ/mirrativ_task1/mirrativ_task1.go +++ b/tasks/mirrativ/mirrativ_task1/mirrativ_task1.go @@ -31,7 +31,8 @@ func main() { cursor := tp.QueryParam(`cursor`) cursor.StringSet("") - for { + ps := intimate.NewPerfectShutdown() + for !ps.IsClose() { log.Println(tp.ParsedURL.String()) resp, err := tp.Execute() @@ -58,10 +59,14 @@ func main() { streamer.Operator = 0 streamer.UserId = &guserid.Str streamer.UserName = &sql.NullString{String: owner.Get("name").String(), Valid: true} - streamer.UpdateInterval = 60 + streamer.UpdateInterval = 600 streamer.UpdateTime = intimate.GetUpdateTimeNow() - err = intimate.TStreamer.Insert(streamer) + err = intimate.TStreamer.InsertOrUpdate( + streamer, + intimate.DUpdate{Field: "update_time"}, + ) + if err != nil { log.Println(err) panic(err) @@ -69,9 +74,9 @@ func main() { } } if next == "" { - time.Sleep(time.Minute * 5) + ps.Wait(time.Minute * 10) } else { - time.Sleep(time.Second * 2) + ps.Wait(time.Second * 2) } cursor.StringSet(next) } diff --git a/utils.go b/utils.go index edbd926..8079c0f 100644 --- a/utils.go +++ b/utils.go @@ -110,16 +110,13 @@ func (adriver *AutoCloseDriver) Close() { // log.Println(string(data)) killshell := fmt.Sprintf("pkill -9 -P %s", data) - // log.Println(killshell) - // pkill -f \"port=%d\" - // log.Printf(fmt.Sprintf("kill -9 $(lsof -t -i:%d)", port)) err = exec.Command("/bin/bash", "-c", killshell).Run() if err != nil { log.Println(err) return } - err = exec.Command("/bin/bash", "-c", fmt.Sprintf("kill %s", data)).Run() + err = exec.Command("/bin/bash", "-c", fmt.Sprintf("kill -9 %s", data)).Run() if err != nil { log.Println(err) return @@ -134,8 +131,6 @@ func GetChromeDriver() *AutoCloseDriver { caps := selenium.Capabilities{"browserName": "chrome"} chromecaps := chrome.Capabilities{} - - // chromecaps.AddExtension("/home/eson/test/myblock.crx") for _, epath := range []string{"../../../crx/myblock.crx", "../../crx/myblock.crx"} { _, err := os.Stat(epath) if err == nil { @@ -182,23 +177,6 @@ func GetChromeDriver() *AutoCloseDriver { adriver.Port = port adriver.Webdriver = wd - // runtime.SetFinalizer(adriver, func(obj interface{}) { - - // adriver := obj.(*AutoCloseDriver) - // adriver.Webdriver.Close() - // adriver.Webdriver.Quit() - - // killshell := fmt.Sprintf("pkill -P `pgrep -f 'port=%d '` && pkill -f 'port=%d '", port, port) - // log.Println(killshell) - - // // log.Printf(fmt.Sprintf("kill -9 $(lsof -t -i:%d)", port)) - // // cmd := exec.Command("sh", "-c", killshell) - // // err = cmd.Run() - // // if err != nil { - // // log.Println(err) - // // } - // }) - wd.ExecuteScript("windows.navigator.webdriver = undefined", nil) if err != nil { panic(err)