diff --git a/.gitignore b/.gitignore index 331f9bc..1796cca 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,6 @@ screenlog.* intimate *.gz debug.test +myblock diff --git a/crx/0.1.2_0.crx b/crx/0.1.2_0.crx deleted file mode 100644 index 54ac5ce..0000000 Binary files a/crx/0.1.2_0.crx and /dev/null differ diff --git a/crx/0.1.2_0.pem b/crx/0.1.2_0.pem deleted file mode 100644 index 6b412b5..0000000 --- a/crx/0.1.2_0.pem +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC4DYEiDPfw094p -axusu7+kP9J9waL3+794PORnMoBnoKEBuEAfeWLZPtX338nq3dmyiVvNsNKFimcH -vVFye5uHdFOfXZ7f6oIBzeY0616dhcASjV7nj0HkuhDhZQGvBWegQrFJrQQDG99T -kXv3mH8e4W6qvwpyRedXqQJC3gE0LOKvUGlWABQwwq9NF5Uf6vJyXYMv7uBMwpWc -liAtJ6v9742IIGjmZhhaQELliOUucbaeilzxJ29XyVXGhEXNERAXbswSGectHvoz -HsBo4YVzKaR2b8wRXGT1W7am1MwFmOzZcX8tQAXTnoThsAL5tLHi+HJ5QRNbSf6h -C44x8bcdAgMBAAECggEAW+G5/pK00FAkj06+MRxFTqzh2w/o2J2f91mGuJH4kkMZ -Pa+Kq2vA0i1RSf45YfvAqyVxZB0K68mtJ6r2Vw2oFhYXjO6C2svKfTYZ339E66SS -v3A92aGlxpawyKTRE1vCYLoKoXozD45BjgmJ9o/1nifyRGE8yNFm7VcdHt6PgUix -914dJWeSwF94tnRqPJwfOXJkpTXdiWQunGWBOH2nK6y/r2xlLiR4EXDA/4LwMegh -5XHWA4YOG0jQc5a/U5w+899/JKvduo5ZU738jKrtcqD8b2G76R+VTxzbv11ROm6E -AMo1nTHRtbPAKDbSSPWgrzjxQPVGbJPJ+BnzF2V3rQKBgQDnC8OxPuhFvEdjmijW -1lPWkB4NBJ9uBtWEMhHe5PTeHs0sfm9AZvM8npObMNcOvmQH3bGX2aY9XoEHZbjK -ixM5miVazdEt0y7UonzcXqjpOvjiqlaAcMP+2Y6ejqi1JD3sflyi/GmLNCtlbRsZ -Fx7sgPNk+LueGvwK35TWIsL7hwKBgQDL7mpjYk5V3osb1AGqcaJYsVa9Qm2izsAl -g13sxollDLazaitwAt3r+FMtLVgJPptTlV37QF1WbSCfGCYjjFRP0WQN1lTlZqUN -4QNKQ6SI/Wp4qjl127T2n/1toc7Mhjs00V+RJiFYpN5cdvXniBXjJC0oh30tL+L8 -Cvws2QYJOwKBgQDJEqD1QSUNg4SxdvkxtwbxhSzR8YL6UzJAwP5yd9lu8Wln3oTd -jHsE95DID6Ipr6IIgnRLDdyyLeumz20ZwB00FSWLN/FiqxZncR2u/yaLC4qMYOe1 -Ee5QfW+0J71FH8xQY8wk//yua/GUbHaXyFpeQv8PkbReLWfJ4rh5/3inQwKBgQCl -7M9dG6BXF6Ihu0a7soeAGJJVnRXtUMFgBFnIi+VAda61nh3Hnl2IYFz0th8aLnlc -8XwtMLqA1nujVpe5drUm2FzLMWeT2wdSmpD9vLnDyET39rCX53J+87/UksHbASBt -IinaxKZ/JG3T1+rOPphoXofroQnFWWAa6KkzqETT/wKBgDv2KDnZqYMSNy4xtz96 -IdOpYioocSvRS5kUUwokAIU9CYIo5+iyaJYok25u6OaRNOr1vqzqeG5j8Rdc/kQc -70df/gZ2Gejn+3BYJtwtgeB25KfCjd+jhTHOOgLbnK0tX3h8X5wkpA0628inwMhg -Q9GxE6HDisGIr0S5PWnZFh34 ------END PRIVATE KEY----- diff --git a/crx/myblock.crx b/crx/myblock.crx new file mode 100644 index 0000000..6ee88c6 Binary files /dev/null and b/crx/myblock.crx differ diff --git a/crx/myblock.pem b/crx/myblock.pem new file mode 100644 index 0000000..605b6cf --- /dev/null +++ b/crx/myblock.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDSG09DSvB03TOe +eOmQwfiCIf0wa2WRB31ewxa6i/PRgEKeJSUvIsIuaECUer2ss+J3rwSS2lDpGuiw +FnsVyZqKI/+Rcuc83YJGYg6OAzVMz6UL8YCWhXu3huTJ+V+a5iNereIC69ZERRJt +nXlWqsq6HKya+6BP9sX9CI4GTHQrnWBysAxsswhdnnnRvu+GxglWafSIzuS6OizT +1M1CmkZxNvDJhTSOR7SJlIYm2kM5/fIL53BdndF2IGAjfV1WV7AjwhTfun5cViEO +i8niQUIMY4L0AiO9grFD1g1xIYkeuVBoLxOUBzPxJwQmb64gseb9Dvt0BKLRGoou +SIOyE+KVAgMBAAECggEAI4b6J2kR0VUBEDwmVHO0K38HUstqNHSVgrNO0dLt8sAz +I44o5DhGqPW4a9L4ZS5SrkWyKonPcic6buISRIwfPVoacjQBfVWAXJnil6lbtyYK +ZMNcqLcgBRfCcpOgEq91DiKta6yIwekDFXVyCdFd78v+9ML1J+hUsLVkXJTLdP88 +PGamRWVd6vGy3QMRjyM29GLPgS+/6Vrp1cptSuYNqYhlszohmu8lBvzjH9jbPh9d +GFrrd8Bs7IRCdtKZig/3fbln4JEyyOYE+gcT2jplPksB6mR/5DBIdkVbeuFwGB0+ +h1/PKlprNQt7+Ei0HhHnTib7lZP8WGo4HkSi7PsAGQKBgQD1Ptho0wJiI2+6gL1O +iNsEJVKIQ2Sxdx3wI/qudphM99t6xKCpPyVI2Nd9PBf2jbZjGAaz+P/KQYxEqb6i +PRcQ+i99wCQoRfnRvUbKA4goEpKwRXmvn+499dm6D5pEuumOXGQYCmaFXuLTRN/I +BL6GNgLtoZAlLjUXaWtk8TszGQKBgQDbUf3p3HLpCjRvRDW/vA5xj+08t7xtF9uO +NilGK79uOA4VnxE2w3ioYqQ7t3I8J/0rAzGKq3tylg4QX6UpQ4b2koRr2B3cqoAk +dsRdNWAHwCNepz8hTLsZyuihzbNv2nHmoqhzjK/FcrBHx5NAM+T6OBpLzQBnbUzk +3wIcqm223QKBgQDo/IRxyY0pGMtLXoT6ODACF0b6JzRhGG37tuKvngGAlbQQRP7w +6wmL1F2cH1wQon7UU34CupqfVnhgvvZZgToJqfU2PTTcgeYc6Pl4b7SJhWOQTOCX +BZQ7jvYCulHv27aIxaNd53uQVx2cYoFKr58lN+i+QtADUoujq0YYxshb+QKBgQDW +ZOti7kZCeuBRGIu2V56C8uBFp5MBzf2polZsqx1iIFfcWPfZ4fGUIYFMgwKfvbOl +lWSbmxB9LiSnaugoU0OezBG43rYqXV4Qxy0jtKagTPoGcFWtNrX7+7e3XD8Zi6Am +hkFHW3MEAB5EvNq8Oz6OP8Os78SCVn2BimMlJJFF3QKBgQCF+aEAiBv+ivcmHUeP +2eBq9nLltPFAfXJ/p31MMQ6Jgo36DBqUeoLeyq/WfIXvwqbVbP9fANZrKoTPbI97 +dilCHUoO33rafXJy6jtaggtpz14tt9soecTop0vM/rU7tGtfBe6NXg9LRl+oDJCU +37I3a9Is+2CLyAUXWCk9mLfFsQ== +-----END PRIVATE KEY----- diff --git a/extractor/openrec_extractor/main.go b/extractor/openrec_extractor/main.go index 2fb7715..4596421 100644 --- a/extractor/openrec_extractor/main.go +++ b/extractor/openrec_extractor/main.go @@ -1,7 +1,6 @@ package main import ( - "net/http" _ "net/http/pprof" ) @@ -16,10 +15,6 @@ import ( */ func main() { - go func() { - http.ListenAndServe("0.0.0.0:8899", nil) - }() - oe := &OpenrecExtractor{} oe.Execute() } diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 6052389..45f751e 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -39,27 +39,9 @@ func (oe *OpenrecExtractor) Execute() { }() var lasterr error = nil - execute := func() bool { + for atomic.LoadInt32(&loop) > 0 { var err error - // if sstore.PopCount() >= 1000 { - // if err = estore.Close(); err != nil { - // log.Println(err) - // } - // if err = sstore.Close(); err != nil { - // log.Println(err) - // } - // estore = intimate.NewStoreExtractor() - // sstore = intimate.NewStoreSource(string(intimate.STOpenrec)) - - // oe.supporters.Clear() - // oe.user.Clear() - // oe.userLive.Clear() - - // runtime.GC() // 主动gc - // log.Println("1000次执行, gc 重新建立sql链接") - // } - source, err := sstore.Pop(intimate.TOpenrecUser, 0) if err != nil { if err != lasterr { @@ -67,7 +49,7 @@ func (oe *OpenrecExtractor) Execute() { lasterr = err } time.Sleep(time.Second * 2) - return true + continue } sdata := source.Ext.([]byte) @@ -118,15 +100,8 @@ func (oe *OpenrecExtractor) Execute() { source.Operator = int32(intimate.OperatorExtractorOK) sstore.UpdateOperator(source) - - return true } - for atomic.LoadInt32(&loop) > 0 { - if !execute() { - break - } - } } func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) { diff --git a/extractor/twitch_extractor/.gitignore b/extractor/twitch_extractor/.gitignore new file mode 100644 index 0000000..a2523a9 --- /dev/null +++ b/extractor/twitch_extractor/.gitignore @@ -0,0 +1,4 @@ +*.html +log +screenlog.* +twitch_extractor \ No newline at end of file diff --git a/extractor/twitch_extractor/twitch_test.go b/extractor/twitch_extractor/twitch_test.go new file mode 100644 index 0000000..884215e --- /dev/null +++ b/extractor/twitch_extractor/twitch_test.go @@ -0,0 +1,13 @@ +package main + +import ( + "intimate" + "testing" +) + +var estore = intimate.NewStoreExtractor() +var sstore = intimate.NewStoreSource(string(intimate.STOpenrec)) + +func TestCase0(t *testing.T) { + +} diff --git a/go.mod b/go.mod index 729309d..28e095a 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.14 require ( github.com/474420502/gcurl v0.1.2 github.com/474420502/hunter v0.3.4 + github.com/474420502/requests v1.6.0 github.com/go-sql-driver/mysql v1.5.0 github.com/lestrrat-go/libxml2 v0.0.0-20200215080510-6483566f52cb github.com/tebeka/selenium v0.9.9 diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index feeeb60..124be92 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -70,12 +70,12 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { tp := cxt.Temporary() content := resp.Content() - if len(content) <= 200 { // 末页退出 + if len(content) <= 200 { //末页时没有内容返回, 末页退出 finishpoint := time.Now() log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*120)) for time.Now().Sub(finishpoint) < time.Minute*120 { time.Sleep(time.Second) - if atomic.LoadInt32(&loop) > 0 { + if atomic.LoadInt32(&loop) <= 0 { return } } @@ -122,6 +122,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { } } + // 修改url query 参数的page递增. 遍历所有页面 querys := tp.GetQuery() page, err := strconv.Atoi(querys.Get("page")) if err != nil { diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 76ea4c8..9126949 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -51,7 +51,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { for atomic.LoadInt32(&loop) > 0 { - streamer, err := estore.Pop(intimate.Popenrec) + streamer, err := estore.Pop(intimate.Popenrec) //队列里弹出一个streamer行. 进行解析 if streamer == nil || err != nil { if err != lasterr { @@ -66,7 +66,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { var updateUrl map[string]string - err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) + err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // 反序列化update_url, 里面存了需要采集的url if err != nil { log.Println(err) continue @@ -74,7 +74,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { // Check Userid userUrl := updateUrl["user"] - tp := cxt.Session().Get(userUrl) + tp := cxt.Session().Get(userUrl) // 获取user url页面数据 resp, err := tp.Execute() streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} @@ -86,14 +86,14 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { cookies := cxt.Session().GetCookies(tp.GetParsedURL()) - scurl := updateUrl["supporters"] + scurl := updateUrl["supporters"] //获取打赏者的数据 curl := gcurl.ParseRawCURL(scurl) supportersSession := curl.CreateSession() temporary := curl.CreateTemporary(supportersSession) supportersSession.SetCookies(temporary.GetParsedURL(), cookies) var supporters []string - for { + for { // supporters 数据需要登录信息. 下面为赋值 supporters链接获取的uid token random码 supportersQuery := temporary.GetQuery() @@ -122,13 +122,13 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { log.Println(err) } supporterjson := gjson.ParseBytes(resp.Content()) - supporterdata := supporterjson.Get("data") + supporterdata := supporterjson.Get("data") //解析supporters获取的json数据 if supporterdata.Type == gjson.Null { break } supporters = append(supporters, string(resp.Content())) - page := supportersQuery.Get("page_number") + page := supportersQuery.Get("page_number") // page_number 加1 pageint, err := strconv.Atoi(page) if err != nil { log.Println(err) diff --git a/tasks/twitch/twitch_task1/.gitignore b/tasks/twitch/twitch_task1/.gitignore new file mode 100644 index 0000000..3684d9b --- /dev/null +++ b/tasks/twitch/twitch_task1/.gitignore @@ -0,0 +1,2 @@ +twitch_task1 +log \ No newline at end of file diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go index cdff148..bc7805d 100644 --- a/tasks/twitch/twitch_task1/task_twitch.go +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -25,6 +25,7 @@ type ChannelLink struct { func (cl *ChannelLink) Execute() { var err error wd := intimate.GetChromeDriver(3030) + defer wd.Close() weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT" err = wd.Get(weburl) diff --git a/tasks/twitch/twitch_task2/.gitignore b/tasks/twitch/twitch_task2/.gitignore new file mode 100644 index 0000000..846a6b4 --- /dev/null +++ b/tasks/twitch/twitch_task2/.gitignore @@ -0,0 +1,2 @@ +twitch_task2 +log \ No newline at end of file diff --git a/tasks/twitch/twitch_task2/task_twitch.go b/tasks/twitch/twitch_task2/task_twitch.go index 178f0b8..7b721a5 100644 --- a/tasks/twitch/twitch_task2/task_twitch.go +++ b/tasks/twitch/twitch_task2/task_twitch.go @@ -33,6 +33,7 @@ func (cl *UserList) Execute() { //article//a[@data-a-target='preview-card-title-link'] wd := intimate.GetChromeDriver(3030) + defer wd.Close() var loop int32 = 1 @@ -74,19 +75,20 @@ func (cl *UserList) Execute() { var elements []selenium.WebElement var liveurls = 0 - var delayerror = 3 + var delayerror = 2 for i := 0; i < 200 && atomic.LoadInt32(&loop) > 0; i++ { elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]") if err != nil { log.Println(err) break } + time.Sleep(time.Millisecond * 500) wd.KeyDown(selenium.EndKey) wd.KeyUp(selenium.EndKey) - time.Sleep(time.Second * 2) + time.Sleep(time.Millisecond * 1500) if len(elements) == liveurls { if liveurls == 0 { - delayerror -= 2 + delayerror -= 1 } else { delayerror-- } @@ -95,7 +97,7 @@ func (cl *UserList) Execute() { break } } else { - delayerror = 3 + delayerror = 2 } liveurls = len(elements) } diff --git a/tasks/twitch/twitch_task3/task_twitch_test.go b/tasks/twitch/twitch_task3/task_twitch_test.go new file mode 100644 index 0000000..2c6c19a --- /dev/null +++ b/tasks/twitch/twitch_task3/task_twitch_test.go @@ -0,0 +1,114 @@ +package main + +import ( + "encoding/json" + "intimate" + "log" + "os" + "os/signal" + "regexp" + "sync/atomic" + "syscall" + "testing" + "time" + + "github.com/tebeka/selenium" +) + +// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) + +// estore 解析存储连接实例 +var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() + +func TestCase(t *testing.T) { + var loop int32 = 1 + wd := intimate.GetChromeDriver(3030) + + go func() { + signalchan := make(chan os.Signal) + signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) + log.Println("accept stop command:", <-signalchan) + atomic.StoreInt32(&loop, 0) + }() + + var lasterr error = nil + // var err error + + for atomic.LoadInt32(&loop) > 0 { + streamer, err := estore.Pop(intimate.Ptwitch, 0) + if streamer == nil || err != nil { + if err != lasterr { + log.Println(err, lasterr) + lasterr = err + } + time.Sleep(time.Second * 2) + continue + } + + var updateUrl map[string]string + json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) + liveUrl := updateUrl["live"] + log.Println(liveUrl) + + err = wd.Get("https://www.twitch.tv/zoe_0601" + "/about") + if err != nil { + log.Println(err) + //estore.UpdateError(streamer, err) + continue + } + + time.Sleep(time.Millisecond * 500) + wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + _, err = web.FindElement(selenium.ByXPATH, "//a[@class='tw-interactive']//h1/text()") + if err != nil { + return false, err + } + return true, err + }, 12) + + label, err := wd.FindElement(selenium.ByXPATH, "//a[@class='tw-interactive']//h1") + if err != nil { + log.Println(err) + //estore.UpdateError(streamer, err) + continue + } + log.Println(label.Text()) + + wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + followers, err := web.FindElement(selenium.ByXPATH, "//div[@data-a-target='about-panel']//div[@class='tw-align-center']/text()") + if err != nil { + return false, err + } + ft, err := followers.Text() + log.Println(ft) + if err != nil || ft != "" { + return false, err + } + return true, nil + }, 12) + + followers, err := wd.FindElement(selenium.ByXPATH, "//div[@data-a-target='about-panel']//div[@class='tw-align-center']") + if err != nil { + log.Println(err) + //estore.UpdateError(streamer, err) + continue + } + fstr, err := followers.Text() + if err != nil { + log.Println(err) + //estore.UpdateError(streamer, err) + continue + } + log.Println(regexp.MustCompile(`[\d,]+`).FindString(fstr)) + //div[@data-a-target="about-panel"] + + if views, err := wd.FindElement(selenium.ByXPATH, "//a[@data-a-target='home-live-overlay-button']/span"); err == nil { + log.Println(views.Text()) + views.Click() + } + + streamer.Operator = 0 + estore.UpdateOperator(streamer) + } +} diff --git a/utils.go b/utils.go index 774de66..899b68d 100644 --- a/utils.go +++ b/utils.go @@ -51,11 +51,11 @@ func ParseDuration(dt string) (time.Duration, error) { func GetChromeDriver(port int) selenium.WebDriver { caps := selenium.Capabilities{"browserName": "chrome"} chromecaps := chrome.Capabilities{} - err := chromecaps.AddExtension("../../../crx/0.1.2_0.crx") + err := chromecaps.AddExtension("../../../crx/myblock.crx") if err != nil { panic(err) } - chromecaps.Args = append(chromecaps.Args, "--proxy-pac-url=http://127.0.0.1:1081/pac") + // chromecaps.Args = append(chromecaps.Args, "--proxy-pac-url=http://127.0.0.1:1081/pac") chromecaps.Args = append(chromecaps.Args, "--disk-cache-dir=/tmp/chromedriver-cache") chromecaps.ExcludeSwitches = append(chromecaps.ExcludeSwitches, "enable-automation") caps.AddChrome(chromecaps)