diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 838eaba..1e87fdd 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -36,8 +36,8 @@ func (oe *OpenrecExtractor) Execute() { atomic.StoreInt32(&loop, 0) }() - extractorStore := intimate.NewExtractorStore() - store := intimate.NewSourceStore("source_openrec") + extractorStore := intimate.NewStoreExtractor() + store := intimate.NewStoreSource("source_openrec") var lasterr error = nil for atomic.LoadInt32(&loop) > 0 { @@ -90,7 +90,7 @@ func (oe *OpenrecExtractor) Execute() { LiveUrl := "https://www.openrec.tv/live/" + userId streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true} - streamUid, err := extractorStore.InsertStreamer(streamer) + streamUid, err := extractorStore.UpdateStreamer(streamer) if err != nil { log.Println(err) source.ErrorMsg = sql.NullString{String: err.Error(), Valid: true} @@ -110,7 +110,7 @@ func (oe *OpenrecExtractor) Execute() { return } - extractorStore.UpdateStreamerLogUid(logUid, streamUid) + extractorStore.UpdateStreamerLog(logUid, streamUid) source.Operator = int32(intimate.OperatorExtractorOK) store.UpdateOperator(source) } else { diff --git a/extractor_field.go b/extractor_field.go index e443866..76ae0b7 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -12,15 +12,26 @@ type GetSet struct { } type Streamer struct { - Uid int64 // - Platform string // - UserId string // - UserName string // - LiveUrl sql.NullString // - Channel sql.NullString // - LatestLogUid int64 - Ext interface{} // - UpdateTime sql.NullTime // + Uid int64 // + Platform string // + UserId string // + + UserName sql.NullString // + LiveUrl sql.NullString // + Channel sql.NullString // + Ext interface{} // + + IsUpdateStreamer bool // 更新上面的内容 + IsUpdateUrl bool + updateInterval int32 + UpdateUrl interface{} + LatestLogUid int64 + UpdateTime sql.NullTime // + + ErrorMsg sql.NullString + Operator int32 + + LastOperator int32 } // Get Simple Value @@ -39,8 +50,8 @@ type CollectLog struct { Platform string // UserId string // 平台的UserId - IsLiveStreaming int32 // - IsError int32 // + IsLiveStreaming bool // + IsError bool // Followers sql.NullInt64 // Views sql.NullInt64 // Giver interface{} // diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index 5767ae4..1c351cc 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -2,46 +2,56 @@ create database if not exists `intimate_extractor`; use intimate_extractor; CREATE TABLE IF NOT EXISTS `streamer` ( - `uid` bigint AUTO_INCREMENT, - `platform` varchar(255) NOT NULL, - `user_id` varchar(255) NOT NULL, - `user_name` varchar(255) NOT NULL, - `live_url` text, - `channel` varchar(128) DEFAULT NULL, - `latest_log_uid` bigint, - `ext` json DEFAULT NULL, - `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `uid` bigint AUTO_INCREMENT COMMENT '自增UID, 便于查询定位', + `platform` varchar(255) NOT NULL COMMENT '平台', + `user_id` varchar(255) NOT NULL COMMENT '用户唯一UID', + `user_name` varchar(255) DEFAULT NULL COMMENT '用户名字 区别于ID', + `live_url` text COMMENT '直播的url', + `channel` varchar(128) DEFAULT NULL COMMENT'所属 频道,分类 未必所有平台都有明确的标签', + `ext` json DEFAULT NULL COMMENT '扩展类型, 把一些可能需要但是没字段的数据放在json扩展', + + `is_update_streamer` tinyint(1) DEFAULT 0 COMMENT '是否需要持续更新streamer的信息. 1为需要,0则否', + `is_update_url` tinyint(1) DEFAULT 0 COMMENT '是否需要持续更新update_url. 1为需要,0则否', + `update_url` json NOT NULL COMMENT '更新数据的url, 如直播url, profile url等', + `update_interval` int DEFAULT 30 COMMENT '分钟单位, 默认30分钟, 下次更新的时间间隔', + `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + `latest_log_uid` bigint COMMENT '最新更新的日志表的uid, 方便关联', + + `error_msg` text DEFAULT NULL COMMENT '错误信息', + `operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志', + PRIMARY KEY (`uid`), UNIQUE KEY `platform_anchor_id_idx` (`platform`, `user_id`), KEY `platform_idx` (`platform`), KEY `user_id_idx` (`user_id`), KEY `user_name_idx` (`user_name`), KEY `channel_idx` (`channel`), - KEY `update_time_idx` (`update_time`) + KEY `update_time_idx` (`update_time`), + KEY `operator_idx` (`operator`) ); CREATE TABLE IF NOT EXISTS `collect_log` ( - `log_uid` bigint AUTO_INCREMENT, - `streamer_uid` bigint, - `platform` varchar(255) NOT NULL, - `user_id` varchar(255) NOT NULL, + `log_uid` bigint AUTO_INCREMENT COMMENT '日志自增UID', + `streamer_uid` bigint COMMENT '对应streamer表的UID', + `platform` varchar(255) NOT NULL COMMENT '平台名称, 方便于搜索日志分类', + `user_id` varchar(255) NOT NULL COMMENT '用户UID', - `is_live_streaming` tinyint(1) DEFAULT 0, - `is_error` tinyint(1) DEFAULT 0, + `is_live_streaming` tinyint(1) DEFAULT 0 COMMENT '是否正在直播', + `is_error` tinyint(1) DEFAULT 0 COMMENT '是否采集数据的时候出错, 便于定位错误', - `followers` bigint(11) DEFAULT NULL, - `views` bigint(11) DEFAULT NULL, - `giver` json DEFAULT NULL, - `gratuity` bigint(11) DEFAULT NULL, + `followers` bigint(11) DEFAULT NULL COMMENT '关注数', + `views` bigint(11) DEFAULT NULL COMMENT '当前直播的观众', + `giver` json DEFAULT NULL COMMENT '打赏礼物者,和一些金额数据等, 数据类型异于平台', + `gratuity` bigint(11) DEFAULT NULL COMMENT '打赏值的总值, 数据类型异于平台', - `live_title` text DEFAULT NULL, - `live_start_time` Timestamp NULL DEFAULT NULL, - `live_end_time` Timestamp NULL DEFAULT NULL, - `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `tags` json DEFAULT NULL, - `ext` json DEFAULT NULL, + `live_title` text DEFAULT NULL COMMENT '直播标题', + `live_start_time` Timestamp NULL DEFAULT NULL COMMENT '直播开始时间', + `live_end_time` Timestamp NULL DEFAULT NULL COMMENT '直播结束时间', + `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据日志更新时间', + `tags` json DEFAULT NULL COMMENT '主播直播的类型标签, 非永久固定', + `ext` json DEFAULT NULL COMMENT '扩展字段, 用于一些数据不存在的字段, 便于记录扩展', - `error_msg` text DEFAULT NULL, + `error_msg` text DEFAULT NULL COMMENT '错误信息', PRIMARY KEY (`log_uid`), KEY `streamer_uid_idx` (`streamer_uid`), diff --git a/sql/intimate_source.sql b/sql/intimate_source.sql index c2c6591..3ed3ffa 100644 --- a/sql/intimate_source.sql +++ b/sql/intimate_source.sql @@ -2,15 +2,17 @@ create database if not exists `intimate_source`; use intimate_source; CREATE TABLE IF NOT EXISTS `source_openrec` ( - uid bigint AUTO_INCREMENT, - `url` text NOT NULL, - `target_type` varchar(64) NOT NULL, - `source` longtext DEFAULT NULL, - `ext` json DEFAULT NULL, - `pass_gob` blob DEFAULT NULL, - `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `operator` int DEFAULT 0, - `error_msg` text DEFAULT NULL, + uid bigint AUTO_INCREMENT COMMENT '自增UID', + + `url` text NOT NULL COMMENT '获取源数据地址', + `source` longtext DEFAULT NULL COMMENT '源数据', + `ext` json DEFAULT NULL COMMENT '扩展字段', + `pass_gob` blob DEFAULT NULL COMMENT '需要给下个任务传递gob 序列花数据, 非必要不用', + `update_time` Timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新数据', + `error_msg` text DEFAULT NULL COMMENT '错误信息', + + `target_type` varchar(64) NOT NULL COMMENT '目标类型', + `operator` int DEFAULT 0 COMMENT '操作标志位, 根据不同解析方法有不同标志', PRIMARY KEY(`uid`), KEY `operator_idx` (`operator`), KEY `update_time_idx` (`update_time`), diff --git a/store.go b/store.go index 3e761c1..1245f90 100644 --- a/store.go +++ b/store.go @@ -4,6 +4,7 @@ import ( "database/sql" "log" "os" + "time" _ "github.com/go-sql-driver/mysql" ) @@ -36,7 +37,7 @@ type IGetSet interface { } // SourceStore 储存 -type SourceStore struct { +type StoreSource struct { table string db *sql.DB errorCount int @@ -44,15 +45,15 @@ type SourceStore struct { } // NewSourceStore 创建一个存储实例 -func NewSourceStore(table string) *SourceStore { +func NewStoreSource(table string) *StoreSource { db, err := sql.Open("mysql", InitConfig.Database.SourceURI) if err != nil { panic(err) } - return &SourceStore{table: table, db: db} + return &StoreSource{table: table, db: db} } -func (store *SourceStore) errorAlarm(err error) { +func (store *StoreSource) errorAlarm(err error) { if err != nil { log.Println("store error: ", err) // 报警. 如果数据插入有问题 @@ -68,49 +69,51 @@ func (store *SourceStore) errorAlarm(err error) { } // Insert 插入数据 -func (store *SourceStore) Insert(isource IGet) { +func (store *StoreSource) Insert(isource IGet) { _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg")) if err != nil { - log.Panic(err) + panic(err) } } // Update 更新数据 -func (store *SourceStore) Update(isource IGet) { +func (store *StoreSource) Update(isource IGet) { _, err := store.db.Exec("update "+store.table+" set ext = ?, pass_gob = ?, operator = ?, error_msg = ? where uid = ?", isource.Get("Ext"), isource.Get("PassGob"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) if err != nil { - log.Panic(err) + panic(err) } } // UpdateOperator 更新数据操作标志位 -func (store *SourceStore) UpdateOperator(isource IGet) { +func (store *StoreSource) UpdateOperator(isource IGet) { _, err := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) if err != nil { - log.Panic(err) + panic(err) } } // UpdateError 更新错误数据 -func (store *SourceStore) UpdateError(isource IGetSet, err error) { - isource.Set("Operator", int32(OperatorError)) +func (store *StoreSource) UpdateError(isource IGetSet, err error) { + isource.Set("Operator", int32(OperatorError)+isource.Get("Operator").(int32)) isource.Set("ErrorMsg", sql.NullString{String: err.Error(), Valid: true}) _, dberr := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) if dberr != nil { - log.Panic(err) + // email tell owner to deal with + panic(err) } } // Restore 恢复Operator数据状态 -func (store *SourceStore) Restore(isource IGet) { - _, err := store.db.Exec("update "+store.table+" set operator = ? where uid = ?", isource.Get("LastOperator"), isource.Get("Uid")) - if err != nil { - log.Panic(err) +func (store *StoreSource) Restore(isource IGet) { + _, dberr := store.db.Exec("update "+store.table+" set operator = ? where uid = ?", isource.Get("LastOperator"), isource.Get("Uid")) + if dberr != nil { + // email tell owner to deal with + panic(dberr) } } // Pop 弹出一条未处理的数据 -func (store *SourceStore) Pop(targetType string, operators ...int32) (*Source, error) { +func (store *StoreSource) Pop(targetType string, operators ...int32) (*Source, error) { tx, err := store.db.Begin() if err != nil { @@ -159,16 +162,16 @@ const StreamerTable string = "streamer" // CollectLogTable 采集日志表 const CollectLogTable string = "collect_log" -type ExtractorStore struct { +type StoreExtractor struct { db *sql.DB errorCount int errorLimit int } -func (store *ExtractorStore) errorAlarm(err error) { +func (store *StoreExtractor) errorAlarm(err error) { if err != nil { - log.Panic("store error: ", err) + log.Println("store error: ", err) // 报警. 如果数据插入有问题 store.errorCount++ if store.errorCount >= store.errorLimit { @@ -181,41 +184,129 @@ func (store *ExtractorStore) errorAlarm(err error) { } } -func NewExtractorStore() *ExtractorStore { +// NewStoreExtractor 生成一个extractor库的相关链接 +func NewStoreExtractor() *StoreExtractor { db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI) if err != nil { - log.Panic(err) + panic(err) } - return &ExtractorStore{db: db} + return &StoreExtractor{db: db} } -/* - `uid` bigint, - `platform` varchar(255) NOT NULL, - `anchor_id` varchar(255) NOT NULL, - `anchor_name` varchar(255) NOT NULL, - `live_url` text, - `channel` varchar(128) DEFAULT NULL, - `show_type` varchar(255) DEFAULT NULL, -*/ +// Pop 弹出一条未处理的数据 +func (store *StoreExtractor) Pop(platform string, operators ...int32) (*Streamer, error) { -// UpdateStreamerLogUid Streamer表, 插入数据 -func (store *ExtractorStore) UpdateStreamerLogUid(logUid, streamerUid int64) error { - _, err := store.db.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, streamerUid) + tx, err := store.db.Begin() if err != nil { - log.Panic(err) + return nil, err } - return err + var args = []interface{}{platform} + selectSQL := `select uid, update_time, update_url, is_update_streamer from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` + if len(operators) == 0 { + selectSQL += " and operator = ?" + args = append(args, 0) + } else { + for _, operator := range operators { + selectSQL += " and operator = ?" + args = append(args, operator) + } + } + + defer func() { + err := tx.Commit() + if err != nil { + log.Println(err) + err = tx.Rollback() + if err != nil { + log.Println(err) + } + } + }() + + // log.Println(selectSQL + ` limit 1 for update`) + row := tx.QueryRow(selectSQL+` limit 1 for update`, args...) + + s := &Streamer{} + // uid, url, target_type, source, ext, operator + err = row.Scan(&s.Uid, &s.UpdateTime, &s.UpdateUrl, &s.IsUpdateStreamer) + if err != nil { + return nil, err + } + s.Set("LastOperator", s.Operator) + _, err = tx.Exec("update "+StreamerTable+" set operator = ? where uid = ?", OperatorWait, s.Uid) + return s, nil } // InsertStreamer Streamer表, 插入数据 -func (store *ExtractorStore) InsertStreamer(isource IGet) (Uid int64, err error) { +func (store *StoreExtractor) InsertStreamer(streamer IGet) (isExists bool) { // select uid from table where platform = ? and user_id = ? - selectSQL := "select uid from " + StreamerTable + " where platform = ? and user_id = ?" + selectSQL := "SELECT is_update_url FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?" tx, err := store.db.Begin() if err != nil { + panic(err) + } + + defer func() { + err = tx.Commit() + if err != nil { + rerr := tx.Rollback() + if rerr != nil { + log.Println(rerr) + } + panic(err) + } + }() + + row := tx.QueryRow(selectSQL+` LIMIT 1 FOR UPDATE`, streamer.Get("Platform"), streamer.Get("UserId")) + var isUpdateUrl int + if err = row.Scan(&isUpdateUrl); err == nil { + if isUpdateUrl == 1 { + tx.Exec("UPDATE "+StreamerTable+" SET update_url = ?", streamer.Get("UpdateUrl")) + } + return true + } else { log.Println(err) - return 0, err + } + + _, err = tx.Exec("INSERT INTO "+StreamerTable+"(platform, user_id, update_url, update_time) VALUES(?,?,?,?);", streamer.Get("Platform"), streamer.Get("UserId"), streamer.Get("UpdateUrl"), time.Now().Add(-time.Minute*30)) + if err != nil { + panic(err) + } + return false +} + +// UpdateError 更新错误数据 +func (store *StoreExtractor) UpdateError(isource IGetSet, err error) { + isource.Set("Operator", int32(OperatorError)+isource.Get("Operator").(int32)) + isource.Set("ErrorMsg", sql.NullString{String: err.Error(), Valid: true}) + _, dberr := store.db.Exec("update "+StreamerTable+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) + if dberr != nil { + // email tell owner to deal with + panic(err) + } +} + +// UpdateStreamerLog 只更新Streamer的关联日志和时间戳 +func (store *StoreExtractor) UpdateStreamerLog(latestUid int64, streamerUid int64) { + _, err := store.db.Exec("UPDATE "+StreamerTable+" SET latest_log_uid = ?, update_time = CURRENT_TIMESTAMP() WHERE uid = ?", latestUid, streamerUid) + if err != nil { + panic(err) + } +} + +// UpdateOperator Streamer表, 插入数据 +func (store *StoreExtractor) UpdateOperator(isource IGet) { + _, err := store.db.Exec("update "+StreamerTable+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) + if err != nil { + panic(err) + } +} + +// UpdateStreamer Streamer表, 插入数据 +func (store *StoreExtractor) UpdateStreamer(isource IGet) { + tx, err := store.db.Begin() + if err != nil { + panic(err) } defer func() { @@ -224,33 +315,20 @@ func (store *ExtractorStore) InsertStreamer(isource IGet) (Uid int64, err error) log.Println(err) err = tx.Rollback() if err != nil { - log.Println(err) + panic(err) } - Uid = 0 + os.Exit(0) } }() - row := tx.QueryRow(selectSQL+` limit 1 for update`, isource.Get("Platform"), isource.Get("UserId")) - - var uid int64 - if err = row.Scan(&uid); err == nil { - return uid, nil - } else { - log.Println(err) - } - - result, err := tx.Exec("insert into "+StreamerTable+"(platform, user_id, user_name, live_url, channel, latest_log_uid, ext) values(?,?,?,?,?,?,?);", isource.Get("Platform"), isource.Get("UserId"), isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext")) - + _, err = tx.Exec("UPDATE "+StreamerTable+" SET platform = ?, user_id = ?, user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, ext = ?,update_time = ?, update_url = ?", isource.Get("Platform"), isource.Get("UserId"), isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext"), isource.Get("UpdateTime"), isource.Get("UpdateUrl")) if err != nil { - log.Println(err) - return 0, nil + panic(err) } - - return result.LastInsertId() } // InsertCollectLog CollectLog表插入数据 -func (store *ExtractorStore) InsertCollectLog(isource IGet) (int64, error) { +func (store *StoreExtractor) InsertCollectLog(isource IGet) (int64, error) { tx, err := store.db.Begin() defer func() { @@ -265,24 +343,24 @@ func (store *ExtractorStore) InsertCollectLog(isource IGet) (int64, error) { }() if err != nil { - log.Panic(err) + panic(err) } result, err := tx.Exec("insert into "+CollectLogTable+"(streamer_uid, platform, user_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", isource.Get("StreamerUid"), isource.Get("Platform"), isource.Get("UserId"), isource.Get("IsLiveStreaming"), isource.Get("IsError"), isource.Get("Followers"), isource.Get("Views"), isource.Get("Giver"), isource.Get("Gratuity"), isource.Get("LiveTitle"), isource.Get("LiveStartTime"), isource.Get("LiveEndTime"), isource.Get("UpdateTime"), isource.Get("Tags"), isource.Get("Ext"), isource.Get("ErrorMsg"), ) if err != nil { - log.Panic(err) + panic(err) } logUid, err := result.LastInsertId() if err != nil { - log.Panic(err) + panic(err) } _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, isource.Get("StreamerUid")) if err = tx.Commit(); err != nil { - log.Panic(err) + panic(err) } return result.LastInsertId() } diff --git a/target_type_list.go b/target_type_list.go index fcd013c..8869ec3 100644 --- a/target_type_list.go +++ b/target_type_list.go @@ -8,5 +8,5 @@ const ( TTOpenrecRanking TargetType = "openrec_ranking" // TTOpenrecUser openrec源TargetType名称 - TTOpenrecUser TargetType = "openrec_ranking" + TTOpenrecUser TargetType = "openrec_user" ) diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index 08ac70c..c9b6678 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -1,7 +1,7 @@ package main import ( - "database/sql" + "encoding/json" "intimate" "log" "os" @@ -17,8 +17,11 @@ import ( var openrecRanking *OpenrecRanking -// store 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var store *intimate.SourceStore = intimate.NewSourceStore(string(intimate.STOpenrec)) +// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) + +// estore 解析存储连接实例 +var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() func init() { @@ -69,26 +72,52 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { content := resp.Content() if len(content) <= 200 { // 末页退出 finishpoint := time.Now() - log.Println("任务结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*30)) + log.Println("任务结束休眠, 下次启动时间:", finishpoint.Add(time.Minute*60)) for time.Now().Sub(finishpoint) < time.Minute*60 { time.Sleep(time.Second) if atomic.LoadInt32(&loop) > 0 { return } } + + querys := tp.GetQuery() + querys.Set("page", strconv.Itoa(1)) + tp.SetQuery(querys) + continue } result := gjson.ParseBytes(content) if result.IsArray() { for _, User := range result.Array() { - data := &intimate.Source{} + userid := User.Get("channel.id").String() - data.Source = sql.NullString{String: userid, Valid: len(userid) > 0} - data.Url = tp.GetRawURL() - data.TargetType = string(intimate.TTOpenrecUser) - store.Insert(data) + // data := &intimate.Source{} + // data.Source = sql.NullString{String: userid, Valid: len(userid) > 0} + // data.Url = tp.GetRawURL() + // data.TargetType = string(intimate.TTOpenrecUser) + // sstore.Insert(data) + + streamer := &intimate.Streamer{} + streamer.UserId = userid + streamer.Platform = string(intimate.Popenrec) + + updateUrl := make(map[string]interface{}) + + supportersUrl := "https://www.openrec.tv/viewapp/api/v6/supporters?identify_id=sumomo_xqx&month=&Uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683&Token=46598c320408bd69ae3c63298f6f4a3a97354175&Random=AZVXNAAXQVMOSVWNDPIQ&page_number=1 -H 'accept: application/json, text/javascript, */*; q=0.01' -H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36' -H 'cookie: uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683;' --compressed" + updateUrl["supporters"] = supportersUrl + updateUrl["user"] = "https://www.openrec.tv/user/" + userid + updateUrl["live"] = "https://www.openrec.tv/live/" + userid + + updateUrlBytes, err := json.Marshal(updateUrl) + if err != nil { + estore.UpdateError(streamer, err) + continue + } + + streamer.UpdateUrl = updateUrlBytes + estore.InsertStreamer(streamer) } } @@ -102,6 +131,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { page++ querys.Set("page", strconv.Itoa(page)) tp.SetQuery(querys) - time.Sleep(time.Second * 2) + + time.Sleep(time.Second * 1) } } diff --git a/tasks/openrec/openrec_task1/task_openrec_test.go b/tasks/openrec/openrec_task1/task_openrec_test.go index b21c538..7190d66 100644 --- a/tasks/openrec/openrec_task1/task_openrec_test.go +++ b/tasks/openrec/openrec_task1/task_openrec_test.go @@ -2,6 +2,7 @@ package main import ( "testing" + "time" "github.com/tidwall/gjson" @@ -56,6 +57,12 @@ func TestRanking(t *testing.T) { ht.Execute() } +func TestTimeAdd(t *testing.T) { + finishpoint := time.Now() + time.Sleep(time.Second * 2) + t.Error(time.Now().Sub(finishpoint) > time.Second*1) +} + func TestRankingInsert(t *testing.T) { ht := hunter.NewHunter(openrecRanking) ht.Execute() diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 86d6b84..faca2c0 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -20,8 +20,11 @@ import ( var oer *OpenrecExtratorRanking -// store 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var store *intimate.SourceStore = intimate.NewSourceStore(string(intimate.STOpenrec)) +// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) + +// estore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_extractor.sql +var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() func init() { oer = &OpenrecExtratorRanking{} @@ -48,9 +51,9 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { for atomic.LoadInt32(&loop) > 0 { - source, err := store.Pop(string(intimate.TTOpenrecUser)) + streamer, err := estore.Pop(string(intimate.Popenrec)) - if source == nil || err != nil { + if streamer == nil || err != nil { if err != lasterr { log.Println(err, lasterr) lasterr = err @@ -59,22 +62,31 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { continue } - userId := source.Source.String - userUrl := "https://www.openrec.tv/user/" + userId + userId := streamer.UserId + var updateUrl map[string]string + + err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) + if err != nil { + log.Println(err) + continue + } + // Check Userid + + userUrl := updateUrl["user"] tp := cxt.Session().Get(userUrl) resp, err := tp.Execute() - source.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} + streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} if err != nil { log.Println(err) - store.UpdateError(source, err) + estore.UpdateError(streamer, err) continue } cookies := cxt.Session().GetCookies(tp.GetParsedURL()) - scurl := "https://www.openrec.tv/viewapp/api/v6/supporters?identify_id=sumomo_xqx&month=&Uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683&Token=46598c320408bd69ae3c63298f6f4a3a97354175&Random=AZVXNAAXQVMOSVWNDPIQ&page_number=1 -H 'accept: application/json, text/javascript, */*; q=0.01' -H 'user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36' -H 'cookie: uuid=B96EE988-E3A2-4A44-A543-611A8B4BC683;' --compressed" + scurl := updateUrl["supporters"] curl := gcurl.ParseRawCURL(scurl) supportersSession := curl.CreateSession() @@ -134,25 +146,31 @@ func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { ext["supporters"] = supporters ext["user"] = string(resp.Content()) - tp = cxt.Session().Get("https://www.openrec.tv/live/" + userId) + liveUrl := updateUrl["live"] + tp = cxt.Session().Get(liveUrl) resp, err = tp.Execute() if err != nil { log.Println(err) - store.UpdateError(source, err) + estore.UpdateError(streamer, err) continue } - ext["user_live"] = string(resp.Content()) + ext["live"] = string(resp.Content()) extJsonBytes, err := json.Marshal(ext) if err != nil { log.Println(err) - store.UpdateError(source, err) + estore.UpdateError(streamer, err) continue } - source.Operator = int32(intimate.OperatorOK) + streamer.Operator = int32(intimate.OperatorOK) + + source := &intimate.Source{} + source.TargetType = string(intimate.TTOpenrecUser) source.Ext = string(extJsonBytes) - store.Update(source) + sstore.Insert(source) + + estore.UpdateOperator(streamer) } }