重构完成

This commit is contained in:
eson 2020-09-09 17:25:36 +08:00
parent 30a6c35980
commit a9bb448351
14 changed files with 47 additions and 249 deletions

View File

@ -11,10 +11,10 @@ import (
)
// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STNimo))
// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STNimo))
// estore 解析存储连接实例
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// // estore 解析存储连接实例
// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
func main() {
Execute()
@ -35,14 +35,18 @@ func Execute() {
waitfor := intimate.NewWaitFor(wd)
ps := intimate.NewPerfectShutdown()
queue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.PNimo))
for !ps.IsClose() {
streamer, err := estore.Pop(intimate.PNimo)
istreamer, err := queue.Pop()
if err != nil {
log.Println(err)
estore.UpdateError(streamer, err)
intimate.TStreamer.UpdateError(istreamer, err)
continue
}
streamer := istreamer.(*intimate.Streamer)
wd.Get(streamer.LiveUrl.String)
// wd.Get("https://www.nimo.tv/live/1253835677")
@ -71,22 +75,25 @@ func Execute() {
clog := &intimate.CollectLog{}
clog.Platform = intimate.PNimo
clog.Followers = sql.NullInt64{Int64: li.Followers, Valid: true}
clog.Views = sql.NullInt64{Int64: li.Views, Valid: true}
clog.UpdateTime = utime
clog.Followers = &sql.NullInt64{Int64: li.Followers, Valid: true}
clog.Views = &sql.NullInt64{Int64: li.Views, Valid: true}
clog.UpdateTime = &utime
clog.StreamerUid = streamer.Uid
var sum int64 = 0
for _, v := range li.Gratuity {
sum += v
}
clog.Gratuity = sql.NullInt64{Int64: sum, Valid: true}
clog.Gratuity = &sql.NullInt64{Int64: sum, Valid: true}
cuid := estore.InsertClog(clog)
cuid, err := intimate.TClog.InsertRetAutoID(clog)
if err != nil {
panic(err)
}
streamer.Channel = sql.NullString{String: li.Channel, Valid: true}
streamer.Channel = &sql.NullString{String: li.Channel, Valid: true}
streamer.LatestLogUid = cuid
streamer.UpdateTime = utime
streamer.UpdateTime = &utime
streamer.Operator = 0
switch {
@ -102,7 +109,11 @@ func Execute() {
streamer.UpdateInterval = 60
}
estore.Update(streamer, "update_interval", streamer.UpdateInterval, "operator", streamer.Operator, "channel", streamer.Channel, "latest_log_uid", streamer.LatestLogUid, "update_time", streamer.UpdateTime)
// estore.Update(streamer, "update_interval", streamer.UpdateInterval, "operator", streamer.Operator, "channel", streamer.Channel, "latest_log_uid", streamer.LatestLogUid, "update_time", streamer.UpdateTime)
err = intimate.TStreamer.Update(streamer)
if err != nil {
panic(err)
}
count++
if count >= countlimit {

View File

@ -123,7 +123,6 @@ func main() {
}
streamer := &intimate.Streamer{}
matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href)
if len(matches) == 2 {
mc := matches[1]

View File

@ -1,12 +0,0 @@
[supervisord]
nodaemon=true
[program:openrec_source]
directory = MYPATH/bin/openrec_task2/
command= MYPATH/bin/openrec_task2/openrec_task2
process_name=%(program_name)s_%(process_num)02d ;多进程名称
numprocs=4 ;启动多个进程
autorestart=true
stderr_logfile=MYPATH/bin/openrec_task2/log
stderr_logfile_maxbytes=0
stopsignal=QUIT

View File

@ -1,5 +1,5 @@
[supervisord]
nodaemon=false
nodaemon=true
[program:twitch_extractor]
environment=DISPLAY=":99"

View File

@ -1,13 +0,0 @@
[supervisord]
nodaemon=false
[program:twitch_extractor_p2]
environment=DISPLAY=":99",pac_proxy=http://localhost:1090/pac1
directory = MYPATH/bin/twitch_extractor
command= MYPATH/bin/twitch_extractor/twitch_extractor
process_name=%(program_name)s_%(process_num)02d ;多进程名称
numprocs=2 ;启动多个进程
autorestart=true
stderr_logfile=MYPATH/bin/twitch_extractor/log
stderr_logfile_maxbytes=0
stopsignal=QUIT

View File

@ -0,0 +1,13 @@
[supervisord]
nodaemon=false
[program:twitch_task1]
environment=DISPLAY=":99"
directory = MYPATH/bin/twitch_task1
command= MYPATH/bin/twitch_task1/twitch_task1
# process_name=%(program_name)s_%(process_num)02d ;多进程名称
# numprocs=1 ;启动多个进程
autorestart=true
stderr_logfile=MYPATH/bin/twitch_task1/log
stderr_logfile_maxbytes=0
stopsignal=QUIT

View File

@ -1,13 +0,0 @@
[supervisord]
nodaemon=false
[program:twitch_task2]
environment=DISPLAY=":99"
directory = MYPATH/bin/twitch_task2
command= MYPATH/bin/twitch_task2/twitch_task2
process_name=%(program_name)s_%(process_num)02d ;多进程名称
numprocs=6 ;启动多个进程
autorestart=true
stderr_logfile=MYPATH/bin/twitch_task2/log
stderr_logfile_maxbytes=0
stopsignal=QUIT

View File

@ -12,7 +12,7 @@ import (
)
// estore 解析存储连接实例
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// Execute 执行
func Execute() {
@ -95,7 +95,10 @@ func Execute() {
}
streamer.UpdateInterval = 120
estore.InsertStreamer(streamer)
err = intimate.TStreamer.Insert(streamer)
if err != nil {
panic(err)
}
} else {
log.Println("userid is null.", room.String())

View File

@ -70,11 +70,11 @@ func Execute() {
if ok := queuedict[wurl]; !ok {
log.Println(wurl)
sl := &intimate.StreamerList{}
sl.Platform = intimate.Ptwitcasting
sl.Platform = string(intimate.Ptwitcasting)
sl.Url = wurl
sl.Operator = 0
sl.UpdateInterval = 120
sl.UpdateTime = time.Now()
sl.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
sl.UrlHash = intimate.GetUrlHash(sl.Url)
intimate.TStreamerList.Insert(sl)
@ -104,11 +104,11 @@ func Execute() {
sp.TagUrl[i] = wurl
if ok := queuedict[wurl]; !ok {
sl := &intimate.StreamerList{}
sl.Platform = intimate.Ptwitcasting
sl.Platform = string(intimate.Ptwitcasting)
sl.Url = wurl
sl.Operator = 0
sl.UpdateInterval = 120
sl.UpdateTime = time.Now()
sl.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
sl.UrlHash = intimate.GetUrlHash(sl.Url)
intimate.TStreamerList.Insert(sl)

View File

@ -1,2 +0,0 @@
twitch_task2
log

View File

@ -1,6 +0,0 @@
package main
func main() {
Execute()
}

View File

@ -1,175 +0,0 @@
package main
import (
"database/sql"
"encoding/json"
"intimate"
"log"
"regexp"
"time"
"github.com/tebeka/selenium"
)
// // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql
// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch))
// // estore 解析存储连接实例
// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// 获取类型的所有频道链接
// Execute 执行任务
func Execute() {
// DELETE FROM source_twitch WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, source FROM source_twitch ) s GROUP BY s.source) ;
//article//a[@data-a-target='preview-card-title-link']
wd := intimate.GetChromeDriver(3030)
defer wd.Quit()
ps := intimate.NewPerfectShutdown()
counter := intimate.NewCounter()
counter.SetMaxLimit(100)
counter.SetMaxToDo(func(olist ...interface{}) error {
owd := olist[0].(*selenium.WebDriver)
if err := (*owd).Quit(); err != nil {
log.Println(err)
}
*owd = intimate.GetChromeDriver(3030)
return nil
}, &wd)
for !ps.IsClose() {
var err error
sourceChannel, err := sstore.Pop(intimate.TTwitchChannel)
if err != nil {
panic(err)
}
weburl := sourceChannel.Source.String + "?sort=VIEWER_COUNT"
err = wd.Get(weburl)
if err != nil {
log.Println(err)
sstore.UpdateError(sourceChannel, err)
time.Sleep(time.Second * 10)
continue
}
wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) {
_, err := wd.FindElement(selenium.ByXPATH, "(//div/p[@class=''])[last()]")
if err != nil {
return false, err
}
return true, nil
}, time.Second*10)
btn, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']")
if err != nil {
log.Println(err)
continue
}
btn.Click()
var elements []selenium.WebElement
var liveurls = 0
var delayerror = 2
for i := 0; i < 200 && !ps.IsClose(); i++ {
elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]")
if err != nil {
log.Println(err)
break
}
time.Sleep(time.Millisecond * 200)
wd.KeyDown(selenium.EndKey)
time.Sleep(time.Millisecond * 200)
wd.KeyUp(selenium.EndKey)
time.Sleep(time.Millisecond * 2000)
if len(elements) == liveurls {
delayerror--
if delayerror <= 0 {
break
}
} else {
delayerror = 2
}
liveurls = len(elements)
}
articles, err := wd.FindElements(selenium.ByXPATH, "//article")
if err != nil {
log.Println(err)
continue
}
for _, article := range articles {
e, err := article.FindElement(selenium.ByXPATH, ".//a[@data-a-target='preview-card-title-link' and @href]")
if err != nil {
log.Println(err)
continue
}
href, err := e.GetAttribute("href")
if err != nil {
log.Println(err)
continue
}
btns, err := article.FindElements(selenium.ByXPATH, ".//div[@class='tw-full-width tw-inline-block']//button")
if err != nil {
log.Println(err)
continue
}
var tags []string
for _, btn := range btns {
tag, err := btn.GetAttribute("data-a-target")
if err == nil {
tags = append(tags, tag)
}
}
streamer := &intimate.Streamer{}
matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href)
if len(matches) == 2 {
mc := matches[1]
streamer.UserId = &mc
} else {
log.Println(href)
continue
}
jtags, err := json.Marshal(tags)
if err != nil {
log.Println(err)
} else {
streamer.Tags = jtags
}
streamer.Platform = intimate.Ptwitch
updateUrl := make(map[string]string)
updateUrl["live"] = href
streamer.LiveUrl = &sql.NullString{String: href, Valid: true}
data, err := json.Marshal(updateUrl)
if err != nil {
log.Println(err)
continue
}
streamer.UpdateUrl = data
streamer.Operator = 0
if estore.InsertStreamer(streamer) {
// log.Println("streamer update tags", streamer.Uid, tags)
if streamer.Tags != nil {
estore.Update(streamer, "Tags", streamer.Tags)
}
}
}
log.Println("streamer find", len(articles))
if len(articles) == 0 {
sourceChannel.Operator = 5
sstore.UpdateOperator(sourceChannel)
}
counter.AddWithReset(1)
}
}

View File

@ -1,7 +0,0 @@
package main
import "testing"
func TestMain(t *testing.T) {
main()
}