diff --git a/.gitignore b/.gitignore index e602687..331f9bc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.log +log screenlog.* *.tar *.7z diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..6bdb63f --- /dev/null +++ b/build.sh @@ -0,0 +1,19 @@ + +source_tasks=./tasks/*/* +extractor_tasks=./extractor/* + +src=`pwd` + +for path in `ls -d $source_tasks` +do + echo $path + cd $path && go build + cd $src +done + +for path in `ls -d $extractor_tasks` +do + echo $path + cd $path && go build + cd $src +done diff --git a/config.yaml b/config.yaml index 5a5df4a..1e031f8 100644 --- a/config.yaml +++ b/config.yaml @@ -1,3 +1,3 @@ database: - source_uri: "root:@tcp(127.0.0.1:4000)/intimate_source?parseTime=true" - extractor_uri: "root:@tcp(127.0.0.1:4000)/intimate_extractor?parseTime=true" \ No newline at end of file + source_uri: "root:@tcp(127.0.0.1:4000)/intimate_source?parseTime=true&loc=Local" + extractor_uri: "root:@tcp(127.0.0.1:4000)/intimate_extractor?parseTime=true&loc=Local" \ No newline at end of file diff --git a/extractor/openrec_extractor/main.go b/extractor/openrec_extractor/main.go index c0deaf0..736ccfb 100644 --- a/extractor/openrec_extractor/main.go +++ b/extractor/openrec_extractor/main.go @@ -1,5 +1,10 @@ package main +import ( + "net/http" + _ "net/http/pprof" +) + /* `uid` varchar(36) NOT NULL, `platform` varchar(255) NOT NULL, @@ -11,6 +16,11 @@ package main */ func main() { + + go func() { + http.ListenAndServe("0.0.0.0:8899", nil) + }() + oe := &OpenrecExtractor{} oe.Execute() } diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 87c4ee5..838eaba 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -8,6 +8,7 @@ import ( "os" "os/signal" "regexp" + "runtime" "strconv" "strings" "sync/atomic" @@ -35,12 +36,15 @@ func (oe *OpenrecExtractor) Execute() { atomic.StoreInt32(&loop, 0) }() - collect := intimate.NewExtractorStore() + extractorStore := intimate.NewExtractorStore() store := intimate.NewSourceStore("source_openrec") var lasterr error = nil for atomic.LoadInt32(&loop) > 0 { + runtime.GC() + time.Sleep(time.Nanosecond) + source, err := store.Pop(string(intimate.TTOpenrecRanking), 100) if err != nil { if err != lasterr { @@ -51,14 +55,14 @@ func (oe *OpenrecExtractor) Execute() { continue } - source.SetOperator(int32(intimate.OperatorError)) - anchorId := source.GetSource().String + source.Operator = int32(intimate.OperatorError) + userId := source.Source.String - ai := &intimate.AnchorInfo{} - ai.SetAnchorId(anchorId) - ai.SetPlatform(string(intimate.Popenrec)) + streamer := &intimate.Streamer{} + streamer.UserId = userId + streamer.Platform = string(intimate.Popenrec) - sdata := source.GetExt().([]byte) + sdata := source.Ext.([]byte) if gjson.ValidBytes(sdata) { result := gjson.ParseBytes(sdata) datamap := result.Map() @@ -70,43 +74,44 @@ func (oe *OpenrecExtractor) Execute() { oe.userLive.CreateExtractor() oe.supporters = intimate.NewExtractorSource(datamap["supporters"]) - clog := &intimate.CollectLog{} - log.Println(anchorId) + // log.Println(anchorId) oe.extractFollowers(clog) - oe.extractAnchorName(ai) + oe.extractUserName(streamer) oe.extractViewsAndLiveStreaming(clog) oe.extractGiversAndGratuity(clog) oe.extractLive(clog) oe.extractTags(clog) - ai.Set("UpdateTime", source.GetUpdateTime()) + streamer.UpdateTime = source.UpdateTime - LiveUrl := "https://www.openrec.tv/live/" + anchorId - ai.Set("LiveUrl", sql.NullString{String: LiveUrl, Valid: true}) + LiveUrl := "https://www.openrec.tv/live/" + userId + streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true} - Uid, err := collect.InsertAnchorInfo(ai) + streamUid, err := extractorStore.InsertStreamer(streamer) if err != nil { log.Println(err) - source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + source.ErrorMsg = sql.NullString{String: err.Error(), Valid: true} store.UpdateOperator(source) return } - clog.Set("Uid", Uid) - clog.Set("Platform", string(intimate.Popenrec)) - clog.Set("AnchorId", anchorId) - clog.Set("UpdateTime", source.GetUpdateTime()) + clog.StreamerUid = streamUid + clog.Platform = string(intimate.Popenrec) + clog.UserId = userId + clog.UpdateTime = source.UpdateTime - if err = collect.InsertCollectLog(clog); err != nil { - source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + logUid, err := extractorStore.InsertCollectLog(clog) + if err != nil { + source.ErrorMsg = sql.NullString{String: err.Error(), Valid: true} store.UpdateOperator(source) return } - source.SetOperator(int32(intimate.OperatorExtractorOK)) + extractorStore.UpdateStreamerLogUid(logUid, streamUid) + source.Operator = int32(intimate.OperatorExtractorOK) store.UpdateOperator(source) } else { log.Println("data is not json:\n", string(sdata)) @@ -133,14 +138,16 @@ func (oe *OpenrecExtractor) extractFollowers(clog intimate.ISet) { clog.Set("Followers", sql.NullInt64{Int64: followersInt, Valid: true}) } -func (oe *OpenrecExtractor) extractAnchorName(ai intimate.ISet) { +func (oe *OpenrecExtractor) extractUserName(ai intimate.ISet) { extractor := oe.user.GetExtractor() xp, err := extractor.XPathResult("//p[@class='c-global__user__profile__list__name__text official-icon--after']/text()") - if xp.NodeIter().Next() { - anchorName := xp.String() - ai.Set("AnchorName", anchorName) - } else { + if err != nil { log.Println(err) + } else { + if xp.NodeIter().Next() { + userName := xp.String() + ai.Set("UserName", userName) + } } } @@ -207,6 +214,7 @@ func (oe *OpenrecExtractor) extractLive(clog intimate.ISet) { if err != nil { log.Println(err) } + log.Println(iter.Node().NodeValue(), tm.Local()) clog.Set("LiveStartTime", sql.NullTime{Time: tm.Local(), Valid: true}) duration, err := extractor.XPathResult("//meta[@itemprop='duration']/@content") @@ -234,7 +242,6 @@ func (oe *OpenrecExtractor) extractTags(clog intimate.ISet) { for _, m := range matheslist { tags = append(tags, m[1]) } - log.Println(tags) tagsBytes, err := json.Marshal(tags) if err != nil { log.Println(err) diff --git a/extractor_field.go b/extractor_field.go index b8ad0ed..e443866 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -8,179 +8,37 @@ import ( "github.com/tidwall/gjson" ) -type ISetAnchorInfo interface { - SetUid(int64) // - SetPlatform(string) // - SetAnchorId(string) // - SetAnchorName(string) // - SetLiveUrl(sql.NullString) // - SetChannel(sql.NullString) // - SetTags(interface{}) // - SetExt(interface{}) // - SetUpdateTime(sql.NullTime) // +type GetSet struct { } -type IGetAnchorInfo interface { - GetUid() int64 // - GetPlatform() string // - GetAnchorId() string // - GetAnchorName() string // - GetLiveUrl() sql.NullString // - GetChannel() sql.NullString // - GetTags() interface{} - GetExt() interface{} // - GetUpdateTime() sql.NullTime // +type Streamer struct { + Uid int64 // + Platform string // + UserId string // + UserName string // + LiveUrl sql.NullString // + Channel sql.NullString // + LatestLogUid int64 + Ext interface{} // + UpdateTime sql.NullTime // } -type AnchorInfo struct { - Uid int64 // - Platform string // - AnchorId string // - AnchorName string // - LiveUrl sql.NullString // - Channel sql.NullString // - Tags interface{} - Ext interface{} // - UpdateTime sql.NullTime // +// Get Simple Value +func (ai *Streamer) Get(field string) interface{} { + return reflect.ValueOf(ai).Elem().FieldByName(field).Interface() } // Set Simple Value -func (ai *AnchorInfo) Set(field string, value interface{}) { +func (ai *Streamer) Set(field string, value interface{}) { reflect.ValueOf(ai).Elem().FieldByName(field).Set(reflect.ValueOf(value)) } -// GetTags Get return Tags interface{} -func (ai *AnchorInfo) GetTags() interface{} { - return ai.Tags -} - -// SetTags Set Tags interface{} -func (ai *AnchorInfo) SetTags(Tags interface{}) { - ai.Tags = Tags -} - -// GetUpdateTime Get return UpdateTime time.Time -func (ai *AnchorInfo) GetUpdateTime() sql.NullTime { - return ai.UpdateTime -} - -// SetUpdateTime Set UpdateTime time.Time -func (ai *AnchorInfo) SetUpdateTime(UpdateTime sql.NullTime) { - ai.UpdateTime = UpdateTime -} - -// GetExt Get return Ext interface{} -func (ai *AnchorInfo) GetExt() interface{} { - return ai.Ext -} - -// SetExt Set Ext interface{} -func (ai *AnchorInfo) SetExt(Ext interface{}) { - ai.Ext = Ext -} - -// GetChannel Get return Channel sql.NullString -func (ai *AnchorInfo) GetChannel() sql.NullString { - return ai.Channel -} - -// SetChannel Set Channel sql.NullString -func (ai *AnchorInfo) SetChannel(Channel sql.NullString) { - ai.Channel = Channel -} - -// GetLiveUrl Get return LiveUrl sql.NullString -func (ai *AnchorInfo) GetLiveUrl() sql.NullString { - return ai.LiveUrl -} - -// SetLiveUrl Set LiveUrl sql.NullString -func (ai *AnchorInfo) SetLiveUrl(LiveUrl sql.NullString) { - ai.LiveUrl = LiveUrl -} - -// GetAnchorName Get return AnchorName string -func (ai *AnchorInfo) GetAnchorName() string { - return ai.AnchorName -} - -// SetAnchorName Set AnchorName string -func (ai *AnchorInfo) SetAnchorName(AnchorName string) { - ai.AnchorName = AnchorName -} - -// GetAnchorId Get return AnchorId string -func (ai *AnchorInfo) GetAnchorId() string { - return ai.AnchorId -} - -// SetAnchorId Set AnchorId string -func (ai *AnchorInfo) SetAnchorId(AnchorId string) { - ai.AnchorId = AnchorId -} - -// GetPlatform Get return Platform string -func (ai *AnchorInfo) GetPlatform() string { - return ai.Platform -} - -// SetPlatform Set Platform string -func (ai *AnchorInfo) SetPlatform(Platform string) { - ai.Platform = Platform -} - -// GetUid Get return Uid int64 -func (ai *AnchorInfo) GetUid() int64 { - return ai.Uid -} - -// SetUid Set Uid int64 -func (ai *AnchorInfo) SetUid(Uid int64) { - ai.Uid = Uid -} - -type IGetCollectLog interface { - GetUid() int64 // - GetPlatform() string // - GetAnchorId() string // - GetIsLiveStreaming() int32 // - GetIsError() int32 // - GetFollowers() sql.NullInt64 // - GetViews() sql.NullInt64 // - GetGiver() interface{} // - GetGratuity() sql.NullInt64 // - GetLiveTitle() sql.NullString // - GetLiveStartTime() sql.NullTime // - GetLiveEndTime() sql.NullTime // - GetUpdateTime() sql.NullTime // - GetTags() interface{} // - GetExt() interface{} // - GetErrorMsg() sql.NullString // -} - -type ISetCollectLog interface { - SetUid(int64) // - SetPlatform(string) // - SetAnchorId(string) // - SetIsLiveStreaming(int32) // - SetIsError(int32) // - SetFollowers(sql.NullInt64) // - SetViews(sql.NullInt64) // - SetGiver(interface{}) // - SetGratuity(sql.NullInt64) // - SetLiveTitle(sql.NullString) // - SetLiveStartTime(sql.NullTime) // - SetLiveEndTime(sql.NullTime) // - SetUpdateTime(sql.NullTime) // - SetTags(interface{}) // - SetExt(interface{}) // - SetErrorMsg(sql.NullString) // -} - type CollectLog struct { - Uid int64 // + LogUid int64 // 日志id + StreamerUid int64 // StreamerId 表id与 + Platform string // - AnchorId string // + UserId string // 平台的UserId IsLiveStreaming int32 // IsError int32 // Followers sql.NullInt64 // @@ -196,171 +54,16 @@ type CollectLog struct { ErrorMsg sql.NullString // } +// Get Simple Value +func (cl *CollectLog) Get(field string) interface{} { + return reflect.ValueOf(cl).Elem().FieldByName(field).Interface() +} + // Set Simple Value func (cl *CollectLog) Set(field string, value interface{}) { reflect.ValueOf(cl).Elem().FieldByName(field).Set(reflect.ValueOf(value)) } -// GetTags Get return Tags interface{} -func (cl *CollectLog) GetTags() interface{} { - return cl.Tags -} - -// SetTags Set Tags interface{} -func (cl *CollectLog) SetTags(Tags interface{}) { - cl.Tags = Tags -} - -// GetErrorMsg Get return Error sql.NullString -func (cl *CollectLog) GetErrorMsg() sql.NullString { - return cl.ErrorMsg -} - -// SetErrorMsg Set Error sql.NullString -func (cl *CollectLog) SetErrorMsg(ErrorMsg sql.NullString) { - cl.ErrorMsg = ErrorMsg -} - -// GetExt Get return Ext interface{} -func (cl *CollectLog) GetExt() interface{} { - return cl.Ext -} - -// SetExt Set Ext interface{} -func (cl *CollectLog) SetExt(Ext interface{}) { - cl.Ext = Ext -} - -// GetUpdateTime Get return UpdateTime time.Time -func (cl *CollectLog) GetUpdateTime() sql.NullTime { - return cl.UpdateTime -} - -// SetUpdateTime Set UpdateTime time.Time -func (cl *CollectLog) SetUpdateTime(UpdateTime sql.NullTime) { - cl.UpdateTime = UpdateTime -} - -// GetLiveEndTime Get return ShowEndTime sql.NullTime -func (cl *CollectLog) GetLiveEndTime() sql.NullTime { - return cl.LiveEndTime -} - -// SetLiveEndTime Set ShowEndTime sql.NullTime -func (cl *CollectLog) SetLiveEndTime(ShowEndTime sql.NullTime) { - cl.LiveEndTime = ShowEndTime -} - -// GetLiveStartTime Get return ShowStartTime sql.NullTime -func (cl *CollectLog) GetLiveStartTime() sql.NullTime { - return cl.LiveStartTime -} - -// SetLiveStartTime Set ShowStartTime sql.NullTime -func (cl *CollectLog) SetLiveStartTime(ShowStartTime sql.NullTime) { - cl.LiveStartTime = ShowStartTime -} - -// GetLiveTitle Get return ShowTitle sql.NullString -func (cl *CollectLog) GetLiveTitle() sql.NullString { - return cl.LiveTitle -} - -// SetLiveTitle Set ShowTitle sql.NullString -func (cl *CollectLog) SetLiveTitle(ShowTitle sql.NullString) { - cl.LiveTitle = ShowTitle -} - -// GetGratuity Get return Gratuity sql.NullInt32 -func (cl *CollectLog) GetGratuity() sql.NullInt64 { - return cl.Gratuity -} - -// SetGratuity Set Gratuity sql.NullInt32 -func (cl *CollectLog) SetGratuity(Gratuity sql.NullInt64) { - cl.Gratuity = Gratuity -} - -// GetGiver Get return Giver interface{} -func (cl *CollectLog) GetGiver() interface{} { - return cl.Giver -} - -// SetGiver Set Giver interface{} -func (cl *CollectLog) SetGiver(Giver interface{}) { - cl.Giver = Giver -} - -// GetViews Get return Views sql.NullInt64 -func (cl *CollectLog) GetViews() sql.NullInt64 { - return cl.Views -} - -// SetViews Set Views sql.NullInt64 -func (cl *CollectLog) SetViews(Views sql.NullInt64) { - cl.Views = Views -} - -// GetFollowers Get return Followers sql.NullInt64 -func (cl *CollectLog) GetFollowers() sql.NullInt64 { - return cl.Followers -} - -// SetFollowers Set Followers sql.NullInt32 -func (cl *CollectLog) SetFollowers(Followers sql.NullInt64) { - cl.Followers = Followers -} - -// GetIsError Get return IsError int32 -func (cl *CollectLog) GetIsError() int32 { - return cl.IsError -} - -// SetIsError Set IsError int32 -func (cl *CollectLog) SetIsError(IsError int32) { - cl.IsError = IsError -} - -// GetIsLiveStreaming Get return IsShowing int32 -func (cl *CollectLog) GetIsLiveStreaming() int32 { - return cl.IsLiveStreaming -} - -// SetIsLiveStreaming Set IsShowing int32 -func (cl *CollectLog) SetIsLiveStreaming(IsLive int32) { - cl.IsLiveStreaming = IsLive -} - -// GetAnchorId Get return AnchorId string -func (cl *CollectLog) GetAnchorId() string { - return cl.AnchorId -} - -// SetAnchorId Set AnchorId string -func (cl *CollectLog) SetAnchorId(AnchorId string) { - cl.AnchorId = AnchorId -} - -// GetPlatform Get return Platform string -func (cl *CollectLog) GetPlatform() string { - return cl.Platform -} - -// SetPlatform Set Platform string -func (cl *CollectLog) SetPlatform(Platform string) { - cl.Platform = Platform -} - -// GetUid Get return Uid int64 -func (cl *CollectLog) GetUid() int64 { - return cl.Uid -} - -// SetUid Set Uid int64 -func (cl *CollectLog) SetUid(Uid int64) { - cl.Uid = Uid -} - type ExtractorSource struct { source gjson.Result extractor *hunter.Extractor diff --git a/source_field.go b/source_field.go index b8d601c..0e1dc32 100644 --- a/source_field.go +++ b/source_field.go @@ -2,33 +2,9 @@ package intimate import ( "database/sql" + "reflect" ) -// IGetSource 源接口结构 -type IGetSource interface { - GetUid() int64 // - GetUrl() string // - GetTargetType() string // - GetSource() sql.NullString // - GetPassGob() sql.NullString // - GetExt() interface{} // - GetUpdateTime() sql.NullTime // - GetOperator() int32 // - GetErrorMsg() sql.NullString // -} - -type IUpdateSource interface { - IGetSource - - GetLastOperator() int32 - - SetPassGob(sql.NullString) - SetExt(ext interface{}) // - SetUpdateTime(ut sql.NullTime) // - SetOperator(operator int32) // - SetErrorMsg(emsg sql.NullString) // -} - // Source 的结构体 type Source struct { Uid int64 // @@ -41,105 +17,15 @@ type Source struct { Operator int32 // ErrorMsg sql.NullString // - lastOperator int32 + LastOperator int32 } -// GetPassGob Get return PassGob sql.NullString -func (so *Source) GetPassGob() sql.NullString { - return so.PassGob +// Get Simple Value +func (so *Source) Get(field string) interface{} { + return reflect.ValueOf(so).Elem().FieldByName(field).Interface() } -// SetPassGob Set PassGob sql.NullString -func (so *Source) SetPassGob(PassGob sql.NullString) { - so.PassGob = PassGob -} - -// GetLastOperator Get return lastOperator int32 -func (so *Source) GetLastOperator() int32 { - return so.lastOperator -} - -// SetLastOperator Set lastOperator int32 -func (so *Source) SetLastOperator(lastOperator int32) { - so.lastOperator = lastOperator -} - -// GetErrorMsg Get return ErrorMsg sql.NullString -func (so *Source) GetErrorMsg() sql.NullString { - return so.ErrorMsg -} - -// SetErrorMsg Set ErrorMsg sql.NullString -func (so *Source) SetErrorMsg(ErrorMsg sql.NullString) { - so.ErrorMsg = ErrorMsg -} - -// GetOperator Get return Operator sql.NullInt32 -func (so *Source) GetOperator() int32 { - return so.Operator -} - -// SetOperator Set Operator sql.NullInt32 -func (so *Source) SetOperator(Operator int32) { - so.Operator = Operator -} - -// GetUpdateTime Get return UpdateTime time.Time -func (so *Source) GetUpdateTime() sql.NullTime { - return so.UpdateTime -} - -// SetUpdateTime Set UpdateTime time.Time -func (so *Source) SetUpdateTime(UpdateTime sql.NullTime) { - so.UpdateTime = UpdateTime -} - -// GetExt Get return Ext interface{} -func (so *Source) GetExt() interface{} { - return so.Ext -} - -// SetExt Set Ext interface{} -func (so *Source) SetExt(Ext interface{}) { - so.Ext = Ext -} - -// GetSource Get return Source sql.NullString -func (so *Source) GetSource() sql.NullString { - return so.Source -} - -// SetSource Set Source sql.NullString -func (so *Source) SetSource(Source sql.NullString) { - so.Source = Source -} - -// GetTargetType Get return TargetType string -func (so *Source) GetTargetType() string { - return so.TargetType -} - -// SetTargetType Set TargetType string -func (so *Source) SetTargetType(TargetType string) { - so.TargetType = TargetType -} - -// GetUrl Get return Url string -func (so *Source) GetUrl() string { - return so.Url -} - -// SetUrl Set Url string -func (so *Source) SetUrl(Url string) { - so.Url = Url -} - -// GetUid Get return Uid int64 -func (so *Source) GetUid() int64 { - return so.Uid -} - -// SetUid Set Uid int64 -func (so *Source) SetUid(Uid int64) { - so.Uid = Uid +// Set Simple Value +func (so *Source) Set(field string, value interface{}) { + reflect.ValueOf(so).Elem().FieldByName(field).Set(reflect.ValueOf(value)) } diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index 9509e24..5767ae4 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -1,29 +1,30 @@ create database if not exists `intimate_extractor`; use intimate_extractor; -CREATE TABLE IF NOT EXISTS `anchor_info` ( +CREATE TABLE IF NOT EXISTS `streamer` ( `uid` bigint AUTO_INCREMENT, `platform` varchar(255) NOT NULL, - `anchor_id` varchar(255) NOT NULL, - `anchor_name` varchar(255) NOT NULL, + `user_id` varchar(255) NOT NULL, + `user_name` varchar(255) NOT NULL, `live_url` text, `channel` varchar(128) DEFAULT NULL, - `tags` json DEFAULT NULL, + `latest_log_uid` bigint, `ext` json DEFAULT NULL, - `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`uid`), - UNIQUE KEY `platform_anchor_id_idx` (`platform`, `anchor_id`), + UNIQUE KEY `platform_anchor_id_idx` (`platform`, `user_id`), KEY `platform_idx` (`platform`), - KEY `anchor_id_idx` (`anchor_id`), - KEY `anchor_name_idx` (`anchor_name`), + KEY `user_id_idx` (`user_id`), + KEY `user_name_idx` (`user_name`), KEY `channel_idx` (`channel`), KEY `update_time_idx` (`update_time`) ); CREATE TABLE IF NOT EXISTS `collect_log` ( - `uid` bigint, + `log_uid` bigint AUTO_INCREMENT, + `streamer_uid` bigint, `platform` varchar(255) NOT NULL, - `anchor_id` varchar(255) NOT NULL, + `user_id` varchar(255) NOT NULL, `is_live_streaming` tinyint(1) DEFAULT 0, `is_error` tinyint(1) DEFAULT 0, @@ -34,17 +35,18 @@ CREATE TABLE IF NOT EXISTS `collect_log` ( `gratuity` bigint(11) DEFAULT NULL, `live_title` text DEFAULT NULL, - `live_start_time` timestamp NULL DEFAULT NULL, - `live_end_time` timestamp NULL DEFAULT NULL, - `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `live_start_time` Timestamp NULL DEFAULT NULL, + `live_end_time` Timestamp NULL DEFAULT NULL, + `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `tags` json DEFAULT NULL, `ext` json DEFAULT NULL, `error_msg` text DEFAULT NULL, - KEY `uid_idx` (`uid`), + PRIMARY KEY (`log_uid`), + KEY `streamer_uid_idx` (`streamer_uid`), KEY `platform_idx` (`platform`), - KEY `anchor_id_idx` (`anchor_id`), + KEY `user_id_idx` (`user_id`), KEY `is_live_streaming_idx` (`is_live_streaming`), KEY `is_error_idx` (`is_error`), KEY `followers_idx` (`followers`), diff --git a/sql/intimate_source.sql b/sql/intimate_source.sql index 8c489a3..c2c6591 100644 --- a/sql/intimate_source.sql +++ b/sql/intimate_source.sql @@ -8,7 +8,7 @@ CREATE TABLE IF NOT EXISTS `source_openrec` ( `source` longtext DEFAULT NULL, `ext` json DEFAULT NULL, `pass_gob` blob DEFAULT NULL, - `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `operator` int DEFAULT 0, `error_msg` text DEFAULT NULL, PRIMARY KEY(`uid`), diff --git a/store.go b/store.go index 56d9cab..3e761c1 100644 --- a/store.go +++ b/store.go @@ -3,6 +3,7 @@ package intimate import ( "database/sql" "log" + "os" _ "github.com/go-sql-driver/mysql" ) @@ -25,6 +26,15 @@ type ISet interface { Set(string, interface{}) } +type IGet interface { + Get(string) interface{} +} + +type IGetSet interface { + ISet + IGet +} + // SourceStore 储存 type SourceStore struct { table string @@ -58,39 +68,49 @@ func (store *SourceStore) errorAlarm(err error) { } // Insert 插入数据 -func (store *SourceStore) Insert(isource IGetSource) { - _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg()) - store.errorAlarm(err) +func (store *SourceStore) Insert(isource IGet) { + _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg")) + if err != nil { + log.Panic(err) + } } // Update 更新数据 -func (store *SourceStore) Update(isource IUpdateSource) { - _, err := store.db.Exec("update "+store.table+" set ext = ?, pass_gob = ?, operator = ?, error_msg = ? where uid = ?", isource.GetExt(), isource.GetPassGob(), isource.GetOperator(), isource.GetErrorMsg(), isource.GetUid()) - store.errorAlarm(err) +func (store *SourceStore) Update(isource IGet) { + _, err := store.db.Exec("update "+store.table+" set ext = ?, pass_gob = ?, operator = ?, error_msg = ? where uid = ?", isource.Get("Ext"), isource.Get("PassGob"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) + if err != nil { + log.Panic(err) + } } // UpdateOperator 更新数据操作标志位 -func (store *SourceStore) UpdateOperator(isource IUpdateSource) { - _, err := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.GetOperator(), isource.GetErrorMsg(), isource.GetUid()) - store.errorAlarm(err) +func (store *SourceStore) UpdateOperator(isource IGet) { + _, err := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) + if err != nil { + log.Panic(err) + } } // UpdateError 更新错误数据 -func (store *SourceStore) UpdateError(isource IUpdateSource, err error) { - isource.SetOperator(int32(OperatorError)) - isource.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) - _, dberr := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.GetOperator(), isource.GetErrorMsg(), isource.GetUid()) - store.errorAlarm(dberr) +func (store *SourceStore) UpdateError(isource IGetSet, err error) { + isource.Set("Operator", int32(OperatorError)) + isource.Set("ErrorMsg", sql.NullString{String: err.Error(), Valid: true}) + _, dberr := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) + if dberr != nil { + log.Panic(err) + } } // Restore 恢复Operator数据状态 -func (store *SourceStore) Restore(isource IUpdateSource) { - _, err := store.db.Exec("update "+store.table+" set operator = ? where uid = ?", isource.GetLastOperator(), isource.GetUid()) - store.errorAlarm(err) +func (store *SourceStore) Restore(isource IGet) { + _, err := store.db.Exec("update "+store.table+" set operator = ? where uid = ?", isource.Get("LastOperator"), isource.Get("Uid")) + if err != nil { + log.Panic(err) + } } // Pop 弹出一条未处理的数据 -func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSource, error) { +func (store *SourceStore) Pop(targetType string, operators ...int32) (*Source, error) { tx, err := store.db.Begin() if err != nil { @@ -128,13 +148,13 @@ func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSou if err != nil { return nil, err } - s.SetLastOperator(s.Operator) + s.Set("LastOperator", s.Operator) _, err = tx.Exec("update "+store.table+" set operator = ? where uid = ?", OperatorWait, s.Uid) return s, nil } -// AnchorTable 主播表名称 -const AnchorTable string = "anchor_info" +// StreamerTable 主播表名称 +const StreamerTable string = "streamer" // CollectLogTable 采集日志表 const CollectLogTable string = "collect_log" @@ -164,7 +184,7 @@ func (store *ExtractorStore) errorAlarm(err error) { func NewExtractorStore() *ExtractorStore { db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI) if err != nil { - panic(err) + log.Panic(err) } return &ExtractorStore{db: db} } @@ -179,48 +199,90 @@ func NewExtractorStore() *ExtractorStore { `show_type` varchar(255) DEFAULT NULL, */ -// InsertAnchorInfo AnchorInfo表, 插入数据 -func (store *ExtractorStore) InsertAnchorInfo(isource IGetAnchorInfo) (Uid int64, err error) { - // select uid from table where platform = ? and anchor_id = ? - selectSQL := "select uid from " + AnchorTable + " where platform = ? and anchor_id = ?" +// UpdateStreamerLogUid Streamer表, 插入数据 +func (store *ExtractorStore) UpdateStreamerLogUid(logUid, streamerUid int64) error { + _, err := store.db.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, streamerUid) + if err != nil { + log.Panic(err) + } + return err +} + +// InsertStreamer Streamer表, 插入数据 +func (store *ExtractorStore) InsertStreamer(isource IGet) (Uid int64, err error) { + // select uid from table where platform = ? and user_id = ? + selectSQL := "select uid from " + StreamerTable + " where platform = ? and user_id = ?" tx, err := store.db.Begin() if err != nil { log.Println(err) return 0, err } - row := tx.QueryRow(selectSQL+` limit 1 for update`, isource.GetPlatform(), isource.GetAnchorId()) + defer func() { + err = tx.Commit() + if err != nil { + log.Println(err) + err = tx.Rollback() + if err != nil { + log.Println(err) + } + Uid = 0 + } + }() + + row := tx.QueryRow(selectSQL+` limit 1 for update`, isource.Get("Platform"), isource.Get("UserId")) var uid int64 if err = row.Scan(&uid); err == nil { return uid, nil + } else { + log.Println(err) } - result, err := tx.Exec("insert into "+AnchorTable+"(platform, anchor_id, anchor_name, live_url, channel, tags, ext) values(?,?,?,?,?,?,?);", isource.GetPlatform(), isource.GetAnchorId(), isource.GetAnchorName(), isource.GetLiveUrl(), isource.GetChannel(), isource.GetTags(), isource.GetExt()) + result, err := tx.Exec("insert into "+StreamerTable+"(platform, user_id, user_name, live_url, channel, latest_log_uid, ext) values(?,?,?,?,?,?,?);", isource.Get("Platform"), isource.Get("UserId"), isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext")) if err != nil { log.Println(err) return 0, nil } - err = tx.Commit() - if err != nil { - log.Println(err) - err = tx.Rollback() - if err != nil { - log.Println(err) - } - return 0, err - } - return result.LastInsertId() } // InsertCollectLog CollectLog表插入数据 -func (store *ExtractorStore) InsertCollectLog(isource IGetCollectLog) error { - _, err := store.db.Exec("insert into "+CollectLogTable+"(uid, platform, anchor_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", - isource.GetUid(), isource.GetPlatform(), isource.GetAnchorId(), isource.GetIsLiveStreaming(), isource.GetIsError(), isource.GetFollowers(), isource.GetViews(), isource.GetGiver(), isource.GetGratuity(), isource.GetLiveTitle(), isource.GetLiveStartTime(), isource.GetLiveEndTime(), isource.GetUpdateTime(), isource.GetTags(), isource.GetExt(), isource.GetErrorMsg(), +func (store *ExtractorStore) InsertCollectLog(isource IGet) (int64, error) { + tx, err := store.db.Begin() + + defer func() { + if err := recover(); err != nil { + log.Println(err) + err = tx.Rollback() + if err != nil { + log.Println(err) + } + os.Exit(0) + } + }() + + if err != nil { + log.Panic(err) + } + + result, err := tx.Exec("insert into "+CollectLogTable+"(streamer_uid, platform, user_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", + isource.Get("StreamerUid"), isource.Get("Platform"), isource.Get("UserId"), isource.Get("IsLiveStreaming"), isource.Get("IsError"), isource.Get("Followers"), isource.Get("Views"), isource.Get("Giver"), isource.Get("Gratuity"), isource.Get("LiveTitle"), isource.Get("LiveStartTime"), isource.Get("LiveEndTime"), isource.Get("UpdateTime"), isource.Get("Tags"), isource.Get("Ext"), isource.Get("ErrorMsg"), ) - store.errorAlarm(err) - return err + if err != nil { + log.Panic(err) + } + + logUid, err := result.LastInsertId() + if err != nil { + log.Panic(err) + } + + _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, isource.Get("StreamerUid")) + if err = tx.Commit(); err != nil { + log.Panic(err) + } + return result.LastInsertId() } diff --git a/store_test.go b/store_test.go index e5264ce..7d169ff 100644 --- a/store_test.go +++ b/store_test.go @@ -2,8 +2,6 @@ package intimate import ( "testing" - - "github.com/tidwall/gjson" ) func TestStoreInsert(t *testing.T) { @@ -26,20 +24,20 @@ func TestStoreInsertCase1(t *testing.T) { } func TestStorePopCase1(t *testing.T) { - store := NewSourceStore("source_openrec") - source, err := store.Pop(string(TTOpenrecRanking)) - if err != nil { - t.Error(err) - } - t.Error(source.GetOperator()) - t.Error(gjson.Valid(source.GetSource().String)) - result := gjson.Parse(source.GetSource().String) - if result.IsArray() { - for _, User := range result.Array() { - t.Error(User.Get("channel.id").String()) - } - } else { - t.Error("array error") - } + // store := NewSourceStore("source_openrec") + // source, err := store.Pop(string(TTOpenrecRanking)) + // if err != nil { + // t.Error(err) + // } + // t.Error(source.GetOperator()) + // t.Error(gjson.Valid(source.GetSource().String)) + // result := gjson.Parse(source.GetSource().String) + // if result.IsArray() { + // for _, User := range result.Array() { + // t.Error(User.Get("channel.id").String()) + // } + // } else { + // t.Error("array error") + // } } diff --git a/tasks/openrec/openrec_task1/source_openrec.go b/tasks/openrec/openrec_task1/source_openrec.go deleted file mode 100644 index ca805f1..0000000 --- a/tasks/openrec/openrec_task1/source_openrec.go +++ /dev/null @@ -1,19 +0,0 @@ -package main - -/* -CREATE TABLE `source_openrec` ( - uid bigint AUTO_INCREMENT, - `url` text NOT NULL, - `target_type` varchar(64) NOT NULL, - `source` longtext DEFAULT NULL, - `ext` json DEFAULT NULL, - - `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `operator` int DEFAULT 0, - `error_msg` text DEFAULT NULL, - PRIMARY KEY(`uid`), - KEY `operator_idx` (`operator`), - KEY `update_time_idx` (`update_time`), - KEY `target_type_idx` (`target_type`) - ); -*/ diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index d59fd41..08ac70c 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -60,14 +60,23 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { resp, err := cxt.Hunt() if err != nil { log.Println(err) - break + time.Sleep(time.Second * 2) + continue } - wf := cxt.Temporary() + tp := cxt.Temporary() content := resp.Content() - if len(content) <= 200 { - return + if len(content) <= 200 { // 末页退出 + finishpoint := time.Now() + log.Println("任务结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*30)) + for time.Now().Sub(finishpoint) < time.Minute*60 { + time.Sleep(time.Second) + if atomic.LoadInt32(&loop) > 0 { + return + } + } + continue } result := gjson.ParseBytes(content) @@ -76,39 +85,23 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { data := &intimate.Source{} userid := User.Get("channel.id").String() - data.SetSource(sql.NullString{String: userid, Valid: len(userid) > 0}) - data.SetUrl(wf.GetRawURL()) - data.SetTargetType(string(intimate.TTOpenrecUser)) + data.Source = sql.NullString{String: userid, Valid: len(userid) > 0} + data.Url = tp.GetRawURL() + data.TargetType = string(intimate.TTOpenrecUser) store.Insert(data) } } - querys := wf.GetQuery() + querys := tp.GetQuery() page, err := strconv.Atoi(querys.Get("page")) if err != nil { log.Println(err) return } - return + page++ querys.Set("page", strconv.Itoa(page)) - wf.SetQuery(querys) + tp.SetQuery(querys) time.Sleep(time.Second * 2) } } - -// OpenrecUser 获取用户信息 -type OpenrecUser struct { - hunter.PreCurlUrl -} - -// Execute 执行方法 -func (or *OpenrecUser) Execute(cxt *hunter.TaskContext) { - resp, err := cxt.Hunt() - if err != nil { - log.Println(err) - return - } - - resp.Content() -} diff --git a/tasks/openrec/openrec_task1/source_openrec_test.go b/tasks/openrec/openrec_task1/task_openrec_test.go similarity index 100% rename from tasks/openrec/openrec_task1/source_openrec_test.go rename to tasks/openrec/openrec_task1/task_openrec_test.go diff --git a/tasks/openrec/openrec_task2/.gitignore b/tasks/openrec/openrec_task2/.gitignore index 5d0489a..374c872 100644 --- a/tasks/openrec/openrec_task2/.gitignore +++ b/tasks/openrec/openrec_task2/.gitignore @@ -1 +1,2 @@ -openrec_task2 \ No newline at end of file +openrec_task2 +log diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 23c570e..86d6b84 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -44,24 +44,27 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { atomic.StoreInt32(&loop, 0) }() + var lasterr error = nil + for atomic.LoadInt32(&loop) > 0 { source, err := store.Pop(string(intimate.TTOpenrecUser)) if source == nil || err != nil { - log.Println(err) + if err != lasterr { + log.Println(err, lasterr) + lasterr = err + } time.Sleep(time.Second * 2) continue } - userSource := &intimate.Source{} - userid := source.GetSource().String - userUrl := "https://www.openrec.tv/user/" + userid - userSource.SetUrl(userUrl) + userId := source.Source.String + userUrl := "https://www.openrec.tv/user/" + userId - wf := cxt.Session().Get(userUrl) - resp, err := wf.Execute() - source.SetUpdateTime(sql.NullTime{Time: time.Now(), Valid: true}) + tp := cxt.Session().Get(userUrl) + resp, err := tp.Execute() + source.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} if err != nil { log.Println(err) @@ -69,7 +72,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { continue } - cookies := cxt.Session().GetCookies(wf.GetParsedURL()) + cookies := cxt.Session().GetCookies(tp.GetParsedURL()) scurl := "https://www.openrec.tv/viewapp/api/v6/supporters?identify_id=sumomo_xqx&month=&Uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683&Token=46598c320408bd69ae3c63298f6f4a3a97354175&Random=AZVXNAAXQVMOSVWNDPIQ&page_number=1 -H 'accept: application/json, text/javascript, */*; q=0.01' -H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36' -H 'cookie: uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683;' --compressed" curl := gcurl.ParseRawCURL(scurl) @@ -99,7 +102,7 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { } } - supportersQuery.Set("identify_id", source.GetSource().String) + supportersQuery.Set("identify_id", userId) temporary.SetQuery(supportersQuery) resp, err := temporary.Execute() @@ -131,8 +134,8 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { ext["supporters"] = supporters ext["user"] = string(resp.Content()) - wf = cxt.Session().Get("https://www.openrec.tv/live/" + userid) - resp, err = wf.Execute() + tp = cxt.Session().Get("https://www.openrec.tv/live/" + userId) + resp, err = tp.Execute() if err != nil { log.Println(err) store.UpdateError(source, err) @@ -147,8 +150,8 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { continue } - source.SetOperator(int32(intimate.OperatorOK)) - source.SetExt(string(extJsonBytes)) + source.Operator = int32(intimate.OperatorOK) + source.Ext = string(extJsonBytes) store.Update(source) }