twitch source 非常多, 需要把增量的架构设计好. 修改原来架构.

This commit is contained in:
eson 2020-07-23 18:29:56 +08:00
parent 6d688b8450
commit cbdedb6795
11 changed files with 148 additions and 48 deletions

View File

@ -60,7 +60,7 @@ func (oe *OpenrecExtractor) Execute() {
// log.Println("1000次执行, gc 重新建立sql链接") // log.Println("1000次执行, gc 重新建立sql链接")
// } // }
source, err := sstore.Pop(string(intimate.TTOpenrecUser), 0) source, err := sstore.Pop(intimate.TOpenrecUser, 0)
if err != nil { if err != nil {
if err != lasterr { if err != lasterr {
log.Println(err, lasterr) log.Println(err, lasterr)
@ -78,7 +78,7 @@ func (oe *OpenrecExtractor) Execute() {
streamer := &intimate.Streamer{} streamer := &intimate.Streamer{}
streamer.UserId = userId streamer.UserId = userId
streamer.Platform = string(intimate.Popenrec) streamer.Platform = intimate.Popenrec
htmlUser := datamap["html_user"] htmlUser := datamap["html_user"]
oe.user = intimate.NewExtractorSource(&htmlUser) oe.user = intimate.NewExtractorSource(&htmlUser)

View File

@ -12,9 +12,9 @@ type GetSet struct {
} }
type Streamer struct { type Streamer struct {
Uid int64 // Uid int64 //
Platform string // Platform Platform //
UserId string // UserId string //
UserName sql.NullString // UserName sql.NullString //
LiveUrl sql.NullString // LiveUrl sql.NullString //

View File

@ -19,8 +19,8 @@ type Source struct {
UpdateTime sql.NullTime // UpdateTime sql.NullTime //
ErrorMsg sql.NullString // ErrorMsg sql.NullString //
TargetType string // Target Target //
Operator int32 // Operator int32 //
LastOperator int32 LastOperator int32
} }

View File

@ -12,6 +12,26 @@ CREATE TABLE IF NOT EXISTS `source_openrec` (
`update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新数据', `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新数据',
`error_msg` text DEFAULT NULL COMMENT '错误信息', `error_msg` text DEFAULT NULL COMMENT '错误信息',
`target_type` varchar(64) NOT NULL COMMENT '目标类型',
`operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志',
PRIMARY KEY(`uid`),
KEY `operator_idx` (`operator`),
KEY `update_time_idx` (`update_time`),
KEY `target_type_idx` (`target_type`)
);
CREATE TABLE IF NOT EXISTS `source_twitch` (
uid bigint AUTO_INCREMENT COMMENT '自增UID',
`streamer_id` bigint DEFAULT NULL COMMENT 'streamer uid, 关联主播',
`url` text NOT NULL COMMENT '获取源数据地址',
`source` longtext DEFAULT NULL COMMENT '源数据',
`ext` json DEFAULT NULL COMMENT '扩展字段',
`serialize` blob DEFAULT NULL COMMENT '需要给下个任务传递 序列花数据, 非必要不用',
`update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新数据',
`error_msg` text DEFAULT NULL COMMENT '错误信息',
`target_type` varchar(64) NOT NULL COMMENT '目标类型', `target_type` varchar(64) NOT NULL COMMENT '目标类型',
`operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志', `operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志',
PRIMARY KEY(`uid`), PRIMARY KEY(`uid`),

View File

@ -79,7 +79,7 @@ func (store *StoreSource) errorAlarm(err error) {
// Insert 插入数据 // Insert 插入数据
func (store *StoreSource) Insert(isource IGet) { func (store *StoreSource) Insert(isource IGet) {
_, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("StreamerId")) _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)", isource.Get("Url"), isource.Get("Target"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("StreamerId"))
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -122,13 +122,13 @@ func (store *StoreSource) Restore(isource IGet) {
} }
// Pop 弹出一条未处理的数据 // Pop 弹出一条未处理的数据
func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, error) { func (store *StoreSource) Pop(targetType Target, operators ...int32) (*Source, error) {
tx, err := store.db.Begin() tx, err := store.db.Begin()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var args = []interface{}{targetType} var args = []interface{}{string(targetType)}
selectSQL := `select uid, url, target_type, source, ext, operator, update_time, streamer_id from ` + store.table + ` where target_type = ?` selectSQL := `select uid, url, target_type, source, ext, operator, update_time, streamer_id from ` + store.table + ` where target_type = ?`
if len(operators) == 0 { if len(operators) == 0 {
selectSQL += " and operator = ?" selectSQL += " and operator = ?"
@ -157,7 +157,7 @@ func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, e
s := &Source{} s := &Source{}
// uid, url, target_type, source, ext, operator // uid, url, target_type, source, ext, operator
err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime, &s.StreamerId) err = row.Scan(&s.Uid, &s.Url, &s.Target, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime, &s.StreamerId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -214,13 +214,13 @@ func NewStoreExtractor() *StoreExtractor {
} }
// Pop 弹出一条未处理的数据 // Pop 弹出一条未处理的数据
func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer, error) { func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Streamer, error) {
tx, err := store.db.Begin() tx, err := store.db.Begin()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var args = []interface{}{platform} var args = []interface{}{string(platform)}
selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` selectSQL := `select uid, update_time, user_id, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval`
if len(operators) == 0 { if len(operators) == 0 {
selectSQL += " and operator = ?" selectSQL += " and operator = ?"

View File

@ -6,5 +6,7 @@ type SourceTable string
const ( const (
// STOpenrec openrec源table名称 // STOpenrec openrec源table名称
STOpenrec SourceTable = "source_openrec" STOpenrec SourceTable = "source_openrec"
)
// STTwitch twitch源table名称
STTwitch SourceTable = "source_twitch"
)

View File

@ -1,12 +1,15 @@
package intimate package intimate
// TargetType 源的 目标类型 列表 // Target 源的 目标类型 列表
type TargetType string type Target string
const ( const (
// TTOpenrecRanking openrec源TargetType名称 // TOpenrecRanking 获取排名 Target名称
TTOpenrecRanking TargetType = "openrec_ranking" TOpenrecRanking Target = "openrec_ranking"
// TTOpenrecUser openrec源TargetType名称 // TOpenrecUser 获取用户列表 源Target名称
TTOpenrecUser TargetType = "openrec_user" TOpenrecUser Target = "openrec_user"
// TTwitchChannel twitch 获取类别操作目标
TTwitchChannel Target = "twitch_channel"
) )

View File

@ -72,8 +72,8 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) {
content := resp.Content() content := resp.Content()
if len(content) <= 200 { // 末页退出 if len(content) <= 200 { // 末页退出
finishpoint := time.Now() finishpoint := time.Now()
log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*60)) log.Println("任务Ranking UserId结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*120))
for time.Now().Sub(finishpoint) < time.Minute*60 { for time.Now().Sub(finishpoint) < time.Minute*120 {
time.Sleep(time.Second) time.Sleep(time.Second)
if atomic.LoadInt32(&loop) > 0 { if atomic.LoadInt32(&loop) > 0 {
return return
@ -102,7 +102,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) {
streamer := &intimate.Streamer{} streamer := &intimate.Streamer{}
streamer.UserId = userid streamer.UserId = userid
streamer.Platform = string(intimate.Popenrec) streamer.Platform = intimate.Popenrec
updateUrl := make(map[string]interface{}) updateUrl := make(map[string]interface{})

View File

@ -51,7 +51,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) {
for atomic.LoadInt32(&loop) > 0 { for atomic.LoadInt32(&loop) > 0 {
streamer, err := estore.Pop(string(intimate.Popenrec)) streamer, err := estore.Pop(intimate.Popenrec)
if streamer == nil || err != nil { if streamer == nil || err != nil {
if err != lasterr { if err != lasterr {
@ -167,7 +167,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) {
streamer.Operator = int32(intimate.OperatorOK) streamer.Operator = int32(intimate.OperatorOK)
source := &intimate.Source{} source := &intimate.Source{}
source.TargetType = string(intimate.TTOpenrecUser) source.Target = intimate.TOpenrecUser
source.Ext = string(extJsonBytes) source.Ext = string(extJsonBytes)
source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true} source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true}
sstore.Insert(source) sstore.Insert(source)

View File

@ -1 +1,97 @@
package main package main
import (
"database/sql"
"fmt"
"intimate"
"log"
"time"
"github.com/tebeka/selenium"
"github.com/tebeka/selenium/chrome"
)
// sstore is the source store instance, the implementation used to persist raw source data. It is created for the table named by intimate.STTwitch; see sql/intimate_source.sql for the table layout.
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch))
// estore is the extractor store connection instance.
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// ChannelLink is the task that collects all channel links for the categories.
// The struct declares no fields.
type ChannelLink struct {
}
// Execute runs the channel-link collection task.
//
// It starts a local ChromeDriver service, opens the Twitch directory sorted by
// viewer count in Chrome, presses the End key repeatedly so the page keeps
// loading more cards, and then stores the href of every card link as a Source
// row (Target TTwitchChannel, Operator 0, Url set to the directory URL)
// through sstore.
//
// Failures while setting up the browser, loading the page or locating the
// required elements panic. Failures of the best-effort steps (script
// injection, waiting, clicking, key presses, reading one href) are logged and
// the task carries on.
func (cl *ChannelLink) Execute() {
	const (
		driverPort = 3030
		cardXPath  = "//span/a[contains(@data-a-target,'card-') and @href]"
	)

	caps := selenium.Capabilities{"browserName": "chrome"}
	chromecaps := chrome.Capabilities{}
	if err := chromecaps.AddExtension("/home/eson/test/ssh-key/0.1.2_0.crx"); err != nil {
		panic(err)
	}
	chromecaps.Args = append(chromecaps.Args, "--disk-cache-dir=/home/eson/test/ssh-key/cache")
	chromecaps.ExcludeSwitches = append(chromecaps.ExcludeSwitches, "enable-automation")
	caps.AddChrome(chromecaps)

	// Keep the service handle so the chromedriver process is stopped when the
	// task ends instead of being leaked.
	service, err := selenium.NewChromeDriverService("/usr/bin/chromedriver", driverPort)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := service.Stop(); err != nil {
			log.Println(err)
		}
	}()

	// Check the connection error before wd is touched; using wd after a failed
	// NewRemote would hide the real error behind a nil dereference.
	wd, err := selenium.NewRemote(caps, fmt.Sprintf("http://localhost:%d/wd/hub", driverPort))
	if err != nil {
		panic(err)
	}
	// Registered after the service defer, so the session is ended before the
	// driver service is stopped.
	defer func() {
		if err := wd.Quit(); err != nil {
			log.Println(err)
		}
	}()

	// Best effort: overwrite navigator.webdriver in the page. The global object
	// is "window"; the previous "windows" raised a ReferenceError.
	if _, err := wd.ExecuteScript("window.navigator.webdriver = undefined", nil); err != nil {
		log.Println(err)
	}

	weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT"
	if err := wd.Get(weburl); err != nil {
		panic(err)
	}

	// Wait until at least one card link is present on the page.
	cardCondition := func(wd selenium.WebDriver) (bool, error) {
		elements, err := wd.FindElements(selenium.ByXPATH, cardXPath)
		if err != nil {
			return false, err
		}
		return len(elements) > 0, nil
	}
	if err := wd.WaitWithTimeout(cardCondition, time.Second*30); err != nil {
		log.Println(err)
	}
	time.Sleep(time.Second)

	// NOTE(review): the sort-menu button is presumably clicked to move keyboard
	// focus into the page before the End key presses below - confirm.
	e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']")
	if err != nil {
		panic(err)
	}
	if err := e.Click(); err != nil {
		log.Println(err)
	}

	// Press End repeatedly, pausing between presses so newly requested cards
	// have time to load.
	for i := 0; i <= 200; i++ {
		if err := wd.KeyDown(selenium.EndKey); err != nil {
			log.Println(err)
		}
		time.Sleep(time.Second * 3)
	}

	elements, err := wd.FindElements(selenium.ByXPATH, cardXPath)
	if err != nil {
		panic(err)
	}
	for _, ele := range elements {
		href, err := ele.GetAttribute("href")
		if err != nil {
			// Skip this card rather than storing a row with an empty link.
			log.Println(err)
			continue
		}
		log.Println(href)

		source := &intimate.Source{}
		source.Source = sql.NullString{String: href, Valid: true}
		source.Operator = 0
		source.Target = intimate.TTwitchChannel
		source.Url = weburl
		sstore.Insert(source)
	}
}

View File

@ -1,31 +1,10 @@
package main package main
import ( import (
"fmt"
"testing" "testing"
"github.com/tebeka/selenium"
"github.com/tebeka/selenium/chrome"
) )
func TestCase1(t *testing.T) { func TestCase1(t *testing.T) {
caps := selenium.Capabilities{"browserName": "chrome"} e := ChannelLink{}
chromecaps := chrome.Capabilities{} e.Execute()
err := chromecaps.AddExtension("/home/eson/test/ssh-key/0.1.2_0.crx")
if err != nil {
panic(err)
}
caps.AddChrome(chromecaps)
_, err = selenium.NewChromeDriverService("/usr/bin/chromedriver", 3030)
if err != nil {
panic(err)
}
wd, err := selenium.NewRemote(caps, fmt.Sprintf("http://localhost:%d/wd/hub", 3030))
if err != nil {
panic(err)
}
err = wd.Get("https://www.twitch.tv/directory/all")
if err != nil {
panic(err)
}
} }