diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 0e0f738..6052389 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -60,7 +60,7 @@ func (oe *OpenrecExtractor) Execute() { // log.Println("1000次执行, gc 重新建立sql链接") // } - source, err := sstore.Pop(string(intimate.TTOpenrecUser), 0) + source, err := sstore.Pop(intimate.TOpenrecUser, 0) if err != nil { if err != lasterr { log.Println(err, lasterr) @@ -78,7 +78,7 @@ func (oe *OpenrecExtractor) Execute() { streamer := &intimate.Streamer{} streamer.UserId = userId - streamer.Platform = string(intimate.Popenrec) + streamer.Platform = intimate.Popenrec htmlUser := datamap["html_user"] oe.user = intimate.NewExtractorSource(&htmlUser) diff --git a/extractor_field.go b/extractor_field.go index f959eaa..0f94cd6 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -12,9 +12,9 @@ type GetSet struct { } type Streamer struct { - Uid int64 // - Platform string // - UserId string // + Uid int64 // + Platform Platform // + UserId string // UserName sql.NullString // LiveUrl sql.NullString // diff --git a/source_field.go b/source_field.go index 1236c8c..50a682f 100644 --- a/source_field.go +++ b/source_field.go @@ -19,8 +19,8 @@ type Source struct { UpdateTime sql.NullTime // ErrorMsg sql.NullString // - TargetType string // - Operator int32 // + Target Target // + Operator int32 // LastOperator int32 } diff --git a/sql/intimate_source.sql b/sql/intimate_source.sql index cf49f79..4fce51b 100644 --- a/sql/intimate_source.sql +++ b/sql/intimate_source.sql @@ -12,6 +12,26 @@ CREATE TABLE IF NOT EXISTS `source_openrec` ( `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新数据', `error_msg` text DEFAULT NULL COMMENT '错误信息', + `target_type` varchar(64) NOT NULL COMMENT '目标类型', + `operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志', + PRIMARY KEY(`uid`), + KEY `operator_idx` (`operator`), + KEY `update_time_idx` (`update_time`), + KEY `target_type_idx` (`target_type`) + ); + + + CREATE TABLE IF NOT EXISTS `source_twitch` ( + uid bigint AUTO_INCREMENT COMMENT '自增UID', + + `streamer_id` bigint DEFAULT NULL COMMENT 'streamer uid, 关联主播', + `url` text NOT NULL COMMENT '获取源数据地址', + `source` longtext DEFAULT NULL COMMENT '源数据', + `ext` json DEFAULT NULL COMMENT '扩展字段', + `serialize` blob DEFAULT NULL COMMENT '需要给下个任务传递 序列花数据, 非必要不用', + `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新数据', + `error_msg` text DEFAULT NULL COMMENT '错误信息', + `target_type` varchar(64) NOT NULL COMMENT '目标类型', `operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志', PRIMARY KEY(`uid`), diff --git a/store.go b/store.go index 609534e..e3e1625 100644 --- a/store.go +++ b/store.go @@ -79,7 +79,7 @@ func (store *StoreSource) errorAlarm(err error) { // Insert 插入数据 func (store *StoreSource) Insert(isource IGet) { - _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("StreamerId")) + _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)", isource.Get("Url"), isource.Get("Target"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("StreamerId")) if err != nil { panic(err) } @@ -122,13 +122,13 @@ func (store *StoreSource) Restore(isource IGet) { } // Pop 弹出一条未处理的数据 -func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, error) { +func (store *StoreSource) Pop(targetType Target, operators ...int32) (*Source, error) { tx, err := store.db.Begin() if err != nil { return nil, err } - var args = []interface{}{targetType} + var args = []interface{}{string(targetType)} selectSQL := `select uid, url, target_type, source, ext, operator, update_time, streamer_id from ` + store.table + ` where target_type = ?` if len(operators) == 0 { selectSQL += " and operator = ?" @@ -157,7 +157,7 @@ func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, e s := &Source{} // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime, &s.StreamerId) + err = row.Scan(&s.Uid, &s.Url, &s.Target, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime, &s.StreamerId) if err != nil { return nil, err } @@ -214,13 +214,13 @@ func NewStoreExtractor() *StoreExtractor { } // Pop 弹出一条未处理的数据 -func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer, error) { +func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Streamer, error) { tx, err := store.db.Begin() if err != nil { return nil, err } - var args = []interface{}{platform} + var args = []interface{}{string(platform)} selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` if len(operators) == 0 { selectSQL += " and operator = ?" diff --git a/table_list.go b/table_list.go index 2a9fa5b..04002dc 100644 --- a/table_list.go +++ b/table_list.go @@ -6,5 +6,7 @@ type SourceTable string const ( // STOpenrec openrec源table名称 STOpenrec SourceTable = "source_openrec" -) + // STTwitch twitch源table名称 + STTwitch SourceTable = "source_twitch" +) diff --git a/target_type_list.go b/target_type_list.go index 8869ec3..e21a40a 100644 --- a/target_type_list.go +++ b/target_type_list.go @@ -1,12 +1,15 @@ package intimate -// TargetType 源的 目标类型 列表 -type TargetType string +// Target 源的 目标类型 列表 +type Target string const ( - // TTOpenrecRanking openrec源TargetType名称 - TTOpenrecRanking TargetType = "openrec_ranking" + // TOpenrecRanking 获取排名 Target名称 + TOpenrecRanking Target = "openrec_ranking" - // TTOpenrecUser openrec源TargetType名称 - TTOpenrecUser TargetType = "openrec_user" + // TOpenrecUser 获取用户列表 源Target名称 + TOpenrecUser Target = "openrec_user" + + // TTwitchChannel twitch 获取类别操作目标 + TTwitchChannel Target = "twitch_channel" ) diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index acff785..feeeb60 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -72,8 +72,8 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { content := resp.Content() if len(content) <= 200 { // 末页退出 finishpoint := time.Now() - log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*60)) - for time.Now().Sub(finishpoint) < time.Minute*60 { + log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*120)) + for time.Now().Sub(finishpoint) < time.Minute*120 { time.Sleep(time.Second) if atomic.LoadInt32(&loop) > 0 { return @@ -102,7 +102,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { streamer := &intimate.Streamer{} streamer.UserId = userid - streamer.Platform = string(intimate.Popenrec) + streamer.Platform = intimate.Popenrec updateUrl := make(map[string]interface{}) diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 74526e0..76ea4c8 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -51,7 +51,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { for atomic.LoadInt32(&loop) > 0 { - streamer, err := estore.Pop(string(intimate.Popenrec)) + streamer, err := estore.Pop(intimate.Popenrec) if streamer == nil || err != nil { if err != lasterr { @@ -167,7 +167,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { streamer.Operator = int32(intimate.OperatorOK) source := &intimate.Source{} - source.TargetType = string(intimate.TTOpenrecUser) + source.Target = intimate.TOpenrecUser source.Ext = string(extJsonBytes) source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true} sstore.Insert(source) diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go index 06ab7d0..2fe5560 100644 --- a/tasks/twitch/twitch_task1/task_twitch.go +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -1 +1,97 @@ package main + +import ( + "database/sql" + "fmt" + "intimate" + "log" + "time" + + "github.com/tebeka/selenium" + "github.com/tebeka/selenium/chrome" +) + +// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) + +// estore 解析存储连接实例 +var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() + +// 获取类型的所有频道链接 + +// ChannelLink 频道链接 +type ChannelLink struct { +} + +// Execute 执行任务 +func (cl *ChannelLink) Execute() { + caps := selenium.Capabilities{"browserName": "chrome"} + chromecaps := chrome.Capabilities{} + err := chromecaps.AddExtension("/home/eson/test/ssh-key/0.1.2_0.crx") + if err != nil { + panic(err) + } + chromecaps.Args = append(chromecaps.Args, "--disk-cache-dir=/home/eson/test/ssh-key/cache") + chromecaps.ExcludeSwitches = append(chromecaps.ExcludeSwitches, "enable-automation") + caps.AddChrome(chromecaps) + _, err = selenium.NewChromeDriverService("/usr/bin/chromedriver", 3030) + if err != nil { + panic(err) + } + wd, err := selenium.NewRemote(caps, fmt.Sprintf("http://localhost:%d/wd/hub", 3030)) + defer func() { + if err := wd.Close(); err != nil { + log.Println(err) + } + }() + wd.ExecuteScript("windows.navigator.webdriver = undefined", nil) + if err != nil { + panic(err) + } + + weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT" + + err = wd.Get(weburl) + if err != nil { + panic(err) + } + + cardCondition := func(wd selenium.WebDriver) (bool, error) { + elements, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") + if err != nil { + return false, err + } + return len(elements) > 0, nil + } + wd.WaitWithTimeout(cardCondition, time.Second*30) + time.Sleep(time.Second) + + e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") + if err != nil { + panic(err) + } + e.Click() + + for i := 0; i <= 200; i++ { + wd.KeyDown(selenium.EndKey) + time.Sleep(time.Second * 3) + } + + elements, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") + if err != nil { + panic(err) + } + for _, ele := range elements { + href, err := ele.GetAttribute("href") + if err != nil { + log.Println(err) + } + log.Println(href) // TODO: Save href + source := &intimate.Source{} + source.Source = sql.NullString{String: href, Valid: true} + source.Operator = 0 + source.Target = intimate.TTwitchChannel + source.Url = weburl + sstore.Insert(source) + } +} diff --git a/tasks/twitch/twitch_task1/task_twitch_test.go b/tasks/twitch/twitch_task1/task_twitch_test.go index 7c59497..71666a7 100644 --- a/tasks/twitch/twitch_task1/task_twitch_test.go +++ b/tasks/twitch/twitch_task1/task_twitch_test.go @@ -1,31 +1,10 @@ package main import ( - "fmt" "testing" - - "github.com/tebeka/selenium" - "github.com/tebeka/selenium/chrome" ) func TestCase1(t *testing.T) { - caps := selenium.Capabilities{"browserName": "chrome"} - chromecaps := chrome.Capabilities{} - err := chromecaps.AddExtension("/home/eson/test/ssh-key/0.1.2_0.crx") - if err != nil { - panic(err) - } - caps.AddChrome(chromecaps) - _, err = selenium.NewChromeDriverService("/usr/bin/chromedriver", 3030) - if err != nil { - panic(err) - } - wd, err := selenium.NewRemote(caps, fmt.Sprintf("http://localhost:%d/wd/hub", 3030)) - if err != nil { - panic(err) - } - err = wd.Get("https://www.twitch.tv/directory/all") - if err != nil { - panic(err) - } + e := ChannelLink{} + e.Execute() }