From 0bff7169ec6ba9f690f6d4ae008b449ab54cf9a4 Mon Sep 17 00:00:00 2001 From: eson Date: Wed, 5 Aug 2020 18:49:47 +0800 Subject: [PATCH] Extractor upgrade --- extractor_field.go | 29 ++++++++ go.mod | 1 + go.sum | 8 +++ platform_list.go | 3 + sql/intimate_extractor.sql | 19 +++++ store.go | 48 +++++++++++++ table_list.go | 3 + .../twitcasting_task1/main_test.go | 72 ++++++++++++++++--- tasks/twitch/twitch_task2/task_twitch.go | 5 +- utils.go | 7 +- 10 files changed, 180 insertions(+), 15 deletions(-) diff --git a/extractor_field.go b/extractor_field.go index ad5ef30..c091c5b 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -3,6 +3,7 @@ package intimate import ( "database/sql" "reflect" + "time" "github.com/474420502/hunter" "github.com/tidwall/gjson" @@ -11,6 +12,34 @@ import ( type GetSet struct { } +type StreamerList struct { + UrlHash []byte // + Platform Platform // + Url string // + + Label sql.NullString // + + Serialize interface{} + + UpdateInterval int32 + UpdateTime time.Time // + + ErrorMsg sql.NullString + Operator int32 + + LastOperator int32 +} + +// Get Simple Value +func (sl *StreamerList) Get(field string) interface{} { + return reflect.ValueOf(sl).Elem().FieldByName(field).Interface() +} + +// Set Simple Value +func (sl *StreamerList) Set(field string, value interface{}) { + reflect.ValueOf(sl).Elem().FieldByName(field).Set(reflect.ValueOf(value)) +} + type Streamer struct { Uid int64 // Platform Platform // diff --git a/go.mod b/go.mod index d8b824d..0841fb6 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module intimate go 1.14 require ( + github.com/474420502/extractor v0.2.2 github.com/474420502/focus v0.12.0 github.com/474420502/gcurl v0.1.2 github.com/474420502/hunter v0.3.4 diff --git a/go.sum b/go.sum index a556617..61ec03c 100644 --- a/go.sum +++ b/go.sum @@ -2,12 +2,16 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.41.0/go.mod h1:OauMR7DV8fzvZIl2qg6rkaIhD/vmgk4iwEw/h6ercmg= +github.com/474420502/extractor v0.2.2 h1:hGao2iZt5CEI8oqYjQW938osQdHKgNWL/bwRJQNgHTM= +github.com/474420502/extractor v0.2.2/go.mod h1:OVFijdKLDghigpIYISHzlognL5q8eeVenT2fRhCyFns= github.com/474420502/focus v0.12.0 h1:+icbmj7IEOefvTegHt5EpcHt6WFbe2miIrceUJx2Evo= github.com/474420502/focus v0.12.0/go.mod h1:d0PMjtMxFz1a9HIhwyFPkWa+JF+0LgOrEUfd8iZka6s= github.com/474420502/gcurl v0.1.2 h1:ON9Yz3IgAdtDlFlHfkAJ3aIEBDxH0RiViPE5ST5ohKg= github.com/474420502/gcurl v0.1.2/go.mod h1:hws5q/Ao64bXLLDnldz9VyTQUndTWc/i5DzdEazFfoM= github.com/474420502/hunter v0.3.4 h1:fyLAgI84jWe3IcqsISC53j1w3CXI1FERxX//Potns0M= github.com/474420502/hunter v0.3.4/go.mod h1:pe4Xr/I+2agvq339vS/OZV+EiHAWtpXQs75rioSW9oA= +github.com/474420502/libxml2 v0.0.0-20200803084225-29e441d26406 h1:nLvl2D2y+hxCglLnRmLqwRGwmUsXQt8ga46zGySTU1I= +github.com/474420502/libxml2 v0.0.0-20200803084225-29e441d26406/go.mod h1:bUbcte7hFuLijGG6/+gGxurW3XvxE/CBdfAAlsIWj34= github.com/474420502/requests v1.6.0 h1:f4h4j40eT0P5whhg9LdkotD8CaKjtuDu/vz9iSUkCgY= github.com/474420502/requests v1.6.0/go.mod h1:SLXrQ5dL9c7dkIeKNUCBAjOIt3J9KFCS2RQjWJecNwo= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= @@ -66,6 +70,8 @@ github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYe github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tebeka/selenium v0.9.9 h1:cNziB+etNgyH/7KlNI7RMC1ua5aH1+5wUlFQyzeMh+w= github.com/tebeka/selenium v0.9.9/go.mod h1:5Fr8+pUvU6B1OiPfkdCKdXZyr5znvVkxuPd0NOdZCQc= github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= @@ -162,6 +168,8 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a h1:LJwr7TCTghdatWv40WobzlKXc9c4s8oGa7QKJUtHhWA= diff --git a/platform_list.go b/platform_list.go index 0bdefc0..246e991 100644 --- a/platform_list.go +++ b/platform_list.go @@ -9,4 +9,7 @@ const ( // Ptwitch twitch 平台 Ptwitch Platform = "twitch" + + // Ptwitcasting twitcasting 平台 + Ptwitcasting Platform = "twitcasting" ) diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index 91d3e3b..e8ccc1c 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -1,6 +1,25 @@ create database if not exists `intimate_extractor`; use intimate_extractor; +CREATE TABLE IF NOT EXISTS `streamer_list` ( + `urlhash` varchar(32) NOT NULL COMMENT '平台', + `url` text COMMENT 'url获取streamer列表的url', + `platform` varchar(255) NOT NULL COMMENT '平台', + `label` varchar(255) DEFAULT NULL COMMENT '必须的时候打上标签', + `serialize` blob DEFAULT NULL COMMENT '保存进程的必要计算数据', + + `update_interval` int DEFAULT 120 COMMENT '分钟单位, 默认120分钟, 下次更新的时间间隔', + `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + + `error_msg` text DEFAULT NULL COMMENT '错误信息', + `operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志', + + PRIMARY KEY (`urlhash`), + KEY `platform_idx` (`platform`), + KEY `update_time_idx` (`update_time`), + KEY `operator_idx` (`operator`) +) + CREATE TABLE IF NOT EXISTS `streamer` ( `uid` bigint AUTO_INCREMENT COMMENT '自增UID, 便于查询定位', `platform` varchar(255) NOT NULL COMMENT '平台', diff --git a/store.go b/store.go index cbdbbfc..0f34036 100644 --- a/store.go +++ b/store.go @@ -1,8 +1,11 @@ package intimate import ( + "crypto/md5" "database/sql" + "fmt" "log" + "strings" "time" _ "github.com/go-sql-driver/mysql" @@ -182,6 +185,9 @@ const StreamerTable string = "streamer" // CollectLogTable 采集日志表 const CollectLogTable string = "collect_log" +// StreamerListTable 主播表名称 +const StreamerListTable string = "streamer_list" + type StoreExtractor struct { db *sql.DB @@ -267,6 +273,48 @@ func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Stream return s, nil } +// UpdateStreamerList streamerlist表, 更新数据 +func (store *StoreExtractor) UpdateStreamerList(streamer IGet, fieldvalues ...interface{}) { + updateSQL := "UPDATE " + StreamerListTable + " SET " + var values []interface{} + for i := 0; i < len(fieldvalues); i += 2 { + field := fieldvalues[i] + values = append(values, fieldvalues[i+1]) + updateSQL += field.(string) + " = ? " + } + updateSQL += "WHERE urlhash = ?" + values = append(values, streamer.Get("UrlHash")) + _, err := store.db.Exec(updateSQL, values...) + if err != nil { + panic(err) + } +} + +// InsertStreamer streamerlist表, 插入数据 +func (store *StoreExtractor) InsertStreamerList(streamerlist IGet) (isExists bool) { + urlstr := streamerlist.Get("Url").(string) + + _, err := store.db.Exec("insert into streamer_list(urlhash, url, platform, label, serialize, update_interval, error_msg, operator) values(?,?,?,?,?,?,?,?)", + fmt.Sprintf("%x", md5.Sum([]byte(urlstr))), + urlstr, + streamerlist.Get("Platform"), + streamerlist.Get("Label"), + streamerlist.Get("Serialize"), + streamerlist.Get("UpdateInterval"), + streamerlist.Get("ErrorMsg"), + streamerlist.Get("Operator"), + ) + + if err != nil { + if !strings.HasPrefix(err.Error(), "Error 1062") { + log.Println(err) + } + return true + } + + return false +} + // InsertStreamer Streamer表, 插入数据 func (store *StoreExtractor) InsertStreamer(streamer IGet) (isExists bool) { // select uid from table where platform = ? and user_id = ? diff --git a/table_list.go b/table_list.go index 04002dc..cb12c70 100644 --- a/table_list.go +++ b/table_list.go @@ -9,4 +9,7 @@ const ( // STTwitch twitch源table名称 STTwitch SourceTable = "source_twitch" + + // STTwitcasting STTwitcasting源table名称 + STTwitcasting SourceTable = "source_twitcasting" ) diff --git a/tasks/twitcasting/twitcasting_task1/main_test.go b/tasks/twitcasting/twitcasting_task1/main_test.go index 4003cf7..2408667 100644 --- a/tasks/twitcasting/twitcasting_task1/main_test.go +++ b/tasks/twitcasting/twitcasting_task1/main_test.go @@ -1,6 +1,10 @@ package main import ( + "intimate" + "time" + + "github.com/474420502/extractor" "github.com/474420502/focus/compare" "github.com/474420502/focus/tree/heap" @@ -8,9 +12,14 @@ import ( "testing" "github.com/474420502/requests" - "github.com/lestrrat-go/libxml2" ) +// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitcasting)) + +// estore 解析存储连接实例 +var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() + func TestMain(t *testing.T) { searchurl := "https://twitcasting.tv/rankingindex.php" @@ -27,30 +36,73 @@ func TestMain(t *testing.T) { panic(err) } - doc, err := libxml2.ParseHTML(resp.Content()) + etor := extractor.ExtractXml(resp.Content()) + + // doc, err := libxml2.ParseHTML(resp.Content()) + // if err != nil { + // panic(err) + // } + // defer doc.Free() + + result, err := etor.XPath("//*[contains(@class, 'tag')]/@href") if err != nil { panic(err) } - defer doc.Free() - result, err := doc.Find("//*[contains(@class, 'tag')]/@href") - if err != nil { - panic(err) - } - defer result.Free() + + // result, err := doc.Find("//*[contains(@class, 'tag')]/@href") + // if err != nil { + // panic(err) + // } + // defer result.Free() iter := result.NodeIter() for iter.Next() { - log.Println(iter.Node().NodeValue()) wurl := "https://twitcasting.tv" + iter.Node().NodeValue() if ok := queuedict[wurl]; !ok { + log.Println(wurl) + sl := &intimate.StreamerList{} + sl.Platform = intimate.Ptwitcasting + sl.Url = wurl + sl.Operator = 0 + sl.UpdateInterval = 120 + sl.UpdateTime = time.Now() + estore.InsertStreamerList(sl) queue.Put(wurl) queuedict[wurl] = true } } - doc.Find("//div[@class='tw-search-result-row']") + // doc.Find("//div[@class='tw-search-result-row']") + xps, err := etor.XPaths("//div[@class='tw-search-result-row']") + if err != nil { + log.Println(surl, err) + continue + } + + // xps.ForEachTag(SearchProfile{}) + + // texts, errs := xps.ForEachText(".//span[@class='username']") + // if len(errs) > 0 { + // t.Error(errs) + // } + var splist = xps.ForEachTag(SearchProfile{}) + for _, isp := range splist { + sp := isp.(*SearchProfile) + sp.UserId = sp.LiveUrl[1:] + // log.Println(sp.(SearchProfile)) + } + + for _, isp := range splist { + log.Println(isp.(*SearchProfile)) + } log.Println("finish remain", queue.Size()) } } + +type SearchProfile struct { + UserName string `exp:".//span[@class='username']" method:"Text"` + UserId string // `exp:".//span[@class='fullname']" method:"Text"` + LiveUrl string `exp:".//div[@class='usertext']/a[@href]" method:"Attribute,href Value"` +} diff --git a/tasks/twitch/twitch_task2/task_twitch.go b/tasks/twitch/twitch_task2/task_twitch.go index 95899c0..915a969 100644 --- a/tasks/twitch/twitch_task2/task_twitch.go +++ b/tasks/twitch/twitch_task2/task_twitch.go @@ -29,13 +29,16 @@ func (cl *UserList) Execute() { //article//a[@data-a-target='preview-card-title-link'] wd := intimate.GetChromeDriver(3030) + defer wd.Close() defer wd.Quit() ps := intimate.NewPerfectShutdown() counter := intimate.NewCounter() counter.SetMaxLimit(100) counter.SetMaxToDo(func(olist ...interface{}) error { owd := olist[0].(*selenium.WebDriver) - (*owd).Quit() + if err := (*owd).Quit(); err != nil { + log.Println(err) + } *owd = intimate.GetChromeDriver(3030) return nil }, &wd) diff --git a/utils.go b/utils.go index 53734ab..bad4a08 100644 --- a/utils.go +++ b/utils.go @@ -103,13 +103,12 @@ func GetChromeDriver(port int) selenium.WebDriver { panic(err) } runtime.SetFinalizer(wd, func(obj interface{}) { - // if err := obj.(selenium.WebDriver).Close(); err != nil { - // log.Println(err) - // } + if err := obj.(selenium.WebDriver).Close(); err != nil { + log.Println(err) + } if err := obj.(selenium.WebDriver).Quit(); err != nil { log.Println(err) } - }) wd.ExecuteScript("windows.navigator.webdriver = undefined", nil) if err != nil {