diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 41cf76d..b8a8943 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -5,13 +5,9 @@ import ( "encoding/json" "intimate" "log" - "os" - "os/signal" "regexp" "strconv" "strings" - "sync/atomic" - "syscall" "time" "github.com/tidwall/gjson" @@ -29,17 +25,10 @@ type OpenrecExtractor struct { func (oe *OpenrecExtractor) Execute() { - var loop int32 = 1 - - go func() { - signalchan := make(chan os.Signal) - signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) - log.Println("accept stop command:", <-signalchan) - atomic.StoreInt32(&loop, 0) - }() + ps := intimate.NewPerfectShutdown() var lasterr error = nil - for atomic.LoadInt32(&loop) > 0 { + for !ps.IsClose() { var err error source, err := sstore.Pop(intimate.TOpenrecUser, 0) @@ -60,7 +49,7 @@ func (oe *OpenrecExtractor) Execute() { streamer := &intimate.Streamer{} streamer.UserId = userId - streamer.Platform = intimate.Popenrec + // streamer.Platform = intimate.Popenrec htmlUser := datamap["html_user"] oe.user = intimate.NewExtractorSource(&htmlUser) @@ -94,10 +83,20 @@ func (oe *OpenrecExtractor) Execute() { logUid := estore.InsertClog(clog) LiveUrl := "https://www.openrec.tv/live/" + userId + streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true} streamer.LatestLogUid = logUid - streamer.Operator = 0 - estore.UpdateStreamer(streamer) + // streamer.Operator = 0 + + log.Println(streamer.UserId) + estore.Update(streamer, + "user_name", streamer.UserName, + "user_id", streamer.UserId, + "live_url", streamer.LiveUrl, + "latest_log_uid", streamer.LatestLogUid, + "update_time", streamer.UpdateTime, + "tags", streamer.Tags, + ) source.Operator = int32(intimate.OperatorExtractorOK) sstore.UpdateOperator(source) diff --git a/go.mod b/go.mod index fd195ef..36800bf 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,9 @@ go 1.14 require ( github.com/474420502/extractor v0.7.2 github.com/474420502/focus v0.12.0 - github.com/474420502/gcurl v0.1.2 + github.com/474420502/gcurl v0.2.0 github.com/474420502/hunter v0.3.4 - github.com/474420502/requests v1.6.0 + github.com/474420502/requests v1.7.0 github.com/go-sql-driver/mysql v1.5.0 github.com/lestrrat-go/libxml2 v0.0.0-20200215080510-6483566f52cb github.com/tebeka/selenium v0.9.9 diff --git a/go.sum b/go.sum index dac54a5..5ed6621 100644 --- a/go.sum +++ b/go.sum @@ -8,12 +8,16 @@ github.com/474420502/focus v0.12.0 h1:+icbmj7IEOefvTegHt5EpcHt6WFbe2miIrceUJx2Ev github.com/474420502/focus v0.12.0/go.mod h1:d0PMjtMxFz1a9HIhwyFPkWa+JF+0LgOrEUfd8iZka6s= github.com/474420502/gcurl v0.1.2 h1:ON9Yz3IgAdtDlFlHfkAJ3aIEBDxH0RiViPE5ST5ohKg= github.com/474420502/gcurl v0.1.2/go.mod h1:hws5q/Ao64bXLLDnldz9VyTQUndTWc/i5DzdEazFfoM= +github.com/474420502/gcurl v0.2.0 h1:m6+vw4NX4f5Tfp7c3nuaIgHUE/7zTX6K3xK+pTCBoCo= +github.com/474420502/gcurl v0.2.0/go.mod h1:kJZDbgXn5wbAaR+hhBi4Sbw44P4igJ7qYXC6mejLuhQ= github.com/474420502/htmlquery v1.2.4-0.20200812072201-e871dd09247a h1:E1T6CYQKsUn7fMvNbeKfISjBLfOJjZX4KpWwStT20Kc= github.com/474420502/htmlquery v1.2.4-0.20200812072201-e871dd09247a/go.mod h1:AoSN890esHwNKecV0tCs+W0ele1xgFL1Jqk6UcrdxgU= github.com/474420502/hunter v0.3.4 h1:fyLAgI84jWe3IcqsISC53j1w3CXI1FERxX//Potns0M= github.com/474420502/hunter v0.3.4/go.mod h1:pe4Xr/I+2agvq339vS/OZV+EiHAWtpXQs75rioSW9oA= github.com/474420502/requests v1.6.0 h1:f4h4j40eT0P5whhg9LdkotD8CaKjtuDu/vz9iSUkCgY= github.com/474420502/requests v1.6.0/go.mod h1:SLXrQ5dL9c7dkIeKNUCBAjOIt3J9KFCS2RQjWJecNwo= +github.com/474420502/requests v1.7.0 h1:oaBwVrxZ7yZ+hDOKwHm2NflYib2y1geIUxBxQ2U48mw= +github.com/474420502/requests v1.7.0/go.mod h1:SLXrQ5dL9c7dkIeKNUCBAjOIt3J9KFCS2RQjWJecNwo= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802 h1:1BDTz0u9nC3//pOCMdNH+CiXJVYJh5UQNCOBG7jbELc= diff --git a/store.go b/store.go index da79aa2..82918ad 100644 --- a/store.go +++ b/store.go @@ -382,7 +382,8 @@ func (store *StoreExtractor) UpdateOperator(isource IGet) { // UpdateStreamer Streamer表, 插入数据 func (store *StoreExtractor) UpdateStreamer(streamer IGet) { - _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, tags = ?, ext = ?, operator = ?, update_time = ?, update_interval = ? WHERE uid = ?;", + // log.Printf("UPDATE "+StreamerTable+" SET user_name = %v, live_url = %v, channel = %v, latest_log_uid = %v, tags = %v, ext = %v, operator = %v, update_time = %v, update_interval = %v WHERE uid = %v", streamer.Get("UserName"), streamer.Get("LiveUrl"), streamer.Get("Channel"), streamer.Get("LatestLogUid"), streamer.Get("Tags"), streamer.Get("Ext"), streamer.Get("Operator"), streamer.Get("UpdateTime"), streamer.Get("UpdateInterval"), streamer.Get("Uid")) + _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, tags = ?, ext = ?, operator = ?, update_time = ?, update_interval = ? WHERE uid = ?", streamer.Get("UserName"), streamer.Get("LiveUrl"), streamer.Get("Channel"), streamer.Get("LatestLogUid"), streamer.Get("Tags"), streamer.Get("Ext"), streamer.Get("Operator"), streamer.Get("UpdateTime"), streamer.Get("UpdateInterval"), streamer.Get("Uid")) if err != nil { panic(err) @@ -396,12 +397,14 @@ func (store *StoreExtractor) Update(streamer IGet, fieldvalues ...interface{}) { for i := 0; i < len(fieldvalues); i += 2 { field := fieldvalues[i] values = append(values, fieldvalues[i+1]) - updateSQL += field.(string) + " = ? " + updateSQL += field.(string) + " = ?," } + updateSQL = updateSQL[0 : len(updateSQL)-1] updateSQL += "WHERE uid = ?" values = append(values, streamer.Get("Uid")) _, err := store.db.Exec(updateSQL, values...) if err != nil { + log.Println(updateSQL) panic(err) } } diff --git a/tasks/openrec/openrec_task1/main.go b/tasks/openrec/openrec_task1/main.go index a8f96c1..736ef31 100644 --- a/tasks/openrec/openrec_task1/main.go +++ b/tasks/openrec/openrec_task1/main.go @@ -1,8 +1,5 @@ package main -import "github.com/474420502/hunter" - func main() { - ht := hunter.NewHunter(openrecRanking) - ht.Execute() + Execute() } diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index 124be92..7a8774a 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -4,29 +4,24 @@ import ( "encoding/json" "intimate" "log" - "os" - "os/signal" "strconv" - "sync/atomic" - "syscall" "time" - "github.com/474420502/hunter" + "github.com/474420502/gcurl" "github.com/tidwall/gjson" ) -var openrecRanking *OpenrecRanking - // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) // estore 解析存储连接实例 var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() -func init() { +// Execute 执行方法 +func Execute() { - openrecRanking = &OpenrecRanking{} - openrecRanking.PreCurlUrl = `curl 'https://public.openrec.tv/external/api/v5/channel-ranks?period=monthly&date=&tag=&page=1' \ + ps := intimate.NewPerfectShutdown() + turl := `curl 'https://public.openrec.tv/external/api/v5/channel-ranks?period=monthly&date=&tag=&page=1' \ -H 'authority: public.openrec.tv' \ -H 'accept: application/json, text/javascript, */*; q=0.01' \ -H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36' \ @@ -39,43 +34,25 @@ func init() { -H 'if-none-match: W/"25edb-aUYBdmLqZcr6DW4ZWKX9r2aqolg"' \ --compressed` -} + g := gcurl.ParseRawCURL(turl) + tp := g.Temporary() -// OpenrecRanking 获取排名任务 -type OpenrecRanking struct { - hunter.PreCurlUrl -} + for !ps.IsClose() { -// Execute 执行方法 -func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { - - var loop int32 = 1 - - go func() { - signalchan := make(chan os.Signal) - signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) - log.Println("accept stop command:", <-signalchan) - atomic.StoreInt32(&loop, 0) - }() - - for atomic.LoadInt32(&loop) > 0 { - - resp, err := cxt.Hunt() + resp, err := tp.Execute() if err != nil { log.Println(err) time.Sleep(time.Second * 2) continue } - tp := cxt.Temporary() - content := resp.Content() if len(content) <= 200 { //末页时没有内容返回, 末页退出 finishpoint := time.Now() log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*120)) for time.Now().Sub(finishpoint) < time.Minute*120 { time.Sleep(time.Second) - if atomic.LoadInt32(&loop) <= 0 { + if ps.IsClose() { return } } @@ -93,13 +70,6 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { for _, User := range result.Array() { userid := User.Get("channel.id").String() - - // data := &intimate.Source{} - // data.Source = sql.NullString{String: userid, Valid: len(userid) > 0} - // data.Url = tp.GetRawURL() - // data.TargetType = string(intimate.TTOpenrecUser) - // sstore.Insert(data) - streamer := &intimate.Streamer{} streamer.UserId = userid streamer.Platform = intimate.Popenrec @@ -123,17 +93,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { } // 修改url query 参数的page递增. 遍历所有页面 - querys := tp.GetQuery() - page, err := strconv.Atoi(querys.Get("page")) - if err != nil { - log.Println(err) - return - } - - page++ - querys.Set("page", strconv.Itoa(page)) - tp.SetQuery(querys) - + tp.QueryParam("page").IntAdd(1) time.Sleep(time.Second * 1) } } diff --git a/tasks/openrec/openrec_task1/task_openrec_test.go b/tasks/openrec/openrec_task1/task_openrec_test.go index 7190d66..db355eb 100644 --- a/tasks/openrec/openrec_task1/task_openrec_test.go +++ b/tasks/openrec/openrec_task1/task_openrec_test.go @@ -63,7 +63,6 @@ func TestTimeAdd(t *testing.T) { t.Error(time.Now().Sub(finishpoint) > time.Second*1) } -func TestRankingInsert(t *testing.T) { - ht := hunter.NewHunter(openrecRanking) - ht.Execute() +func TestMain(t *testing.T) { + main() } diff --git a/tasks/openrec/openrec_task2/main.go b/tasks/openrec/openrec_task2/main.go index 4043048..736ef31 100644 --- a/tasks/openrec/openrec_task2/main.go +++ b/tasks/openrec/openrec_task2/main.go @@ -1,8 +1,5 @@ package main -import "github.com/474420502/hunter" - func main() { - ht := hunter.NewHunter(oer) - ht.Execute() + Execute() } diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 9126949..091944f 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -5,21 +5,13 @@ import ( "encoding/json" "intimate" "log" - "os" - "os/signal" - "strconv" - "sync/atomic" - "syscall" "time" "github.com/474420502/gcurl" + "github.com/474420502/requests" "github.com/tidwall/gjson" - - "github.com/474420502/hunter" ) -var oer *OpenrecExtratorRanking - // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) @@ -27,29 +19,18 @@ var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpe var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() func init() { - oer = &OpenrecExtratorRanking{} -} -// OpenrecExtratorRanking 获取用户信息 -type OpenrecExtratorRanking struct { - // Store *intimate.Store } // Execute 执行方法 -func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { +func Execute() { - var loop int32 = 1 - - go func() { - signalchan := make(chan os.Signal) - signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) - log.Println("accept stop command:", <-signalchan) - atomic.StoreInt32(&loop, 0) - }() + ps := intimate.NewPerfectShutdown() + ses := requests.NewSession() var lasterr error = nil - for atomic.LoadInt32(&loop) > 0 { + for !ps.IsClose() { streamer, err := estore.Pop(intimate.Popenrec) //队列里弹出一个streamer行. 进行解析 @@ -74,7 +55,8 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { // Check Userid userUrl := updateUrl["user"] - tp := cxt.Session().Get(userUrl) // 获取user url页面数据 + log.Println(userUrl) + tp := ses.Get(userUrl) // 获取user url页面数据 resp, err := tp.Execute() streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} @@ -84,7 +66,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { continue } - cookies := cxt.Session().GetCookies(tp.GetParsedURL()) + cookies := ses.GetCookies(tp.GetParsedURL()) scurl := updateUrl["supporters"] //获取打赏者的数据 curl := gcurl.ParseRawCURL(scurl) @@ -128,16 +110,17 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { } supporters = append(supporters, string(resp.Content())) - page := supportersQuery.Get("page_number") // page_number 加1 - pageint, err := strconv.Atoi(page) - if err != nil { - log.Println(err) - break - } - pageint++ - page = strconv.Itoa(pageint) - supportersQuery.Set("page_number", page) - temporary.SetQuery(supportersQuery) + temporary.QueryParam("page_number").IntAdd(1) + // page := supportersQuery.Get("page_number") // page_number 加1 + // pageint, err := strconv.Atoi(page) + // if err != nil { + // log.Println(err) + // break + // } + // pageint++ + // page = strconv.Itoa(pageint) + // supportersQuery.Set("page_number", page) + // temporary.SetQuery(supportersQuery) } // cookies := cxt.Session().GetCookies(wf.GetParsedURL()) @@ -147,7 +130,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { ext["html_user"] = string(resp.Content()) liveUrl := updateUrl["live"] - tp = cxt.Session().Get(liveUrl) + tp = ses.Get(liveUrl) resp, err = tp.Execute() if err != nil { log.Println(err) @@ -164,7 +147,10 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { continue } - streamer.Operator = int32(intimate.OperatorOK) + // streamer.Platform = intimate.Popenrec + streamer.UpdateInterval = 120 + streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} + streamer.Operator = 0 source := &intimate.Source{} source.Target = intimate.TOpenrecUser @@ -172,7 +158,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true} sstore.Insert(source) - estore.UpdateOperator(streamer) + estore.UpdateStreamer(streamer) } } diff --git a/tasks/openrec/openrec_task2/task_openrec_test.go b/tasks/openrec/openrec_task2/task_openrec_test.go index d29684f..80820ef 100644 --- a/tasks/openrec/openrec_task2/task_openrec_test.go +++ b/tasks/openrec/openrec_task2/task_openrec_test.go @@ -2,11 +2,8 @@ package main import ( "testing" - - "github.com/474420502/hunter" ) -func TestOpenrecUser(t *testing.T) { - ht := hunter.NewHunter(oer) - ht.Execute() +func TestMain(t *testing.T) { + main() } diff --git a/tasks/twitch/twitch_task1/main.go b/tasks/twitch/twitch_task1/main.go index b0019dd..736ef31 100644 --- a/tasks/twitch/twitch_task1/main.go +++ b/tasks/twitch/twitch_task1/main.go @@ -1,6 +1,5 @@ package main func main() { - e := ChannelLink{} - e.Execute() + Execute() } diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go index d98b28b..03fcaf2 100644 --- a/tasks/twitch/twitch_task1/task_twitch.go +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -17,12 +17,8 @@ var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() // 获取类型的所有频道链接 -// ChannelLink 频道链接 -type ChannelLink struct { -} - // Execute 执行任务 -func (cl *ChannelLink) Execute() { +func Execute() { var err error wd := intimate.GetChromeDriver(3030) ps := intimate.NewPerfectShutdown() diff --git a/tasks/twitch/twitch_task1/task_twitch_test.go b/tasks/twitch/twitch_task1/task_twitch_test.go index e61dc1e..6ee9af6 100644 --- a/tasks/twitch/twitch_task1/task_twitch_test.go +++ b/tasks/twitch/twitch_task1/task_twitch_test.go @@ -4,9 +4,8 @@ import ( "testing" ) -func TestCase1(t *testing.T) { - e := ChannelLink{} - e.Execute() +func estCase1(t *testing.T) { + Execute() } func TestLiveUrl(t *testing.T) { diff --git a/tasks/twitch/twitch_task2/main.go b/tasks/twitch/twitch_task2/main.go index d81b18b..e86b640 100644 --- a/tasks/twitch/twitch_task2/main.go +++ b/tasks/twitch/twitch_task2/main.go @@ -1,6 +1,6 @@ package main func main() { - ul := UserList{} - ul.Execute() + + Execute() } diff --git a/tasks/twitch/twitch_task2/task_twitch.go b/tasks/twitch/twitch_task2/task_twitch.go index 915a969..922d1b1 100644 --- a/tasks/twitch/twitch_task2/task_twitch.go +++ b/tasks/twitch/twitch_task2/task_twitch.go @@ -19,17 +19,12 @@ var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() // 获取类型的所有频道链接 -// UserList 频道链接 -type UserList struct { -} - // Execute 执行任务 -func (cl *UserList) Execute() { +func Execute() { // DELETE FROM source_twitch WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, source FROM source_twitch ) s GROUP BY s.source) ; //article//a[@data-a-target='preview-card-title-link'] wd := intimate.GetChromeDriver(3030) - defer wd.Close() defer wd.Quit() ps := intimate.NewPerfectShutdown() counter := intimate.NewCounter() @@ -174,7 +169,4 @@ func (cl *UserList) Execute() { } counter.AddWithReset(1) } - - wd.Close() - wd.Quit() } diff --git a/tasks/twitch/twitch_task2/task_twitch_test.go b/tasks/twitch/twitch_task2/task_twitch_test.go index ef64976..a5537f5 100644 --- a/tasks/twitch/twitch_task2/task_twitch_test.go +++ b/tasks/twitch/twitch_task2/task_twitch_test.go @@ -2,6 +2,6 @@ package main import "testing" -func TestMain(t *testing.T) { +func estMain(t *testing.T) { main() }