diff --git a/.gitignore b/.gitignore index 331f9bc..657f5f1 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,7 @@ screenlog.* intimate *.gz debug.test - +myblock +run.sh +stop.sh diff --git a/config.go b/config.go index 1e73967..369ca21 100644 --- a/config.go +++ b/config.go @@ -17,7 +17,7 @@ func init() { InitConfig.Load() // storeOpenrec = NewStore() - log.SetFlags(log.Llongfile | log.Ldate) + log.SetFlags(log.Llongfile | log.Ltime) } // Config 配置 diff --git a/crx/myblock.crx b/crx/myblock.crx new file mode 100644 index 0000000..6ee88c6 Binary files /dev/null and b/crx/myblock.crx differ diff --git a/crx/myblock.pem b/crx/myblock.pem new file mode 100644 index 0000000..605b6cf --- /dev/null +++ b/crx/myblock.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDSG09DSvB03TOe +eOmQwfiCIf0wa2WRB31ewxa6i/PRgEKeJSUvIsIuaECUer2ss+J3rwSS2lDpGuiw +FnsVyZqKI/+Rcuc83YJGYg6OAzVMz6UL8YCWhXu3huTJ+V+a5iNereIC69ZERRJt +nXlWqsq6HKya+6BP9sX9CI4GTHQrnWBysAxsswhdnnnRvu+GxglWafSIzuS6OizT +1M1CmkZxNvDJhTSOR7SJlIYm2kM5/fIL53BdndF2IGAjfV1WV7AjwhTfun5cViEO +i8niQUIMY4L0AiO9grFD1g1xIYkeuVBoLxOUBzPxJwQmb64gseb9Dvt0BKLRGoou +SIOyE+KVAgMBAAECggEAI4b6J2kR0VUBEDwmVHO0K38HUstqNHSVgrNO0dLt8sAz +I44o5DhGqPW4a9L4ZS5SrkWyKonPcic6buISRIwfPVoacjQBfVWAXJnil6lbtyYK +ZMNcqLcgBRfCcpOgEq91DiKta6yIwekDFXVyCdFd78v+9ML1J+hUsLVkXJTLdP88 +PGamRWVd6vGy3QMRjyM29GLPgS+/6Vrp1cptSuYNqYhlszohmu8lBvzjH9jbPh9d +GFrrd8Bs7IRCdtKZig/3fbln4JEyyOYE+gcT2jplPksB6mR/5DBIdkVbeuFwGB0+ +h1/PKlprNQt7+Ei0HhHnTib7lZP8WGo4HkSi7PsAGQKBgQD1Ptho0wJiI2+6gL1O +iNsEJVKIQ2Sxdx3wI/qudphM99t6xKCpPyVI2Nd9PBf2jbZjGAaz+P/KQYxEqb6i +PRcQ+i99wCQoRfnRvUbKA4goEpKwRXmvn+499dm6D5pEuumOXGQYCmaFXuLTRN/I +BL6GNgLtoZAlLjUXaWtk8TszGQKBgQDbUf3p3HLpCjRvRDW/vA5xj+08t7xtF9uO +NilGK79uOA4VnxE2w3ioYqQ7t3I8J/0rAzGKq3tylg4QX6UpQ4b2koRr2B3cqoAk +dsRdNWAHwCNepz8hTLsZyuihzbNv2nHmoqhzjK/FcrBHx5NAM+T6OBpLzQBnbUzk +3wIcqm223QKBgQDo/IRxyY0pGMtLXoT6ODACF0b6JzRhGG37tuKvngGAlbQQRP7w +6wmL1F2cH1wQon7UU34CupqfVnhgvvZZgToJqfU2PTTcgeYc6Pl4b7SJhWOQTOCX +BZQ7jvYCulHv27aIxaNd53uQVx2cYoFKr58lN+i+QtADUoujq0YYxshb+QKBgQDW +ZOti7kZCeuBRGIu2V56C8uBFp5MBzf2polZsqx1iIFfcWPfZ4fGUIYFMgwKfvbOl +lWSbmxB9LiSnaugoU0OezBG43rYqXV4Qxy0jtKagTPoGcFWtNrX7+7e3XD8Zi6Am +hkFHW3MEAB5EvNq8Oz6OP8Os78SCVn2BimMlJJFF3QKBgQCF+aEAiBv+ivcmHUeP +2eBq9nLltPFAfXJ/p31MMQ6Jgo36DBqUeoLeyq/WfIXvwqbVbP9fANZrKoTPbI97 +dilCHUoO33rafXJy6jtaggtpz14tt9soecTop0vM/rU7tGtfBe6NXg9LRl+oDJCU +37I3a9Is+2CLyAUXWCk9mLfFsQ== +-----END PRIVATE KEY----- diff --git a/extractor/openrec_extractor/main.go b/extractor/openrec_extractor/main.go index 2fb7715..4596421 100644 --- a/extractor/openrec_extractor/main.go +++ b/extractor/openrec_extractor/main.go @@ -1,7 +1,6 @@ package main import ( - "net/http" _ "net/http/pprof" ) @@ -16,10 +15,6 @@ import ( */ func main() { - go func() { - http.ListenAndServe("0.0.0.0:8899", nil) - }() - oe := &OpenrecExtractor{} oe.Execute() } diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 6052389..3b689ad 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -39,35 +39,17 @@ func (oe *OpenrecExtractor) Execute() { }() var lasterr error = nil - execute := func() bool { + for atomic.LoadInt32(&loop) > 0 { var err error - // if sstore.PopCount() >= 1000 { - // if err = estore.Close(); err != nil { - // log.Println(err) - // } - // if err = sstore.Close(); err != nil { - // log.Println(err) - // } - // estore = intimate.NewStoreExtractor() - // 
sstore = intimate.NewStoreSource(string(intimate.STOpenrec)) - - // oe.supporters.Clear() - // oe.user.Clear() - // oe.userLive.Clear() - - // runtime.GC() // 主动gc - // log.Println("1000次执行, gc 重新建立sql链接") - // } - source, err := sstore.Pop(intimate.TOpenrecUser, 0) if err != nil { if err != lasterr { log.Println(err, lasterr) lasterr = err } - time.Sleep(time.Second * 2) - return true + time.Sleep(time.Second * 5) + continue } sdata := source.Ext.([]byte) @@ -103,12 +85,13 @@ func (oe *OpenrecExtractor) Execute() { streamer.Uid = source.StreamerId.Int64 streamer.UpdateTime = source.UpdateTime + streamer.Tags = clog.Tags clog.Platform = string(intimate.Popenrec) clog.UserId = userId clog.UpdateTime = source.UpdateTime - logUid := estore.InsertCollectLog(clog) + logUid := estore.InsertClog(clog) LiveUrl := "https://www.openrec.tv/live/" + userId streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true} @@ -118,15 +101,8 @@ func (oe *OpenrecExtractor) Execute() { source.Operator = int32(intimate.OperatorExtractorOK) sstore.UpdateOperator(source) - - return true } - for atomic.LoadInt32(&loop) > 0 { - if !execute() { - break - } - } } func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) { diff --git a/extractor/twitch_extractor/.gitignore b/extractor/twitch_extractor/.gitignore new file mode 100644 index 0000000..a2523a9 --- /dev/null +++ b/extractor/twitch_extractor/.gitignore @@ -0,0 +1,4 @@ +*.html +log +screenlog.* +twitch_extractor \ No newline at end of file diff --git a/extractor/twitch_extractor/tiwtch_extractor.go b/extractor/twitch_extractor/tiwtch_extractor.go new file mode 100644 index 0000000..bd2c6cf --- /dev/null +++ b/extractor/twitch_extractor/tiwtch_extractor.go @@ -0,0 +1,247 @@ +package main + +import ( + "database/sql" + "encoding/json" + "intimate" + "log" + "regexp" + "time" + + "github.com/tebeka/selenium" +) + +// sstore 源存储实例, 为存储源数据的实现. 
表格具体参考sql/intimate_source.sql +var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) + +// estore 解析存储连接实例 +var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() + +func main() { + wd := intimate.GetChromeDriver(3030) + ps := intimate.NewPerfectShutdown() + + counter := intimate.NewCounter() + counter.SetMaxLimit(200) + counter.SetMaxToDo(func(olist ...interface{}) error { + owd := olist[0].(*selenium.WebDriver) + (*owd).Close() + (*owd).Quit() + *owd = intimate.GetChromeDriver(3030) + return nil + }, &wd) + + var lasterr error = nil + // var err error + + for !ps.IsClose() { + streamer, err := estore.Pop(intimate.Ptwitch, 0) + if streamer == nil || err != nil { + if err != lasterr { + log.Println(err, lasterr) + lasterr = err + } + time.Sleep(time.Second * 2) + continue + } + + var updateUrl map[string]string + json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) + liveUrl := updateUrl["live"] + log.Println(liveUrl) + + // err = wd.Get("https://www.twitch.tv/zoe_0601" + "/about") + err = wd.Get(liveUrl + "/about") + if err != nil { + log.Println(err) + estore.UpdateError(streamer, err) + time.Sleep(time.Second * 5) + continue + } + + streamer.LiveUrl = sql.NullString{String: liveUrl, Valid: true} + clog := &intimate.CollectLog{} + clog.UserId = streamer.UserId + clog.Gratuity = sql.NullInt64{Int64: 0, Valid: false} + + time.Sleep(time.Millisecond * 500) + err = extractUserName(wd, streamer) + if err != nil { + continue + } + err = extractFollowers(wd, clog) + if err != nil { + continue + } + + err = extractViews(wd, clog) // views + tags + gratuity + if err != nil { + // 不直播时提取礼物 gratuity + wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + channelchat, err := wd.FindElement(selenium.ByXPATH, `//a[@data-a-target="channel-home-tab-Chat"]`) + btn, _ := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`) + if (err == nil && channelchat != nil) || btn != nil { + if channelchat != nil { + channelchat.Click() + } + time.Sleep(time.Second) + extractGratuity(wd, clog) + return true, nil + } + return false, nil + + }, time.Second*4) + } + + streamer.Platform = intimate.Ptwitch + clog.Platform = string(streamer.Platform) + clog.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} + lastClogId := estore.InsertClog(clog) + + streamer.Operator = 10 + streamer.LatestLogUid = lastClogId + if clog.Tags != nil { + streamer.Tags = clog.Tags + } + + switch fl := clog.Followers.Int64; { + case fl > 100000: + streamer.UpdateInterval = 120 + case fl > 10000: + streamer.UpdateInterval = 240 + case fl > 1000: + streamer.UpdateInterval = 360 + case fl > 100: + streamer.UpdateInterval = 720 + case fl > 0: + streamer.UpdateInterval = 1440 + } + + streamer.UpdateTime = clog.UpdateTime + estore.UpdateStreamer(streamer) + counter.AddWithReset(1) + } + + wd.Close() + wd.Quit() +} + +func extractUserName(wd selenium.WebDriver, streamer *intimate.Streamer) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + label, err := web.FindElement(selenium.ByXPATH, "//a[@class='tw-interactive']//h1") + if err == nil { + if ltxt, err := label.Text(); err == nil && ltxt != "" { + // log.Println("label:", ltxt) + streamer.UserName = sql.NullString{String: ltxt, Valid: true} + return true, nil + } + } + return false, err + }, 15*time.Second) +} + +func extractFollowers(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + 
efollowers, err := web.FindElement(selenium.ByXPATH, "//div[@data-a-target='about-panel']//div[@class='tw-align-center']") + if err != nil { + return false, err + } + followers, err := efollowers.Text() + if err != nil || followers == "" { + return false, err + } + followers = regexp.MustCompile(`[\d,]+`).FindString(followers) + fint, _ := intimate.ParseNumber(followers) + clog.Followers = sql.NullInt64{Int64: int64(fint), Valid: true} + // log.Println("followers: ", followers, fint) + return true, nil + }, 4*time.Second) +} + +func extractViews(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + views, err := web.FindElement(selenium.ByXPATH, "//a[@data-a-target='home-live-overlay-button']/span") + if views != nil { + if txt, err := views.Text(); err == nil { + + vint, _ := intimate.ParseNumber(txt) + clog.Views = sql.NullInt64{Int64: vint, Valid: true} + // log.Println("views:", txt) + views.Click() + + extractTags(wd, clog) + extractTitle(wd, clog) + extractGratuity(wd, clog) + + return true, nil + } + } + return false, err + }, time.Second*4) +} + +func extractTitle(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + title, err := web.FindElement(selenium.ByXPATH, `//h2[@data-a-target='stream-title']`) + if err == nil { + if txt, err := title.Text(); err == nil { + clog.LiveTitle = sql.NullString{String: txt, Valid: true} + return true, nil + } + } + return false, err + }, time.Second*4) +} + +func extractTags(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + tags, err := web.FindElements(selenium.ByXPATH, "//a[@aria-label and @data-a-target and @href]/div[@class and text()]") + if len(tags) == 0 { + return false, err + } + + var stags []string + for _, tag := range tags { + if txt, err := tag.Text(); err == nil { + stags = append(stags, txt) + } else { + log.Println(err) + } + } + if len(stags) > 0 { + if tagbuf, err := json.Marshal(stags); err == nil { + clog.Tags = tagbuf + } else { + log.Println(err) + } + } + + return true, nil + }, time.Second*4) +} + +func extractGratuity(wd selenium.WebDriver, clog *intimate.CollectLog) error { + return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) { + btn, err := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`) + if err == nil { + btn.Click() + time.Sleep(time.Second) + gifcount, err := web.FindElements(selenium.ByXPATH, `//div[@class="sub-gift-count tw-flex"]/p`) + if err == nil { + var gratuity int64 = 0 + for _, gc := range gifcount { + if gtxt, err := gc.Text(); err == nil { + gint, _ := intimate.ParseNumber(gtxt) + gratuity += gint + } else { + log.Println(err) + } + } + clog.Gratuity = sql.NullInt64{Int64: gratuity, Valid: true} + } + return true, nil + } + + return false, err + }, time.Second*4) +} diff --git a/extractor/twitch_extractor/twitch_test.go b/extractor/twitch_extractor/twitch_test.go new file mode 100644 index 0000000..731b2d3 --- /dev/null +++ b/extractor/twitch_extractor/twitch_test.go @@ -0,0 +1,9 @@ +package main + +import ( + "testing" +) + +func TestCase0(t *testing.T) { + main() +} diff --git a/extractor_field.go b/extractor_field.go index 0f94cd6..ad5ef30 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -19,11 +19,12 @@ type Streamer struct { UserName sql.NullString // LiveUrl sql.NullString // Channel 
sql.NullString // - Ext interface{} // + Tags interface{} + Ext interface{} // IsUpdateStreamer bool // 更新上面的内容 IsUpdateUrl bool - updateInterval int32 + UpdateInterval int32 UpdateUrl interface{} LatestLogUid int64 UpdateTime sql.NullTime // diff --git a/go.mod b/go.mod index 729309d..d8b824d 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,10 @@ module intimate go 1.14 require ( + github.com/474420502/focus v0.12.0 github.com/474420502/gcurl v0.1.2 github.com/474420502/hunter v0.3.4 + github.com/474420502/requests v1.6.0 github.com/go-sql-driver/mysql v1.5.0 github.com/lestrrat-go/libxml2 v0.0.0-20200215080510-6483566f52cb github.com/tebeka/selenium v0.9.9 diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index f588757..91d3e3b 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -8,6 +8,7 @@ CREATE TABLE IF NOT EXISTS `streamer` ( `user_name` varchar(255) DEFAULT NULL COMMENT '用户名字 区别于ID', `live_url` text COMMENT '直播的url', `channel` varchar(128) DEFAULT NULL COMMENT'所属 频道,分类 未必所有平台都有明确的标签', + `tag` json DEFAULT NULL COMMENT 'streamer 最新的tag', `ext` json DEFAULT NULL COMMENT '扩展类型, 把一些可能需要但是没字段的数据放在json扩展', `is_update_streamer` tinyint(1) DEFAULT 0 COMMENT '是否需要持续更新streamer的信息. 1为需要,0则否', diff --git a/sql/remake_database.sh b/sql/remake_database.sh new file mode 100644 index 0000000..180663b --- /dev/null +++ b/sql/remake_database.sh @@ -0,0 +1,10 @@ +# /bin/bash +USER=root +HOST=127.0.0.1 +PORT=4000 + +# mysql -h $HOST -u $USER -P $PORT -c "drop database intimate_source"; +# mysql -h $HOST -u $USER -P $PORT -c "drop database intimate_extractor"; + +mysql -h $HOST -u $USER -P $PORT < ./intimate_extractor.sql; +mysql -h $HOST -u $USER -P $PORT < ./intimate_source.sql; diff --git a/store.go b/store.go index 2c07234..cbdbbfc 100644 --- a/store.go +++ b/store.go @@ -87,7 +87,8 @@ func (store *StoreSource) Insert(isource IGet) { // Deduplicate 去重 func (store *StoreSource) Deduplicate(target Target, field string) { - _, err := store.db.Exec(`DELETE FROM ` + store.table + ` WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, ` + field + ` FROM ` + store.table + `force index(target_type_idx) WHERE target_type = "` + string(target) + `" ) s GROUP BY s.` + string(target) + `) ;`) + sql := `DELETE FROM ` + store.table + ` WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, ` + field + ` FROM ` + store.table + ` force index(target_type_idx) WHERE target_type = "` + string(target) + `" ) s GROUP BY s.` + string(field) + `) ;` + _, err := store.db.Exec(sql) if err != nil { panic(err) } @@ -229,7 +230,7 @@ func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Stream return nil, err } var args = []interface{}{string(platform)} - selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` + selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer, update_interval from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` if len(operators) == 0 { selectSQL += " and operator = ?" 
args = append(args, 0) @@ -257,7 +258,7 @@ func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Stream s := &Streamer{} // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.UpdateTime, &s.UserId, &s.UpdateUrl, &s.IsUpdateStreamer) + err = row.Scan(&s.Uid, &s.UpdateTime, &s.UserId, &s.UpdateUrl, &s.IsUpdateStreamer, &s.UpdateInterval) if err != nil { return nil, err } @@ -269,7 +270,7 @@ func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Stream // InsertStreamer Streamer表, 插入数据 func (store *StoreExtractor) InsertStreamer(streamer IGet) (isExists bool) { // select uid from table where platform = ? and user_id = ? - selectSQL := "SELECT is_update_url FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?" + selectSQL := "SELECT is_update_url, uid FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?" tx, err := store.db.Begin() if err != nil { panic(err) @@ -288,14 +289,16 @@ func (store *StoreExtractor) InsertStreamer(streamer IGet) (isExists bool) { row := tx.QueryRow(selectSQL+` LIMIT 1 FOR UPDATE`, streamer.Get("Platform"), streamer.Get("UserId")) var isUpdateUrl bool - if err = row.Scan(&isUpdateUrl); err == nil { + var Uid int64 + if err = row.Scan(&isUpdateUrl, &Uid); err == nil { if isUpdateUrl { tx.Exec("UPDATE "+StreamerTable+" SET update_url = ?", streamer.Get("UpdateUrl")) } + streamer.(ISet).Set("Uid", Uid) return true } - _, err = tx.Exec("INSERT INTO "+StreamerTable+"(platform, user_id, update_url, update_time) VALUES(?,?,?,?);", streamer.Get("Platform"), streamer.Get("UserId"), streamer.Get("UpdateUrl"), time.Now().Add(-time.Minute*30)) + _, err = tx.Exec("INSERT INTO "+StreamerTable+"(platform, user_id, update_url, update_time) VALUES(?,?,?,?);", streamer.Get("Platform"), streamer.Get("UserId"), streamer.Get("UpdateUrl"), time.Now().Add(-time.Minute*60)) if err != nil { panic(err) } @@ -330,16 +333,33 @@ func (store *StoreExtractor) UpdateOperator(isource IGet) { } // UpdateStreamer Streamer表, 插入数据 -func (store *StoreExtractor) UpdateStreamer(isource IGet) { - _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, ext = ?, operator = ?, update_time = ? WHERE uid = ?;", - isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("UpdateTime"), isource.Get("Uid")) +func (store *StoreExtractor) UpdateStreamer(streamer IGet) { + _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, tags = ?, ext = ?, operator = ?, update_time = ?, update_interval = ? WHERE uid = ?;", + streamer.Get("UserName"), streamer.Get("LiveUrl"), streamer.Get("Channel"), streamer.Get("LatestLogUid"), streamer.Get("Tags"), streamer.Get("Ext"), streamer.Get("Operator"), streamer.Get("UpdateTime"), streamer.Get("UpdateInterval"), streamer.Get("Uid")) if err != nil { panic(err) } } -// InsertCollectLog CollectLog表插入数据 -func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { +// Update Streamer表, 更新指定的字段 +func (store *StoreExtractor) Update(streamer IGet, fieldvalues ...interface{}) { + updateSQL := "UPDATE " + StreamerTable + " SET " + var values []interface{} + for i := 0; i < len(fieldvalues); i += 2 { + field := fieldvalues[i] + values = append(values, fieldvalues[i+1]) + updateSQL += field.(string) + " = ? " + } + updateSQL += "WHERE uid = ?" 
+ values = append(values, streamer.Get("Uid")) + _, err := store.db.Exec(updateSQL, values...) + if err != nil { + panic(err) + } +} + +// InsertClog CollectLog表插入数据 +func (store *StoreExtractor) InsertClog(clog IGet) int64 { tx, err := store.db.Begin() defer func() { @@ -354,7 +374,7 @@ func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { } result, err := tx.Exec("insert into "+CollectLogTable+"(streamer_uid, platform, user_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", - isource.Get("StreamerUid"), isource.Get("Platform"), isource.Get("UserId"), isource.Get("IsLiveStreaming"), isource.Get("IsError"), isource.Get("Followers"), isource.Get("Views"), isource.Get("Giver"), isource.Get("Gratuity"), isource.Get("LiveTitle"), isource.Get("LiveStartTime"), isource.Get("LiveEndTime"), isource.Get("UpdateTime"), isource.Get("Tags"), isource.Get("Ext"), isource.Get("ErrorMsg"), + clog.Get("StreamerUid"), clog.Get("Platform"), clog.Get("UserId"), clog.Get("IsLiveStreaming"), clog.Get("IsError"), clog.Get("Followers"), clog.Get("Views"), clog.Get("Giver"), clog.Get("Gratuity"), clog.Get("LiveTitle"), clog.Get("LiveStartTime"), clog.Get("LiveEndTime"), clog.Get("UpdateTime"), clog.Get("Tags"), clog.Get("Ext"), clog.Get("ErrorMsg"), ) if err != nil { panic(err) @@ -365,7 +385,7 @@ func (store *StoreExtractor) InsertCollectLog(isource IGet) int64 { panic(err) } - _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, isource.Get("StreamerUid")) + _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, clog.Get("StreamerUid")) if err = tx.Commit(); err != nil { panic(err) } diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index feeeb60..124be92 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -70,12 +70,12 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { tp := cxt.Temporary() content := resp.Content() - if len(content) <= 200 { // 末页退出 + if len(content) <= 200 { //末页时没有内容返回, 末页退出 finishpoint := time.Now() log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*120)) for time.Now().Sub(finishpoint) < time.Minute*120 { time.Sleep(time.Second) - if atomic.LoadInt32(&loop) > 0 { + if atomic.LoadInt32(&loop) <= 0 { return } } @@ -122,6 +122,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { } } + // 修改url query 参数的page递增. 遍历所有页面 querys := tp.GetQuery() page, err := strconv.Atoi(querys.Get("page")) if err != nil { diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 76ea4c8..9126949 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -51,7 +51,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { for atomic.LoadInt32(&loop) > 0 { - streamer, err := estore.Pop(intimate.Popenrec) + streamer, err := estore.Pop(intimate.Popenrec) //队列里弹出一个streamer行. 
进行解析 if streamer == nil || err != nil { if err != lasterr { @@ -66,7 +66,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { var updateUrl map[string]string - err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) + err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // 反序列化update_url, 里面存了需要采集的url if err != nil { log.Println(err) continue @@ -74,7 +74,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { // Check Userid userUrl := updateUrl["user"] - tp := cxt.Session().Get(userUrl) + tp := cxt.Session().Get(userUrl) // 获取user url页面数据 resp, err := tp.Execute() streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} @@ -86,14 +86,14 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { cookies := cxt.Session().GetCookies(tp.GetParsedURL()) - scurl := updateUrl["supporters"] + scurl := updateUrl["supporters"] //获取打赏者的数据 curl := gcurl.ParseRawCURL(scurl) supportersSession := curl.CreateSession() temporary := curl.CreateTemporary(supportersSession) supportersSession.SetCookies(temporary.GetParsedURL(), cookies) var supporters []string - for { + for { // supporters 数据需要登录信息. 下面为赋值 supporters链接获取的uid token random码 supportersQuery := temporary.GetQuery() @@ -122,13 +122,13 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { log.Println(err) } supporterjson := gjson.ParseBytes(resp.Content()) - supporterdata := supporterjson.Get("data") + supporterdata := supporterjson.Get("data") //解析supporters获取的json数据 if supporterdata.Type == gjson.Null { break } supporters = append(supporters, string(resp.Content())) - page := supportersQuery.Get("page_number") + page := supportersQuery.Get("page_number") // page_number 加1 pageint, err := strconv.Atoi(page) if err != nil { log.Println(err) diff --git a/tasks/twitch/twitch_task1/.gitignore b/tasks/twitch/twitch_task1/.gitignore new file mode 100644 index 0000000..3684d9b --- /dev/null +++ b/tasks/twitch/twitch_task1/.gitignore @@ -0,0 +1,2 @@ +twitch_task1 +log \ No newline at end of file diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go index 4f0aa50..2b385ed 100644 --- a/tasks/twitch/twitch_task1/task_twitch.go +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -25,6 +25,7 @@ type ChannelLink struct { func (cl *ChannelLink) Execute() { var err error wd := intimate.GetChromeDriver(3030) + ps := intimate.NewPerfectShutdown() weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT" err = wd.Get(weburl) @@ -39,7 +40,7 @@ func (cl *ChannelLink) Execute() { } return len(elements) > 0, nil } - wd.WaitWithTimeout(cardCondition, time.Second*30) + wd.WaitWithTimeout(cardCondition, time.Second*15) time.Sleep(time.Second) e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") @@ -48,22 +49,57 @@ func (cl *ChannelLink) Execute() { } e.Click() + var hrefs map[string]bool = make(map[string]bool) + var delayerror = 5 + var samecount = 0 for i := 0; i <= 200; i++ { - wd.KeyDown(selenium.EndKey) - time.Sleep(time.Second * 3) - } - - elements, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") - if err != nil { - panic(err) - } - // xpath: //article//a[@data-a-target='preview-card-title-link'] - for _, ele := range elements { - href, err := ele.GetAttribute("href") + cards, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") if err != nil { log.Println(err) + break } - log.Println(href) // 
TODO: Save href + + if len(cards) == samecount { + delayerror-- + if delayerror <= 0 { + break + } + } else { + delayerror = 5 + } + + for ii := 0; ii < 10; ii++ { + for _, card := range cards { + href, err := card.GetAttribute("href") + if err != nil { + log.Println(href, err) + continue + } else { + hrefs[href] = true + } + } + break + } + samecount = len(cards) + if ps.IsClose() { + break + } + + if len(cards) > 10 { + log.Println(len(cards)) + wd.ExecuteScript(`items = document.evaluate("//div[@data-target='directory-page__card-container']/../self::div[@data-target and @style]", document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null); + for (var i = 0; i < items.snapshotLength - 10; i++) { item = items.snapshotItem(i); item.remove() ;};`, nil) + } + time.Sleep(time.Millisecond * 200) + wd.KeyDown(selenium.EndKey) + time.Sleep(time.Millisecond * 200) + wd.KeyUp(selenium.EndKey) + time.Sleep(time.Millisecond * 2500) + } + + for href := range hrefs { + + // TODO: Save href source := &intimate.Source{} source.Source = sql.NullString{String: href, Valid: true} source.Operator = 0 @@ -72,5 +108,6 @@ func (cl *ChannelLink) Execute() { sstore.Insert(source) } + log.Println("hrefs len:", len(hrefs)) sstore.Deduplicate(intimate.TTwitchChannel, "source") } diff --git a/tasks/twitch/twitch_task2/.gitignore b/tasks/twitch/twitch_task2/.gitignore new file mode 100644 index 0000000..846a6b4 --- /dev/null +++ b/tasks/twitch/twitch_task2/.gitignore @@ -0,0 +1,2 @@ +twitch_task2 +log \ No newline at end of file diff --git a/tasks/twitch/twitch_task2/task_twitch.go b/tasks/twitch/twitch_task2/task_twitch.go index eb3780a..52773de 100644 --- a/tasks/twitch/twitch_task2/task_twitch.go +++ b/tasks/twitch/twitch_task2/task_twitch.go @@ -27,88 +27,151 @@ type UserList struct { func (cl *UserList) Execute() { // DELETE FROM source_twitch WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, source FROM source_twitch ) s GROUP BY s.source) ; //article//a[@data-a-target='preview-card-title-link'] - var err error + wd := intimate.GetChromeDriver(3030) + ps := intimate.NewPerfectShutdown() + counter := intimate.NewCounter() + counter.SetMaxLimit(100) + counter.SetMaxToDo(func(olist ...interface{}) error { + owd := olist[0].(*selenium.WebDriver) + (*owd).Close() + (*owd).Quit() + *owd = intimate.GetChromeDriver(3030) + return nil + }, &wd) - sourceChannel, err := sstore.Pop(intimate.TTwitchChannel) - if err != nil { - panic(err) - } + for !ps.IsClose() { - weburl := sourceChannel.Source.String + "?sort=VIEWER_COUNT" - err = wd.Get(weburl) - if err != nil { - panic(err) - } - - wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) { - _, err := wd.FindElement(selenium.ByXPATH, "(//div/p[@class=''])[last()]") - if err != nil { - return false, err - } - return true, nil - }, time.Second*10) - - btn, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") - if err != nil { - panic(err) - } - btn.Click() - - var elements []selenium.WebElement - var liveurls = 0 - var delayerror = 3 - for i := 0; i < 2; i++ { - elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]") + var err error + sourceChannel, err := sstore.Pop(intimate.TTwitchChannel) if err != nil { panic(err) } - wd.KeyDown(selenium.EndKey) - time.Sleep(time.Second * 2) - if len(elements) == liveurls { - delayerror-- - if delayerror <= 0 { + + weburl := sourceChannel.Source.String + "?sort=VIEWER_COUNT" + err = wd.Get(weburl) + if err != nil { + log.Println(err) + 
sstore.UpdateError(sourceChannel, err) + time.Sleep(time.Second * 10) + continue + } + + wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) { + _, err := wd.FindElement(selenium.ByXPATH, "(//div/p[@class=''])[last()]") + if err != nil { + return false, err + } + return true, nil + }, time.Second*10) + + btn, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") + if err != nil { + log.Println(err) + continue + } + btn.Click() + + var elements []selenium.WebElement + var liveurls = 0 + var delayerror = 2 + for i := 0; i < 200 && !ps.IsClose(); i++ { + elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]") + if err != nil { + log.Println(err) break } - } else { - delayerror = 3 + time.Sleep(time.Millisecond * 200) + wd.KeyDown(selenium.EndKey) + time.Sleep(time.Millisecond * 200) + wd.KeyUp(selenium.EndKey) + time.Sleep(time.Millisecond * 2000) + if len(elements) == liveurls { + delayerror-- + if delayerror <= 0 { + break + } + } else { + delayerror = 2 + } + liveurls = len(elements) } - } - elements, err = wd.FindElements(selenium.ByXPATH, "//article//a[@data-a-target='preview-card-title-link' and @href]") - if err != nil { - panic(err) - } - - for _, e := range elements { - - attr, err := e.GetAttribute("href") + articles, err := wd.FindElements(selenium.ByXPATH, "//article") if err != nil { log.Println(err) continue } - streamer := &intimate.Streamer{} - matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(attr) - if len(matches) == 2 { - streamer.UserId = matches[1] - } else { - log.Println(attr) - continue + for _, article := range articles { + + e, err := article.FindElement(selenium.ByXPATH, ".//a[@data-a-target='preview-card-title-link' and @href]") + if err != nil { + log.Println(err) + continue + } + + href, err := e.GetAttribute("href") + if err != nil { + log.Println(err) + continue + } + + btns, err := article.FindElements(selenium.ByXPATH, ".//div[@class='tw-full-width tw-inline-block']//button") + if err != nil { + log.Println(err) + continue + } + + var tags []string + for _, btn := range btns { + tag, err := btn.GetAttribute("data-a-target") + if err == nil { + tags = append(tags, tag) + } + } + + streamer := &intimate.Streamer{} + + matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href) + if len(matches) == 2 { + streamer.UserId = matches[1] + } else { + log.Println(href) + continue + } + + jtags, err := json.Marshal(tags) + if err != nil { + log.Println(err) + } else { + streamer.Tags = jtags + } + + streamer.Platform = intimate.Ptwitch + + updateUrl := make(map[string]string) + updateUrl["live"] = href + streamer.LiveUrl = sql.NullString{String: href, Valid: true} + data, err := json.Marshal(updateUrl) + if err != nil { + log.Println(err) + continue + } + streamer.UpdateUrl = data + streamer.Operator = 0 + if estore.InsertStreamer(streamer) { + // log.Println("streamer update tags", streamer.Uid, tags) + estore.Update(streamer, "Tags", streamer.Tags) + } } - - streamer.Platform = intimate.Ptwitch - - updateUrl := make(map[string]string) - updateUrl["live"] = attr - streamer.LiveUrl = sql.NullString{String: attr, Valid: true} - data, err := json.Marshal(updateUrl) - if err != nil { - log.Println(err) - continue + log.Println("streamer find", len(articles)) + if len(articles) == 0 { + sourceChannel.Operator = 5 + sstore.UpdateOperator(sourceChannel) } - streamer.UpdateUrl = data - streamer.Operator = 0 - - estore.InsertStreamer(streamer) + 
counter.AddWithReset(1) } + + wd.Close() + wd.Quit() } diff --git a/utils.go b/utils.go index 64f9efe..ebee59b 100644 --- a/utils.go +++ b/utils.go @@ -3,7 +3,13 @@ package intimate import ( "fmt" "log" + "os" + "os/signal" "runtime" + "strconv" + "strings" + "sync/atomic" + "syscall" "time" "github.com/tebeka/selenium" @@ -22,6 +28,12 @@ func init() { } +// ParseNumber 去逗号解析数字 +func ParseNumber(number string) (int64, error) { + number = strings.ReplaceAll(number, ",", "") + return strconv.ParseInt(number, 10, 64) +} + // ParseDuration time to duration eg: 1:40:00 -> time.Duration func ParseDuration(dt string) (time.Duration, error) { @@ -42,20 +54,44 @@ func ParseDuration(dt string) (time.Duration, error) { tdt, err := time.Parse("15:04:05", string(parse)) if err != nil { - return time.Duration(0), err } return tdt.Sub(zeroTime), nil } func GetChromeDriver(port int) selenium.WebDriver { + var err error caps := selenium.Capabilities{"browserName": "chrome"} + chromecaps := chrome.Capabilities{} - err := chromecaps.AddExtension("/home/eson/test/ssh-key/0.1.2_0.crx") - if err != nil { - panic(err) + for _, epath := range []string{"../../../crx/myblock.crx", "../../crx/myblock.crx"} { + _, err := os.Stat(epath) + if err == nil { + err := chromecaps.AddExtension(epath) + if err != nil { + panic(err) + } + break + } } + + if proxy := os.Getenv("chrome_proxy"); proxy != "" { + log.Println("proxy-server", proxy) + chromecaps.Args = append(chromecaps.Args, "--proxy-server="+proxy) + } + + if proxy := os.Getenv("pac_proxy"); proxy != "" { + log.Println("--proxy-pac-url=" + proxy) + chromecaps.Args = append(chromecaps.Args, "--proxy-pac-url="+proxy) + } + + // chromecaps.Args = append(chromecaps.Args, "--proxy-pac-url=http://127.0.0.1:1081/pac") chromecaps.Args = append(chromecaps.Args, "--disk-cache-dir=/tmp/chromedriver-cache") + chromecaps.Args = append(chromecaps.Args, "--disable-gpu", "--disable-images", "--start-maximized", "--disable-infobars") + // chromecaps.Args = append(chromecaps.Args, "--headless") + chromecaps.Args = append(chromecaps.Args, "--no-sandbox") + chromecaps.Args = append(chromecaps.Args, "--disable-dev-shm-usage", "--mute-audio", "--safebrowsing-disable-auto-update") + chromecaps.ExcludeSwitches = append(chromecaps.ExcludeSwitches, "enable-automation") caps.AddChrome(chromecaps) _, err = selenium.NewChromeDriverService("/usr/bin/chromedriver", port) @@ -63,14 +99,141 @@ func GetChromeDriver(port int) selenium.WebDriver { panic(err) } wd, err := selenium.NewRemote(caps, fmt.Sprintf("http://localhost:%d/wd/hub", port)) + if err != nil { + panic(err) + } runtime.SetFinalizer(wd, func(obj interface{}) { - if err := wd.Close(); err != nil { + + if err := obj.(selenium.WebDriver).Close(); err != nil { log.Println(err) } + if err := obj.(selenium.WebDriver).Quit(); err != nil { + log.Println(err) + } + }) wd.ExecuteScript("windows.navigator.webdriver = undefined", nil) if err != nil { panic(err) } + return wd } + +// PerfectShutdown 完美关闭程序 +type PerfectShutdown struct { + loop int32 +} + +// NewPerfectShutdown 创建完美关闭程序 +func NewPerfectShutdown() *PerfectShutdown { + ps := &PerfectShutdown{} + ps.loop = 1 + + go func() { + signalchan := make(chan os.Signal) + signal.Notify(signalchan, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) + log.Println("accept stop command:", <-signalchan) + atomic.StoreInt32(&ps.loop, 0) + }() + + return ps +} + +// IsClose 判断是否要关闭 +func (ps *PerfectShutdown) IsClose() bool { + return atomic.LoadInt32(&ps.loop) == 0 +} 
+
+type Counter struct {
+	dcount   int
+	count    int
+	maxLimit int
+	minLimit int
+
+	minobj       []interface{}
+	maxobj       []interface{}
+	maxLimitToDo func(obj ...interface{}) error
+	minLimitToDo func(obj ...interface{}) error
+}
+
+func NewCounter() *Counter {
+	c := &Counter{}
+	return c
+}
+
+// SetDefault sets the default value that Reset restores.
+func (c *Counter) SetDefault(n int) {
+	c.dcount = n
+}
+
+// Reset resets count to the default value.
+func (c *Counter) Reset() {
+	c.count = c.dcount
+}
+
+// SetCount sets the current count.
+func (c *Counter) SetCount(count int) {
+	c.count = count
+}
+
+// GetCount returns the current count.
+func (c *Counter) GetCount() int {
+	return c.count
+}
+
+// SetMinLimit sets the lower limit.
+func (c *Counter) SetMinLimit(n int) {
+	c.minLimit = n
+}
+
+// SetMaxLimit sets the upper limit.
+func (c *Counter) SetMaxLimit(n int) {
+	c.maxLimit = n
+}
+
+// SetMaxToDo registers the callback executed when count reaches the upper limit.
+func (c *Counter) SetMaxToDo(do func(obj ...interface{}) error, obj ...interface{}) {
+	c.maxLimitToDo = do
+	c.maxobj = obj
+}
+
+// SetMinToDo registers the callback executed when count reaches the lower limit.
+func (c *Counter) SetMinToDo(do func(obj ...interface{}) error, obj ...interface{}) {
+	c.minLimitToDo = do
+	c.minobj = obj
+}
+
+// AddWithReset adds n to count (default 0); when a limit is hit it runs the
+// registered callback and resets count to the default value.
+func (c *Counter) AddWithReset(n int) error {
+	c.count += n
+	if c.maxLimitToDo != nil {
+		if c.count >= c.maxLimit {
+			defer c.Reset()
+			return c.maxLimitToDo(c.maxobj...)
+		}
+	}
+	if c.minLimitToDo != nil {
+		if c.count <= c.minLimit {
+			defer c.Reset()
+			return c.minLimitToDo(c.minobj...)
+		}
+	}
+	return nil
+}
+
+// Add adds n to count (default 0) without resetting it.
+func (c *Counter) Add(n int) error {
+	c.count += n
+	if c.maxLimitToDo != nil {
+		if c.count >= c.maxLimit {
+			return c.maxLimitToDo(c.maxobj...)
+		}
+	}
+	if c.minLimitToDo != nil {
+		if c.count <= c.minLimit {
+			return c.minLimitToDo(c.minobj...)
+		}
+	}
+	return nil
+}
diff --git a/xvfb.sh b/xvfb.sh
new file mode 100644
index 0000000..d5f6036
--- /dev/null
+++ b/xvfb.sh
@@ -0,0 +1 @@
+screen -dmS xvfb-99 Xvfb :99 -screen 0 1280x720x24 -ac -nolisten tcp -dpi 96 +extension RANDR -nolisten tcp
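
The utilities added to utils.go above (GetChromeDriver with the chrome_proxy/pac_proxy environment variables, PerfectShutdown, Counter) are wired together the same way in both new twitch programs. A condensed sketch of that loop, with the actual scraping elided, assuming the intimate package from this repository and a chromedriver on port 3030 as used in the diff:

package main

import (
	"log"
	"time"

	"github.com/tebeka/selenium"

	"intimate"
)

func main() {
	// GetChromeDriver honours the chrome_proxy / pac_proxy environment
	// variables added in this change and loads crx/myblock.crx if present.
	wd := intimate.GetChromeDriver(3030)
	ps := intimate.NewPerfectShutdown() // IsClose() flips to true on SIGINT/SIGTERM

	// Recreate the browser every 200 iterations, as tiwtch_extractor.go does,
	// to keep chromedriver from accumulating state.
	counter := intimate.NewCounter()
	counter.SetMaxLimit(200)
	counter.SetMaxToDo(func(olist ...interface{}) error {
		owd := olist[0].(*selenium.WebDriver)
		(*owd).Close()
		(*owd).Quit()
		*owd = intimate.GetChromeDriver(3030)
		return nil
	}, &wd)

	for !ps.IsClose() {
		// ... pop a streamer, scrape it with wd, insert the collect log ...
		log.Println("one unit of work")
		time.Sleep(time.Second)
		counter.AddWithReset(1) // fires the SetMaxToDo callback at the limit
	}

	wd.Close()
	wd.Quit()
}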
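
The new ParseNumber helper (and the existing ParseDuration next to it) are small enough to show directly; a minimal usage sketch with illustrative input values:

package main

import (
	"fmt"

	"intimate"
)

func main() {
	// ParseNumber strips thousands separators from scraped strings such as
	// Twitch follower counts before parsing them as int64.
	n, err := intimate.ParseNumber("12,345")
	fmt.Println(n, err) // 12345 <nil>

	// ParseDuration converts an "h:mm:ss" string (e.g. a live-stream length)
	// into a time.Duration.
	d, err := intimate.ParseDuration("1:40:00")
	fmt.Println(d, err) // 1h40m0s <nil>
}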
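
StoreExtractor.Update builds its UPDATE statement from the variadic field/value pairs, and twitch_task2 calls it as estore.Update(streamer, "Tags", streamer.Tags) when InsertStreamer reports an existing row. A small self-contained sketch of the same assembly logic; buildUpdate is a hypothetical stand-in for illustration only (the real method executes the statement against the streamer table via store.db.Exec):

package main

import "fmt"

// buildUpdate mirrors the statement assembly inside the new StoreExtractor.Update.
func buildUpdate(table string, uid int64, fieldvalues ...interface{}) (string, []interface{}) {
	updateSQL := "UPDATE " + table + " SET "
	var values []interface{}
	for i := 0; i < len(fieldvalues); i += 2 {
		values = append(values, fieldvalues[i+1])
		updateSQL += fieldvalues[i].(string) + " = ? "
	}
	updateSQL += "WHERE uid = ?"
	values = append(values, uid)
	return updateSQL, values
}

func main() {
	// Equivalent of estore.Update(streamer, "Tags", streamer.Tags), where Tags
	// holds the JSON produced by json.Marshal of the scraped tag list.
	query, args := buildUpdate("streamer", 42, "Tags", `["game","talk"]`)
	fmt.Println(query) // UPDATE streamer SET Tags = ? WHERE uid = ?
	fmt.Println(args)  // [["game","talk"] 42]
	// Note: the assignments are not comma separated, so passing more than one
	// field/value pair would generate invalid SQL; as committed, the helper is
	// only safe for single-column updates like the Tags case above.
}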