add: 优雅停机
This commit is contained in:
parent
fe25921f9a
commit
93efe36d0b
3
extractor/openrec/.gitignore
vendored
3
extractor/openrec/.gitignore
vendored
|
@ -1,3 +0,0 @@
|
|||
*.html
|
||||
screenlog.*
|
||||
openrec
|
|
@ -1,170 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"intimate"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"regexp"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/lestrrat-go/libxml2"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
func TestCase0(t *testing.T) {
|
||||
f, err := os.Open("./test.html")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
data, err := ioutil.ReadAll(f)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
matheslist := regexp.MustCompile(`TagButton__Button[^>]+>(.{1,100})</a`).FindAllStringSubmatch(string(data), -1)
|
||||
t.Error(matheslist)
|
||||
}
|
||||
|
||||
func TestCase1(t *testing.T) {
|
||||
date := "2020-07-13T18:58:24+09:00"
|
||||
|
||||
tm, err := time.Parse("2006-01-02T15:04:05Z07:00", date)
|
||||
t.Error(err)
|
||||
t.Error(time.Now())
|
||||
t.Error(tm.Local().UTC(), tm.Local())
|
||||
|
||||
}
|
||||
|
||||
func TestCase2(t *testing.T) {
|
||||
duration1 := "0:00:00"
|
||||
duration2 := "4:56:04"
|
||||
tm2, err := time.Parse("15:04:05", duration2)
|
||||
tm1, err := time.Parse("15:04:05", duration1)
|
||||
|
||||
tm2.Sub(tm1)
|
||||
|
||||
t.Error(err)
|
||||
t.Error(tm2.Sub(tm1))
|
||||
|
||||
}
|
||||
|
||||
func TestCase(t *testing.T) {
|
||||
f, _ := os.Open("./test.html")
|
||||
data, _ := ioutil.ReadAll(f)
|
||||
|
||||
doc, err := libxml2.ParseHTML(data)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// doc.CreateElement("meta")
|
||||
// "<META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=utf-8">"
|
||||
|
||||
xresult, err := doc.Find("/html/head")
|
||||
ele, err := doc.CreateElement(`META`)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ele.SetAttribute("charset", "utf-8")
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
iter := xresult.NodeIter()
|
||||
if iter.Next() {
|
||||
n := iter.Node()
|
||||
|
||||
err = n.AddChild(ele)
|
||||
// childs, err := n.ChildNodes()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
t.Error(n)
|
||||
}
|
||||
|
||||
xr, err := doc.Find("//h1[ contains(@class, 'MovieTitle__Title')]")
|
||||
if err != nil {
|
||||
panic(nil)
|
||||
}
|
||||
|
||||
t.Error(xr)
|
||||
}
|
||||
|
||||
func TestExtractor(t *testing.T) {
|
||||
collect := intimate.NewExtractorStore()
|
||||
store := intimate.NewSourceStore("source_openrec")
|
||||
|
||||
for {
|
||||
source, err := store.Pop(string(intimate.TTOpenrecRanking), 100)
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorError))
|
||||
anchorId := source.GetSource().String
|
||||
|
||||
ai := &intimate.AnchorInfo{}
|
||||
ai.SetAnchorId(anchorId)
|
||||
ai.SetPlatform(string(intimate.Popenrec))
|
||||
|
||||
sdata := source.GetExt().([]byte)
|
||||
if gjson.ValidBytes(sdata) {
|
||||
result := gjson.ParseBytes(sdata)
|
||||
datamap := result.Map()
|
||||
|
||||
oe := &OpenrecExtractor{}
|
||||
oe.user = intimate.NewExtractorSource(datamap["user"])
|
||||
oe.user.CreateExtractor()
|
||||
|
||||
oe.userLive = intimate.NewExtractorSource(datamap["user_live"])
|
||||
oe.userLive.CreateExtractor()
|
||||
|
||||
oe.supporters = intimate.NewExtractorSource(datamap["supporters"])
|
||||
|
||||
clog := &intimate.CollectLog{}
|
||||
|
||||
oe.extractFollowers(clog)
|
||||
oe.extractAnchorName(ai)
|
||||
oe.extractViewsAndLiveStreaming(clog)
|
||||
oe.extractGiversAndGratuity(clog)
|
||||
oe.extractLive(clog)
|
||||
oe.extractTags(clog)
|
||||
|
||||
ai.Set("UpdateTime", source.GetUpdateTime())
|
||||
|
||||
LiveUrl := "https://www.openrec.tv/live/" + anchorId
|
||||
ai.Set("LiveUrl", sql.NullString{String: LiveUrl, Valid: true})
|
||||
|
||||
Uid, err := collect.InsertAnchorInfo(ai)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
clog.Set("Uid", Uid)
|
||||
clog.Set("Platform", string(intimate.Popenrec))
|
||||
clog.Set("AnchorId", anchorId)
|
||||
clog.Set("UpdateTime", source.GetUpdateTime())
|
||||
|
||||
if err = collect.InsertCollectLog(clog); err != nil {
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorExtractorOK))
|
||||
store.UpdateOperator(source)
|
||||
} else {
|
||||
t.Error("data is not json:\n", string(sdata))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
4
extractor/openrec_extractor/.gitignore
vendored
Normal file
4
extractor/openrec_extractor/.gitignore
vendored
Normal file
|
@ -0,0 +1,4 @@
|
|||
*.html
|
||||
log
|
||||
screenlog.*
|
||||
openrec_extractor
|
|
@ -5,9 +5,13 @@ import (
|
|||
"encoding/json"
|
||||
"intimate"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/tidwall/gjson"
|
||||
|
@ -20,6 +24,96 @@ type OpenrecExtractor struct {
|
|||
supporters *intimate.ExtractorSource
|
||||
}
|
||||
|
||||
func (oe *OpenrecExtractor) Execute() {
|
||||
|
||||
var loop int32 = 1
|
||||
|
||||
go func() {
|
||||
signalchan := make(chan os.Signal)
|
||||
signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP)
|
||||
log.Println("accept stop command:", <-signalchan)
|
||||
atomic.StoreInt32(&loop, 0)
|
||||
}()
|
||||
|
||||
collect := intimate.NewExtractorStore()
|
||||
store := intimate.NewSourceStore("source_openrec")
|
||||
var lasterr error = nil
|
||||
|
||||
for atomic.LoadInt32(&loop) > 0 {
|
||||
|
||||
source, err := store.Pop(string(intimate.TTOpenrecRanking), 100)
|
||||
if err != nil {
|
||||
if err != lasterr {
|
||||
log.Println(err, lasterr)
|
||||
lasterr = err
|
||||
}
|
||||
time.Sleep(time.Second * 2)
|
||||
continue
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorError))
|
||||
anchorId := source.GetSource().String
|
||||
|
||||
ai := &intimate.AnchorInfo{}
|
||||
ai.SetAnchorId(anchorId)
|
||||
ai.SetPlatform(string(intimate.Popenrec))
|
||||
|
||||
sdata := source.GetExt().([]byte)
|
||||
if gjson.ValidBytes(sdata) {
|
||||
result := gjson.ParseBytes(sdata)
|
||||
datamap := result.Map()
|
||||
|
||||
oe.user = intimate.NewExtractorSource(datamap["user"])
|
||||
oe.user.CreateExtractor()
|
||||
|
||||
oe.userLive = intimate.NewExtractorSource(datamap["user_live"])
|
||||
oe.userLive.CreateExtractor()
|
||||
|
||||
oe.supporters = intimate.NewExtractorSource(datamap["supporters"])
|
||||
|
||||
clog := &intimate.CollectLog{}
|
||||
|
||||
log.Println(anchorId)
|
||||
|
||||
oe.extractFollowers(clog)
|
||||
oe.extractAnchorName(ai)
|
||||
oe.extractViewsAndLiveStreaming(clog)
|
||||
oe.extractGiversAndGratuity(clog)
|
||||
oe.extractLive(clog)
|
||||
oe.extractTags(clog)
|
||||
|
||||
ai.Set("UpdateTime", source.GetUpdateTime())
|
||||
|
||||
LiveUrl := "https://www.openrec.tv/live/" + anchorId
|
||||
ai.Set("LiveUrl", sql.NullString{String: LiveUrl, Valid: true})
|
||||
|
||||
Uid, err := collect.InsertAnchorInfo(ai)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
clog.Set("Uid", Uid)
|
||||
clog.Set("Platform", string(intimate.Popenrec))
|
||||
clog.Set("AnchorId", anchorId)
|
||||
clog.Set("UpdateTime", source.GetUpdateTime())
|
||||
|
||||
if err = collect.InsertCollectLog(clog); err != nil {
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorExtractorOK))
|
||||
store.UpdateOperator(source)
|
||||
} else {
|
||||
log.Println("data is not json:\n", string(sdata))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) {
|
||||
extractor := oe.user.GetExtractor()
|
||||
xp, err := extractor.XPathResult("//p[@class='c-global__user__count__row__right js-userCountFollowers']/text()")
|
||||
|
@ -136,7 +230,7 @@ func (oe *OpenrecExtractor) extractLive(clog intimate.ISet) {
|
|||
|
||||
func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) {
|
||||
var tags []string
|
||||
matheslist := regexp.MustCompile(`<a.+TagButton__Button[^>]+>(.{1,100})</a>`).FindAllStringSubmatch(oe.userLive.GetSource().Str, -1)
|
||||
matheslist := regexp.MustCompile(`<[^>]+TagButton[^>]+>([^<]{1,100})<`).FindAllStringSubmatch(oe.userLive.GetSource().Str, -1)
|
||||
for _, m := range matheslist {
|
||||
tags = append(tags, m[1])
|
||||
}
|
||||
|
@ -148,78 +242,3 @@ func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) {
|
|||
|
||||
clog.Set("Tags", tagsBytes)
|
||||
}
|
||||
|
||||
func (oe *OpenrecExtractor) Execute() {
|
||||
|
||||
collect := intimate.NewExtractorStore()
|
||||
store := intimate.NewSourceStore("source_openrec")
|
||||
|
||||
for {
|
||||
source, err := store.Pop(string(intimate.TTOpenrecRanking), 100)
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorError))
|
||||
anchorId := source.GetSource().String
|
||||
|
||||
ai := &intimate.AnchorInfo{}
|
||||
ai.SetAnchorId(anchorId)
|
||||
ai.SetPlatform(string(intimate.Popenrec))
|
||||
|
||||
sdata := source.GetExt().([]byte)
|
||||
if gjson.ValidBytes(sdata) {
|
||||
result := gjson.ParseBytes(sdata)
|
||||
datamap := result.Map()
|
||||
|
||||
oe.user = intimate.NewExtractorSource(datamap["user"])
|
||||
oe.user.CreateExtractor()
|
||||
|
||||
oe.userLive = intimate.NewExtractorSource(datamap["user_live"])
|
||||
oe.userLive.CreateExtractor()
|
||||
|
||||
oe.supporters = intimate.NewExtractorSource(datamap["supporters"])
|
||||
|
||||
clog := &intimate.CollectLog{}
|
||||
|
||||
oe.extractFollowers(clog)
|
||||
oe.extractAnchorName(ai)
|
||||
oe.extractViewsAndLiveStreaming(clog)
|
||||
oe.extractGiversAndGratuity(clog)
|
||||
oe.extractLive(clog)
|
||||
oe.extractTags(clog)
|
||||
|
||||
ai.Set("UpdateTime", source.GetUpdateTime())
|
||||
|
||||
LiveUrl := "https://www.openrec.tv/live/" + anchorId
|
||||
ai.Set("LiveUrl", sql.NullString{String: LiveUrl, Valid: true})
|
||||
|
||||
Uid, err := collect.InsertAnchorInfo(ai)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
clog.Set("Uid", Uid)
|
||||
clog.Set("Platform", string(intimate.Popenrec))
|
||||
clog.Set("AnchorId", anchorId)
|
||||
clog.Set("UpdateTime", source.GetUpdateTime())
|
||||
|
||||
if err = collect.InsertCollectLog(clog); err != nil {
|
||||
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
|
||||
store.UpdateOperator(source)
|
||||
return
|
||||
}
|
||||
|
||||
source.SetOperator(int32(intimate.OperatorExtractorOK))
|
||||
store.UpdateOperator(source)
|
||||
} else {
|
||||
log.Println("data is not json:\n", string(sdata))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
96
extractor/openrec_extractor/openrec_test.go
Normal file
96
extractor/openrec_extractor/openrec_test.go
Normal file
|
@ -0,0 +1,96 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"regexp"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/lestrrat-go/libxml2"
|
||||
)
|
||||
|
||||
func TestCase0(t *testing.T) {
|
||||
f, err := os.Open("./test.html")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
data, err := ioutil.ReadAll(f)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
matheslist := regexp.MustCompile(`TagButton__Button[^>]+>(.{1,100})</a`).FindAllStringSubmatch(string(data), -1)
|
||||
t.Error(matheslist)
|
||||
}
|
||||
|
||||
func TestCase1(t *testing.T) {
|
||||
date := "2020-07-13T18:58:24+09:00"
|
||||
|
||||
tm, err := time.Parse("2006-01-02T15:04:05Z07:00", date)
|
||||
t.Error(err)
|
||||
t.Error(time.Now())
|
||||
t.Error(tm.Local().UTC(), tm.Local())
|
||||
|
||||
}
|
||||
|
||||
func TestCase2(t *testing.T) {
|
||||
duration1 := "0:00:00"
|
||||
duration2 := "4:56:04"
|
||||
tm2, err := time.Parse("15:04:05", duration2)
|
||||
tm1, err := time.Parse("15:04:05", duration1)
|
||||
|
||||
tm2.Sub(tm1)
|
||||
|
||||
t.Error(err)
|
||||
t.Error(tm2.Sub(tm1))
|
||||
|
||||
}
|
||||
|
||||
func TestCase(t *testing.T) {
|
||||
f, _ := os.Open("./test.html")
|
||||
data, _ := ioutil.ReadAll(f)
|
||||
|
||||
doc, err := libxml2.ParseHTML(data)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// doc.CreateElement("meta")
|
||||
// "<META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=utf-8">"
|
||||
|
||||
xresult, err := doc.Find("/html/head")
|
||||
ele, err := doc.CreateElement(`META`)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ele.SetAttribute("charset", "utf-8")
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
iter := xresult.NodeIter()
|
||||
if iter.Next() {
|
||||
n := iter.Node()
|
||||
|
||||
err = n.AddChild(ele)
|
||||
// childs, err := n.ChildNodes()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
t.Error(n)
|
||||
}
|
||||
|
||||
xr, err := doc.Find("//h1[ contains(@class, 'MovieTitle__Title')]")
|
||||
if err != nil {
|
||||
panic(nil)
|
||||
}
|
||||
|
||||
t.Error(xr)
|
||||
}
|
||||
|
||||
func TestExtractor(t *testing.T) {
|
||||
oe := &OpenrecExtractor{}
|
||||
oe.Execute()
|
||||
}
|
4
store.go
4
store.go
|
@ -94,7 +94,6 @@ func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSou
|
|||
|
||||
tx, err := store.db.Begin()
|
||||
if err != nil {
|
||||
log.Println(err, targetType)
|
||||
return nil, err
|
||||
}
|
||||
var args = []interface{}{targetType}
|
||||
|
@ -127,7 +126,6 @@ func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSou
|
|||
// uid, url, target_type, source, ext, operator
|
||||
err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime)
|
||||
if err != nil {
|
||||
log.Println(err, targetType)
|
||||
return nil, err
|
||||
}
|
||||
s.SetLastOperator(s.Operator)
|
||||
|
@ -190,7 +188,7 @@ func (store *ExtractorStore) InsertAnchorInfo(isource IGetAnchorInfo) (Uid int64
|
|||
log.Println(err)
|
||||
return 0, err
|
||||
}
|
||||
log.Println(isource.GetPlatform(), isource.GetAnchorId())
|
||||
|
||||
row := tx.QueryRow(selectSQL+` limit 1 for update`, isource.GetPlatform(), isource.GetAnchorId())
|
||||
|
||||
var uid int64
|
||||
|
|
1
tasks/openrec/openrec_task1/.gitignore
vendored
1
tasks/openrec/openrec_task1/.gitignore
vendored
|
@ -1 +1,2 @@
|
|||
openrec_task1
|
||||
log
|
|
@ -4,7 +4,11 @@ import (
|
|||
"database/sql"
|
||||
"intimate"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/474420502/hunter"
|
||||
|
@ -42,7 +46,16 @@ type OpenrecRanking struct {
|
|||
// Execute 执行方法
|
||||
func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) {
|
||||
|
||||
for {
|
||||
var loop int32 = 1
|
||||
|
||||
go func() {
|
||||
signalchan := make(chan os.Signal)
|
||||
signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP)
|
||||
log.Println("accept stop command:", <-signalchan)
|
||||
atomic.StoreInt32(&loop, 0)
|
||||
}()
|
||||
|
||||
for atomic.LoadInt32(&loop) > 0 {
|
||||
|
||||
resp, err := cxt.Hunt()
|
||||
if err != nil {
|
||||
|
|
|
@ -5,7 +5,11 @@ import (
|
|||
"encoding/json"
|
||||
"intimate"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/474420502/gcurl"
|
||||
|
@ -31,7 +35,16 @@ type OpenrecExtratorRanking struct {
|
|||
// Execute 执行方法
|
||||
func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) {
|
||||
|
||||
for {
|
||||
var loop int32 = 1
|
||||
|
||||
go func() {
|
||||
signalchan := make(chan os.Signal)
|
||||
signal.Notify(signalchan, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP)
|
||||
log.Println("accept stop command:", <-signalchan)
|
||||
atomic.StoreInt32(&loop, 0)
|
||||
}()
|
||||
|
||||
for atomic.LoadInt32(&loop) > 0 {
|
||||
|
||||
source, err := store.Pop(string(intimate.TTOpenrecUser))
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user