From 93efe36d0b624e002e3aa0f623e0608ae1639f63 Mon Sep 17 00:00:00 2001 From: eson Date: Thu, 16 Jul 2020 18:31:13 +0800 Subject: [PATCH] =?UTF-8?q?add:=20=E4=BC=98=E9=9B=85=E5=81=9C=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- extractor/openrec/.gitignore | 3 - extractor/openrec/openrec_test.go | 170 ----------------- extractor/openrec_extractor/.gitignore | 4 + .../{openrec => openrec_extractor}/main.go | 0 .../openrec_extractor.go | 171 ++++++++++-------- extractor/openrec_extractor/openrec_test.go | 96 ++++++++++ store.go | 4 +- tasks/openrec/openrec_task1/.gitignore | 3 +- tasks/openrec/openrec_task1/task_openrec.go | 15 +- tasks/openrec/openrec_task2/task_openrec.go | 15 +- 10 files changed, 226 insertions(+), 255 deletions(-) delete mode 100644 extractor/openrec/.gitignore delete mode 100644 extractor/openrec/openrec_test.go create mode 100644 extractor/openrec_extractor/.gitignore rename extractor/{openrec => openrec_extractor}/main.go (100%) rename extractor/{openrec => openrec_extractor}/openrec_extractor.go (90%) create mode 100644 extractor/openrec_extractor/openrec_test.go diff --git a/extractor/openrec/.gitignore b/extractor/openrec/.gitignore deleted file mode 100644 index f09112e..0000000 --- a/extractor/openrec/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -*.html -screenlog.* -openrec \ No newline at end of file diff --git a/extractor/openrec/openrec_test.go b/extractor/openrec/openrec_test.go deleted file mode 100644 index 8190607..0000000 --- a/extractor/openrec/openrec_test.go +++ /dev/null @@ -1,170 +0,0 @@ -package main - -import ( - "database/sql" - "intimate" - "io/ioutil" - "log" - "os" - "regexp" - "testing" - "time" - - "github.com/lestrrat-go/libxml2" - "github.com/tidwall/gjson" -) - -func TestCase0(t *testing.T) { - f, err := os.Open("./test.html") - if err != nil { - panic(err) - } - data, err := ioutil.ReadAll(f) - if err != nil { - panic(err) - } - - matheslist := regexp.MustCompile(`TagButton__Button[^>]+>(.{1,100})" - - xresult, err := doc.Find("/html/head") - ele, err := doc.CreateElement(`META`) - - if err != nil { - panic(err) - } - ele.SetAttribute("charset", "utf-8") - - if err != nil { - panic(err) - } - - iter := xresult.NodeIter() - if iter.Next() { - n := iter.Node() - - err = n.AddChild(ele) - // childs, err := n.ChildNodes() - if err != nil { - t.Error(err) - } - t.Error(n) - } - - xr, err := doc.Find("//h1[ contains(@class, 'MovieTitle__Title')]") - if err != nil { - panic(nil) - } - - t.Error(xr) -} - -func TestExtractor(t *testing.T) { - collect := intimate.NewExtractorStore() - store := intimate.NewSourceStore("source_openrec") - - for { - source, err := store.Pop(string(intimate.TTOpenrecRanking), 100) - - if err != nil { - log.Println(err) - return - } - - source.SetOperator(int32(intimate.OperatorError)) - anchorId := source.GetSource().String - - ai := &intimate.AnchorInfo{} - ai.SetAnchorId(anchorId) - ai.SetPlatform(string(intimate.Popenrec)) - - sdata := source.GetExt().([]byte) - if gjson.ValidBytes(sdata) { - result := gjson.ParseBytes(sdata) - datamap := result.Map() - - oe := &OpenrecExtractor{} - oe.user = intimate.NewExtractorSource(datamap["user"]) - oe.user.CreateExtractor() - - oe.userLive = intimate.NewExtractorSource(datamap["user_live"]) - oe.userLive.CreateExtractor() - - oe.supporters = intimate.NewExtractorSource(datamap["supporters"]) - - clog := &intimate.CollectLog{} - - oe.extractFollowers(clog) - oe.extractAnchorName(ai) - oe.extractViewsAndLiveStreaming(clog) - oe.extractGiversAndGratuity(clog) - oe.extractLive(clog) - oe.extractTags(clog) - - ai.Set("UpdateTime", source.GetUpdateTime()) - - LiveUrl := "https://www.openrec.tv/live/" + anchorId - ai.Set("LiveUrl", sql.NullString{String: LiveUrl, Valid: true}) - - Uid, err := collect.InsertAnchorInfo(ai) - if err != nil { - t.Error(err) - source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) - store.UpdateOperator(source) - return - } - - clog.Set("Uid", Uid) - clog.Set("Platform", string(intimate.Popenrec)) - clog.Set("AnchorId", anchorId) - clog.Set("UpdateTime", source.GetUpdateTime()) - - if err = collect.InsertCollectLog(clog); err != nil { - source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) - store.UpdateOperator(source) - return - } - - source.SetOperator(int32(intimate.OperatorExtractorOK)) - store.UpdateOperator(source) - } else { - t.Error("data is not json:\n", string(sdata)) - } - } - -} diff --git a/extractor/openrec_extractor/.gitignore b/extractor/openrec_extractor/.gitignore new file mode 100644 index 0000000..89d3e55 --- /dev/null +++ b/extractor/openrec_extractor/.gitignore @@ -0,0 +1,4 @@ +*.html +log +screenlog.* +openrec_extractor \ No newline at end of file diff --git a/extractor/openrec/main.go b/extractor/openrec_extractor/main.go similarity index 100% rename from extractor/openrec/main.go rename to extractor/openrec_extractor/main.go diff --git a/extractor/openrec/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go similarity index 90% rename from extractor/openrec/openrec_extractor.go rename to extractor/openrec_extractor/openrec_extractor.go index de14fe8..87c4ee5 100644 --- a/extractor/openrec/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -5,9 +5,13 @@ import ( "encoding/json" "intimate" "log" + "os" + "os/signal" "regexp" "strconv" "strings" + "sync/atomic" + "syscall" "time" "github.com/tidwall/gjson" @@ -20,6 +24,96 @@ type OpenrecExtractor struct { supporters *intimate.ExtractorSource } +func (oe *OpenrecExtractor) Execute() { + + var loop int32 = 1 + + go func() { + signalchan := make(chan os.Signal) + signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) + log.Println("accept stop command:", <-signalchan) + atomic.StoreInt32(&loop, 0) + }() + + collect := intimate.NewExtractorStore() + store := intimate.NewSourceStore("source_openrec") + var lasterr error = nil + + for atomic.LoadInt32(&loop) > 0 { + + source, err := store.Pop(string(intimate.TTOpenrecRanking), 100) + if err != nil { + if err != lasterr { + log.Println(err, lasterr) + lasterr = err + } + time.Sleep(time.Second * 2) + continue + } + + source.SetOperator(int32(intimate.OperatorError)) + anchorId := source.GetSource().String + + ai := &intimate.AnchorInfo{} + ai.SetAnchorId(anchorId) + ai.SetPlatform(string(intimate.Popenrec)) + + sdata := source.GetExt().([]byte) + if gjson.ValidBytes(sdata) { + result := gjson.ParseBytes(sdata) + datamap := result.Map() + + oe.user = intimate.NewExtractorSource(datamap["user"]) + oe.user.CreateExtractor() + + oe.userLive = intimate.NewExtractorSource(datamap["user_live"]) + oe.userLive.CreateExtractor() + + oe.supporters = intimate.NewExtractorSource(datamap["supporters"]) + + clog := &intimate.CollectLog{} + + log.Println(anchorId) + + oe.extractFollowers(clog) + oe.extractAnchorName(ai) + oe.extractViewsAndLiveStreaming(clog) + oe.extractGiversAndGratuity(clog) + oe.extractLive(clog) + oe.extractTags(clog) + + ai.Set("UpdateTime", source.GetUpdateTime()) + + LiveUrl := "https://www.openrec.tv/live/" + anchorId + ai.Set("LiveUrl", sql.NullString{String: LiveUrl, Valid: true}) + + Uid, err := collect.InsertAnchorInfo(ai) + if err != nil { + log.Println(err) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + store.UpdateOperator(source) + return + } + + clog.Set("Uid", Uid) + clog.Set("Platform", string(intimate.Popenrec)) + clog.Set("AnchorId", anchorId) + clog.Set("UpdateTime", source.GetUpdateTime()) + + if err = collect.InsertCollectLog(clog); err != nil { + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + store.UpdateOperator(source) + return + } + + source.SetOperator(int32(intimate.OperatorExtractorOK)) + store.UpdateOperator(source) + } else { + log.Println("data is not json:\n", string(sdata)) + } + } +} + func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) { extractor := oe.user.GetExtractor() xp, err := extractor.XPathResult("//p[@class='c-global__user__count__row__right js-userCountFollowers']/text()") @@ -136,7 +230,7 @@ func (oe *OpenrecExtractor) extractLive(clog intimate.ISet) { func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) { var tags []string - matheslist := regexp.MustCompile(`]+>(.{1,100})`).FindAllStringSubmatch(oe.userLive.GetSource().Str, -1) + matheslist := regexp.MustCompile(`<[^>]+TagButton[^>]+>([^<]{1,100})<`).FindAllStringSubmatch(oe.userLive.GetSource().Str, -1) for _, m := range matheslist { tags = append(tags, m[1]) } @@ -148,78 +242,3 @@ func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) { clog.Set("Tags", tagsBytes) } - -func (oe *OpenrecExtractor) Execute() { - - collect := intimate.NewExtractorStore() - store := intimate.NewSourceStore("source_openrec") - - for { - source, err := store.Pop(string(intimate.TTOpenrecRanking), 100) - - if err != nil { - log.Println(err) - return - } - - source.SetOperator(int32(intimate.OperatorError)) - anchorId := source.GetSource().String - - ai := &intimate.AnchorInfo{} - ai.SetAnchorId(anchorId) - ai.SetPlatform(string(intimate.Popenrec)) - - sdata := source.GetExt().([]byte) - if gjson.ValidBytes(sdata) { - result := gjson.ParseBytes(sdata) - datamap := result.Map() - - oe.user = intimate.NewExtractorSource(datamap["user"]) - oe.user.CreateExtractor() - - oe.userLive = intimate.NewExtractorSource(datamap["user_live"]) - oe.userLive.CreateExtractor() - - oe.supporters = intimate.NewExtractorSource(datamap["supporters"]) - - clog := &intimate.CollectLog{} - - oe.extractFollowers(clog) - oe.extractAnchorName(ai) - oe.extractViewsAndLiveStreaming(clog) - oe.extractGiversAndGratuity(clog) - oe.extractLive(clog) - oe.extractTags(clog) - - ai.Set("UpdateTime", source.GetUpdateTime()) - - LiveUrl := "https://www.openrec.tv/live/" + anchorId - ai.Set("LiveUrl", sql.NullString{String: LiveUrl, Valid: true}) - - Uid, err := collect.InsertAnchorInfo(ai) - if err != nil { - log.Println(err) - source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) - store.UpdateOperator(source) - return - } - - clog.Set("Uid", Uid) - clog.Set("Platform", string(intimate.Popenrec)) - clog.Set("AnchorId", anchorId) - clog.Set("UpdateTime", source.GetUpdateTime()) - - if err = collect.InsertCollectLog(clog); err != nil { - source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) - store.UpdateOperator(source) - return - } - - source.SetOperator(int32(intimate.OperatorExtractorOK)) - store.UpdateOperator(source) - } else { - log.Println("data is not json:\n", string(sdata)) - } - } - -} diff --git a/extractor/openrec_extractor/openrec_test.go b/extractor/openrec_extractor/openrec_test.go new file mode 100644 index 0000000..5212f04 --- /dev/null +++ b/extractor/openrec_extractor/openrec_test.go @@ -0,0 +1,96 @@ +package main + +import ( + "io/ioutil" + "os" + "regexp" + "testing" + "time" + + "github.com/lestrrat-go/libxml2" +) + +func TestCase0(t *testing.T) { + f, err := os.Open("./test.html") + if err != nil { + panic(err) + } + data, err := ioutil.ReadAll(f) + if err != nil { + panic(err) + } + + matheslist := regexp.MustCompile(`TagButton__Button[^>]+>(.{1,100})" + + xresult, err := doc.Find("/html/head") + ele, err := doc.CreateElement(`META`) + + if err != nil { + panic(err) + } + ele.SetAttribute("charset", "utf-8") + + if err != nil { + panic(err) + } + + iter := xresult.NodeIter() + if iter.Next() { + n := iter.Node() + + err = n.AddChild(ele) + // childs, err := n.ChildNodes() + if err != nil { + t.Error(err) + } + t.Error(n) + } + + xr, err := doc.Find("//h1[ contains(@class, 'MovieTitle__Title')]") + if err != nil { + panic(nil) + } + + t.Error(xr) +} + +func TestExtractor(t *testing.T) { + oe := &OpenrecExtractor{} + oe.Execute() +} diff --git a/store.go b/store.go index 900975f..56d9cab 100644 --- a/store.go +++ b/store.go @@ -94,7 +94,6 @@ func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSou tx, err := store.db.Begin() if err != nil { - log.Println(err, targetType) return nil, err } var args = []interface{}{targetType} @@ -127,7 +126,6 @@ func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSou // uid, url, target_type, source, ext, operator err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime) if err != nil { - log.Println(err, targetType) return nil, err } s.SetLastOperator(s.Operator) @@ -190,7 +188,7 @@ func (store *ExtractorStore) InsertAnchorInfo(isource IGetAnchorInfo) (Uid int64 log.Println(err) return 0, err } - log.Println(isource.GetPlatform(), isource.GetAnchorId()) + row := tx.QueryRow(selectSQL+` limit 1 for update`, isource.GetPlatform(), isource.GetAnchorId()) var uid int64 diff --git a/tasks/openrec/openrec_task1/.gitignore b/tasks/openrec/openrec_task1/.gitignore index 42d0e6c..adfb476 100644 --- a/tasks/openrec/openrec_task1/.gitignore +++ b/tasks/openrec/openrec_task1/.gitignore @@ -1 +1,2 @@ -openrec_task1 \ No newline at end of file +openrec_task1 +log \ No newline at end of file diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index 7976b96..d59fd41 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -4,7 +4,11 @@ import ( "database/sql" "intimate" "log" + "os" + "os/signal" "strconv" + "sync/atomic" + "syscall" "time" "github.com/474420502/hunter" @@ -42,7 +46,16 @@ type OpenrecRanking struct { // Execute 执行方法 func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { - for { + var loop int32 = 1 + + go func() { + signalchan := make(chan os.Signal) + signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) + log.Println("accept stop command:", <-signalchan) + atomic.StoreInt32(&loop, 0) + }() + + for atomic.LoadInt32(&loop) > 0 { resp, err := cxt.Hunt() if err != nil { diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 286914c..23c570e 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -5,7 +5,11 @@ import ( "encoding/json" "intimate" "log" + "os" + "os/signal" "strconv" + "sync/atomic" + "syscall" "time" "github.com/474420502/gcurl" @@ -31,7 +35,16 @@ type OpenrecExtratorRanking struct { // Execute 执行方法 func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { - for { + var loop int32 = 1 + + go func() { + signalchan := make(chan os.Signal) + signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP) + log.Println("accept stop command:", <-signalchan) + atomic.StoreInt32(&loop, 0) + }() + + for atomic.LoadInt32(&loop) > 0 { source, err := store.Pop(string(intimate.TTOpenrecUser))