diff --git a/extractor/nimo_extractor/nimo_extractor.go b/extractor/nimo_extractor/nimo_extractor.go index f1bb1b5..d823de5 100644 --- a/extractor/nimo_extractor/nimo_extractor.go +++ b/extractor/nimo_extractor/nimo_extractor.go @@ -11,10 +11,10 @@ import ( ) // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STNimo)) +// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STNimo)) -// estore 解析存储连接实例 -var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() +// // estore 解析存储连接实例 +// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() func main() { Execute() @@ -35,14 +35,18 @@ func Execute() { waitfor := intimate.NewWaitFor(wd) ps := intimate.NewPerfectShutdown() + queue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.PNimo)) + for !ps.IsClose() { - streamer, err := estore.Pop(intimate.PNimo) + istreamer, err := queue.Pop() if err != nil { log.Println(err) - estore.UpdateError(streamer, err) + intimate.TStreamer.UpdateError(istreamer, err) continue } + streamer := istreamer.(*intimate.Streamer) + wd.Get(streamer.LiveUrl.String) // wd.Get("https://www.nimo.tv/live/1253835677") @@ -71,22 +75,25 @@ func Execute() { clog := &intimate.CollectLog{} clog.Platform = intimate.PNimo - clog.Followers = sql.NullInt64{Int64: li.Followers, Valid: true} - clog.Views = sql.NullInt64{Int64: li.Views, Valid: true} - clog.UpdateTime = utime + clog.Followers = &sql.NullInt64{Int64: li.Followers, Valid: true} + clog.Views = &sql.NullInt64{Int64: li.Views, Valid: true} + clog.UpdateTime = &utime clog.StreamerUid = streamer.Uid var sum int64 = 0 for _, v := range li.Gratuity { sum += v } - clog.Gratuity = sql.NullInt64{Int64: sum, Valid: true} + clog.Gratuity = &sql.NullInt64{Int64: sum, Valid: true} - cuid := estore.InsertClog(clog) + cuid, err := intimate.TClog.InsertRetAutoID(clog) + if err != nil { + panic(err) + } - streamer.Channel = sql.NullString{String: li.Channel, Valid: true} + streamer.Channel = &sql.NullString{String: li.Channel, Valid: true} streamer.LatestLogUid = cuid - streamer.UpdateTime = utime + streamer.UpdateTime = &utime streamer.Operator = 0 switch { @@ -102,7 +109,11 @@ func Execute() { streamer.UpdateInterval = 60 } - estore.Update(streamer, "update_interval", streamer.UpdateInterval, "operator", streamer.Operator, "channel", streamer.Channel, "latest_log_uid", streamer.LatestLogUid, "update_time", streamer.UpdateTime) + // estore.Update(streamer, "update_interval", streamer.UpdateInterval, "operator", streamer.Operator, "channel", streamer.Channel, "latest_log_uid", streamer.LatestLogUid, "update_time", streamer.UpdateTime) + err = intimate.TStreamer.Update(streamer) + if err != nil { + panic(err) + } count++ if count >= countlimit { diff --git a/extractor/twitch_extractor/tiwtch_extractor.go b/extractor/twitch_extractor/tiwtch_extractor.go index ce65608..8a00fcd 100644 --- a/extractor/twitch_extractor/tiwtch_extractor.go +++ b/extractor/twitch_extractor/tiwtch_extractor.go @@ -123,7 +123,6 @@ func main() { } streamer := &intimate.Streamer{} - matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href) if len(matches) == 2 { mc := matches[1] diff --git a/supervisor_conf/openrec_task2.conf b/supervisor_conf/openrec_task2.conf deleted file mode 100644 index 1da544f..0000000 --- a/supervisor_conf/openrec_task2.conf +++ /dev/null @@ -1,12 +0,0 @@ -[supervisord] -nodaemon=true - -[program:openrec_source] -directory = MYPATH/bin/openrec_task2/ -command= MYPATH/bin/openrec_task2/openrec_task2 -process_name=%(program_name)s_%(process_num)02d ;多进程名称 -numprocs=4 ;启动多个进程 -autorestart=true -stderr_logfile=MYPATH/bin/openrec_task2/log -stderr_logfile_maxbytes=0 -stopsignal=QUIT diff --git a/supervisor_conf/twitch_extractor.conf b/supervisor_conf/twitch_extractor.conf index 61b82ca..051978a 100644 --- a/supervisor_conf/twitch_extractor.conf +++ b/supervisor_conf/twitch_extractor.conf @@ -1,5 +1,5 @@ [supervisord] -nodaemon=false +nodaemon=true [program:twitch_extractor] environment=DISPLAY=":99" diff --git a/supervisor_conf/twitch_extractor_p1.conf b/supervisor_conf/twitch_extractor_p1.conf_eg similarity index 100% rename from supervisor_conf/twitch_extractor_p1.conf rename to supervisor_conf/twitch_extractor_p1.conf_eg diff --git a/supervisor_conf/twitch_extractor_p2.conf b/supervisor_conf/twitch_extractor_p2.conf deleted file mode 100644 index 4c47d15..0000000 --- a/supervisor_conf/twitch_extractor_p2.conf +++ /dev/null @@ -1,13 +0,0 @@ -[supervisord] -nodaemon=false - -[program:twitch_extractor_p2] -environment=DISPLAY=":99",pac_proxy=http://localhost:1090/pac1 -directory = MYPATH/bin/twitch_extractor -command= MYPATH/bin/twitch_extractor/twitch_extractor -process_name=%(program_name)s_%(process_num)02d ;多进程名称 -numprocs=2 ;启动多个进程 -autorestart=true -stderr_logfile=MYPATH/bin/twitch_extractor/log -stderr_logfile_maxbytes=0 -stopsignal=QUIT diff --git a/supervisor_conf/twitch_task1.conf b/supervisor_conf/twitch_task1.conf new file mode 100644 index 0000000..17a70f6 --- /dev/null +++ b/supervisor_conf/twitch_task1.conf @@ -0,0 +1,13 @@ +[supervisord] +nodaemon=false + +[program:twitch_task1] +environment=DISPLAY=":99" +directory = MYPATH/bin/twitch_task1 +command= MYPATH/bin/twitch_task1/twitch_task1 +# process_name=%(program_name)s_%(process_num)02d ;多进程名称 +# numprocs=1 ;启动多个进程 +autorestart=true +stderr_logfile=MYPATH/bin/twitch_task1/log +stderr_logfile_maxbytes=0 +stopsignal=QUIT diff --git a/supervisor_conf/twitch_task2.conf b/supervisor_conf/twitch_task2.conf deleted file mode 100644 index 8a12047..0000000 --- a/supervisor_conf/twitch_task2.conf +++ /dev/null @@ -1,13 +0,0 @@ -[supervisord] -nodaemon=false - -[program:twitch_task2] -environment=DISPLAY=":99" -directory = MYPATH/bin/twitch_task2 -command= MYPATH/bin/twitch_task2/twitch_task2 -process_name=%(program_name)s_%(process_num)02d ;多进程名称 -numprocs=6 ;启动多个进程 -autorestart=true -stderr_logfile=MYPATH/bin/twitch_task2/log -stderr_logfile_maxbytes=0 -stopsignal=QUIT diff --git a/tasks/nimo/nimo_task1/nimo_task1.go b/tasks/nimo/nimo_task1/nimo_task1.go index a5def5c..e1382c0 100644 --- a/tasks/nimo/nimo_task1/nimo_task1.go +++ b/tasks/nimo/nimo_task1/nimo_task1.go @@ -12,7 +12,7 @@ import ( ) // estore 解析存储连接实例 -var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() +// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() // Execute 执行 func Execute() { @@ -95,7 +95,10 @@ func Execute() { } streamer.UpdateInterval = 120 - estore.InsertStreamer(streamer) + err = intimate.TStreamer.Insert(streamer) + if err != nil { + panic(err) + } } else { log.Println("userid is null.", room.String()) diff --git a/tasks/twitcasting/twitcasting_task1/twitcasting.go b/tasks/twitcasting/twitcasting_task1/twitcasting.go index 44a733f..efb9ea1 100644 --- a/tasks/twitcasting/twitcasting_task1/twitcasting.go +++ b/tasks/twitcasting/twitcasting_task1/twitcasting.go @@ -70,11 +70,11 @@ func Execute() { if ok := queuedict[wurl]; !ok { log.Println(wurl) sl := &intimate.StreamerList{} - sl.Platform = intimate.Ptwitcasting + sl.Platform = string(intimate.Ptwitcasting) sl.Url = wurl sl.Operator = 0 sl.UpdateInterval = 120 - sl.UpdateTime = time.Now() + sl.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} sl.UrlHash = intimate.GetUrlHash(sl.Url) intimate.TStreamerList.Insert(sl) @@ -104,11 +104,11 @@ func Execute() { sp.TagUrl[i] = wurl if ok := queuedict[wurl]; !ok { sl := &intimate.StreamerList{} - sl.Platform = intimate.Ptwitcasting + sl.Platform = string(intimate.Ptwitcasting) sl.Url = wurl sl.Operator = 0 sl.UpdateInterval = 120 - sl.UpdateTime = time.Now() + sl.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} sl.UrlHash = intimate.GetUrlHash(sl.Url) intimate.TStreamerList.Insert(sl) diff --git a/tasks/twitch/twitch_task2/.gitignore b/tasks/twitch/twitch_task2/.gitignore deleted file mode 100644 index 846a6b4..0000000 --- a/tasks/twitch/twitch_task2/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -twitch_task2 -log \ No newline at end of file diff --git a/tasks/twitch/twitch_task2/main.go b/tasks/twitch/twitch_task2/main.go deleted file mode 100644 index e86b640..0000000 --- a/tasks/twitch/twitch_task2/main.go +++ /dev/null @@ -1,6 +0,0 @@ -package main - -func main() { - - Execute() -} diff --git a/tasks/twitch/twitch_task2/task_twitch.go b/tasks/twitch/twitch_task2/task_twitch.go deleted file mode 100644 index 6f3d399..0000000 --- a/tasks/twitch/twitch_task2/task_twitch.go +++ /dev/null @@ -1,175 +0,0 @@ -package main - -import ( - "database/sql" - "encoding/json" - "intimate" - "log" - "regexp" - "time" - - "github.com/tebeka/selenium" -) - -// // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) - -// // estore 解析存储连接实例 -// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() - -// 获取类型的所有频道链接 - -// Execute 执行任务 -func Execute() { - // DELETE FROM source_twitch WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, source FROM source_twitch ) s GROUP BY s.source) ; - //article//a[@data-a-target='preview-card-title-link'] - - wd := intimate.GetChromeDriver(3030) - defer wd.Quit() - ps := intimate.NewPerfectShutdown() - counter := intimate.NewCounter() - counter.SetMaxLimit(100) - counter.SetMaxToDo(func(olist ...interface{}) error { - owd := olist[0].(*selenium.WebDriver) - if err := (*owd).Quit(); err != nil { - log.Println(err) - } - *owd = intimate.GetChromeDriver(3030) - return nil - }, &wd) - - for !ps.IsClose() { - - var err error - sourceChannel, err := sstore.Pop(intimate.TTwitchChannel) - if err != nil { - panic(err) - } - - weburl := sourceChannel.Source.String + "?sort=VIEWER_COUNT" - err = wd.Get(weburl) - if err != nil { - log.Println(err) - sstore.UpdateError(sourceChannel, err) - time.Sleep(time.Second * 10) - continue - } - - wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) { - _, err := wd.FindElement(selenium.ByXPATH, "(//div/p[@class=''])[last()]") - if err != nil { - return false, err - } - return true, nil - }, time.Second*10) - - btn, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") - if err != nil { - log.Println(err) - continue - } - btn.Click() - - var elements []selenium.WebElement - var liveurls = 0 - var delayerror = 2 - for i := 0; i < 200 && !ps.IsClose(); i++ { - elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]") - if err != nil { - log.Println(err) - break - } - time.Sleep(time.Millisecond * 200) - wd.KeyDown(selenium.EndKey) - time.Sleep(time.Millisecond * 200) - wd.KeyUp(selenium.EndKey) - time.Sleep(time.Millisecond * 2000) - if len(elements) == liveurls { - delayerror-- - if delayerror <= 0 { - break - } - } else { - delayerror = 2 - } - liveurls = len(elements) - } - articles, err := wd.FindElements(selenium.ByXPATH, "//article") - if err != nil { - log.Println(err) - continue - } - - for _, article := range articles { - - e, err := article.FindElement(selenium.ByXPATH, ".//a[@data-a-target='preview-card-title-link' and @href]") - if err != nil { - log.Println(err) - continue - } - - href, err := e.GetAttribute("href") - if err != nil { - log.Println(err) - continue - } - - btns, err := article.FindElements(selenium.ByXPATH, ".//div[@class='tw-full-width tw-inline-block']//button") - if err != nil { - log.Println(err) - continue - } - - var tags []string - for _, btn := range btns { - tag, err := btn.GetAttribute("data-a-target") - if err == nil { - tags = append(tags, tag) - } - } - - streamer := &intimate.Streamer{} - - matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href) - if len(matches) == 2 { - mc := matches[1] - streamer.UserId = &mc - } else { - log.Println(href) - continue - } - - jtags, err := json.Marshal(tags) - if err != nil { - log.Println(err) - } else { - streamer.Tags = jtags - } - - streamer.Platform = intimate.Ptwitch - - updateUrl := make(map[string]string) - updateUrl["live"] = href - streamer.LiveUrl = &sql.NullString{String: href, Valid: true} - data, err := json.Marshal(updateUrl) - if err != nil { - log.Println(err) - continue - } - streamer.UpdateUrl = data - streamer.Operator = 0 - if estore.InsertStreamer(streamer) { - // log.Println("streamer update tags", streamer.Uid, tags) - if streamer.Tags != nil { - estore.Update(streamer, "Tags", streamer.Tags) - } - } - } - log.Println("streamer find", len(articles)) - if len(articles) == 0 { - sourceChannel.Operator = 5 - sstore.UpdateOperator(sourceChannel) - } - counter.AddWithReset(1) - } -} diff --git a/tasks/twitch/twitch_task2/task_twitch_test.go b/tasks/twitch/twitch_task2/task_twitch_test.go deleted file mode 100644 index ef64976..0000000 --- a/tasks/twitch/twitch_task2/task_twitch_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "testing" - -func TestMain(t *testing.T) { - main() -}