diff --git a/extractor/mirrativ_extractor/mirrativ_extractor.go b/extractor/mirrativ_extractor/mirrativ_extractor.go index 33015b3..023386a 100644 --- a/extractor/mirrativ_extractor/mirrativ_extractor.go +++ b/extractor/mirrativ_extractor/mirrativ_extractor.go @@ -88,6 +88,10 @@ func main() { clog.Views = &sql.NullInt64{Int64: result.Int(), Valid: true} } + if result := livejson.Get("max_online_viewer_num"); result.Exists() { + clog.PCU = &sql.NullInt64{Int64: result.Int(), Valid: true} + } + if result := livejson.Get("title"); result.Exists() { clog.LiveTitle = &sql.NullString{String: result.String(), Valid: true} } diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index 245c961..c40513c 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -18,7 +18,7 @@ import ( type UserInfo struct { UserName string `exp:"//p[ contains(@class, 'c-global__user__profile__list__name__text')]"` Followers int64 `exp:"//p[@class='c-global__user__count__row__right js-userCountFollowers']" mth:"r:ParseNumber"` - Views int64 `exp:"//ul[@class='c-contents']//p[@class='c-thumbnailVideo__footer__liveCount']" mth:"r:ExtractNumber"` + PCU int64 `exp:"//ul[@class='c-contents']//p[@class='c-thumbnailVideo__footer__liveCount']" mth:"r:ExtractNumber"` } //UserLive 提取信息的结构体 @@ -27,6 +27,7 @@ type UserLive struct { LiveStartTime string `exp:"//meta[@itemprop='uploadDate']/@content"` LiveEndTime string `exp:"//meta[@itemprop='duration']/@content"` Tags []string `exp:"//div[contains(@class,'MovieMetaContent__TagContainer')]//a[@role ='button']"` + Views int64 `exp:"//meta[@itemprop='interactionCount']/@content"` } // Execute 执行 @@ -173,10 +174,7 @@ func Extractor(streamer *intimate.Streamer, userId string, htmlUser, htmlLive st if ok1 { clog.Followers = &sql.NullInt64{Int64: ui.Followers, Valid: true} - clog.Views = &sql.NullInt64{Int64: ui.Views, Valid: true} - if ui.Views != 0 { - clog.IsLiveStreaming = true - } + clog.PCU = &sql.NullInt64{Int64: ui.PCU, Valid: true} streamer.UserName = &sql.NullString{String: ui.UserName, Valid: true} // giverjson := jsonSupporters @@ -208,6 +206,7 @@ func Extractor(streamer *intimate.Streamer, userId string, htmlUser, htmlLive st //log.Println(ul) if ok2 { + clog.Views = &sql.NullInt64{Int64: ul.Views, Valid: true} clog.LiveTitle = &sql.NullString{String: ul.Title, Valid: true} startTime, err := time.ParseInLocation("2006-01-02T15:04:05Z07:00", ul.LiveStartTime, time.Local) diff --git a/extractor/twitcasting_extractor/twitcasting_extractor.go b/extractor/twitcasting_extractor/twitcasting_extractor.go index da8c17d..d86acd6 100644 --- a/extractor/twitcasting_extractor/twitcasting_extractor.go +++ b/extractor/twitcasting_extractor/twitcasting_extractor.go @@ -13,12 +13,6 @@ import ( "github.com/474420502/requests" ) -// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) - -// estore 解析存储连接实例 -var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() - type LiveData struct { UserName string `exp:"//span[@class='tw-live-author__info-username']" method:"Text"` Follower int64 `exp:"(//span[@class='tw-user-nav-list-count'])[2]" method:"r:ExtractNumber"` diff --git a/extractor/twitch_extractor/tiwtch_extractor.go b/extractor/twitch_extractor/tiwtch_extractor.go index f502479..086dce2 100644 --- a/extractor/twitch_extractor/tiwtch_extractor.go +++ b/extractor/twitch_extractor/tiwtch_extractor.go @@ -365,6 +365,7 @@ func extractViews(wd selenium.WebDriver, clog *intimate.CollectLog) error { vint, _ := intimate.ParseNumber(txt) clog.Views = &sql.NullInt64{Int64: vint, Valid: true} + clog.PCU = clog.Views // log.Println("views:", txt) views.Click() diff --git a/extractor_field.go b/extractor_field.go index c38962d..63314c3 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -75,22 +75,22 @@ type CollectLog struct { LogUid int64 `field:"log_uid"` // 日志id StreamerUid int64 `field:"streamer_uid"` // StreamerId 表id与 - Platform Platform `field:"platform"` // - UserId string `field:"user_id"` // 平台的UserId - IsLiveStreaming bool `field:"is_live_streaming"` // - IsError bool `field:"is_error"` // - Followers *sql.NullInt64 `field:"followers"` // - Views *sql.NullInt64 `field:"views"` // - Giver interface{} `field:"giver"` // - Gratuity *sql.NullInt64 `field:"gratuity"` // - LiveTitle *sql.NullString `field:"live_title"` // - LiveStartTime *sql.NullTime `field:"live_start_time"` // - LiveEndTime *sql.NullTime `field:"live_end_time"` // - UpdateTime *sql.NullTime `field:"update_time"` // - Tags interface{} `field:"tags"` - Ext interface{} `field:"ext"` // - ErrorMsg *sql.NullString `field:"error_msg"` // - Comments interface{} `field:"comments"` // + Platform Platform `field:"platform"` // + UserId string `field:"user_id"` // 平台的UserId + PCU *sql.NullInt64 `field:"pcu"` // + IsError bool `field:"is_error"` // + Followers *sql.NullInt64 `field:"followers"` // + Views *sql.NullInt64 `field:"views"` // + Giver interface{} `field:"giver"` // + Gratuity *sql.NullInt64 `field:"gratuity"` // + LiveTitle *sql.NullString `field:"live_title"` // + LiveStartTime *sql.NullTime `field:"live_start_time"` // + LiveEndTime *sql.NullTime `field:"live_end_time"` // + UpdateTime *sql.NullTime `field:"update_time"` // + Tags interface{} `field:"tags"` + Ext interface{} `field:"ext"` // + ErrorMsg *sql.NullString `field:"error_msg"` // + Comments interface{} `field:"comments"` // } // Get Simple Value diff --git a/store.go b/store.go index a3179dd..71e06b9 100644 --- a/store.go +++ b/store.go @@ -1,12 +1,7 @@ package intimate import ( - "crypto/md5" "database/sql" - "fmt" - "log" - "strings" - "time" _ "github.com/go-sql-driver/mysql" ) @@ -47,441 +42,3 @@ type StoreSource struct { errorCount int errorLimit int } - -func (store *StoreSource) PopCount() int { - return store.popCount -} - -func (store *StoreSource) Close() error { - return store.db.Close() -} - -// NewSourceStore 创建一个存储实例 -func NewStoreSource(table string) *StoreSource { - db, err := sql.Open("mysql", InitConfig.Database.SourceURI) - if err != nil { - panic(err) - } - return &StoreSource{table: table, db: db} -} - -func (store *StoreSource) errorAlarm(err error) { - if err != nil { - log.Println("store error: ", err) - // 报警. 如果数据插入有问题 - store.errorCount++ - if store.errorCount >= store.errorLimit { - // 数据库频繁操作初问题 报警, 减少没意义的请求 - } - } else { - if store.errorCount > 0 { - store.errorCount-- - } - } -} - -// Insert 插入数据 -func (store *StoreSource) Insert(isource IGet) { - _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)", isource.Get("Url"), isource.Get("Target"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("StreamerId")) - if err != nil { - panic(err) - } -} - -// Deduplicate 去重 -func (store *StoreSource) Deduplicate(target Target, field string) { - sql := `DELETE FROM ` + store.table + ` WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, ` + field + ` FROM ` + store.table + ` force index(target_type_idx) WHERE target_type = "` + string(target) + `" ) s GROUP BY s.` + string(field) + `) ;` - _, err := store.db.Exec(sql) - if err != nil { - panic(err) - } -} - -// Update 更新数据 -func (store *StoreSource) Update(isource IGet) { - _, err := store.db.Exec("update "+store.table+" set ext = ?, pass_gob = ?, operator = ?, error_msg = ? where uid = ?", isource.Get("Ext"), isource.Get("PassGob"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) - if err != nil { - panic(err) - } -} - -// UpdateOperator 更新数据操作标志位 -func (store *StoreSource) UpdateOperator(isource IGet) { - _, err := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) - if err != nil { - panic(err) - } -} - -// UpdateError 更新错误数据 -func (store *StoreSource) UpdateError(isource IGetSet, err error) { - isource.Set("Operator", int32(OperatorError)+isource.Get("Operator").(int32)) - isource.Set("ErrorMsg", sql.NullString{String: err.Error(), Valid: true}) - _, dberr := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) - if dberr != nil { - // email tell owner to deal with - panic(err) - } -} - -// Restore 恢复Operator数据状态 -func (store *StoreSource) Restore(isource IGet) { - _, dberr := store.db.Exec("update "+store.table+" set operator = ? where uid = ?", isource.Get("LastOperator"), isource.Get("Uid")) - if dberr != nil { - // email tell owner to deal with - panic(dberr) - } -} - -// Pop 弹出一条未处理的数据 -func (store *StoreSource) Pop(targetType Target, operators ...int32) (*Source, error) { - - tx, err := store.db.Begin() - if err != nil { - return nil, err - } - var args = []interface{}{string(targetType)} - selectSQL := `select uid, url, target_type, source, ext, operator, update_time, streamer_id from ` + store.table + ` where target_type = ?` - if len(operators) == 0 { - selectSQL += " and operator = ?" - args = append(args, 0) - } else { - for _, operator := range operators { - selectSQL += " and operator = ?" - args = append(args, operator) - } - } - - // log.Println(selectSQL + ` limit 1 for update`) - row := tx.QueryRow(selectSQL+` limit 1 for update`, args...) - - defer func() { - err := tx.Commit() - if err != nil { - log.Println(err) - err = tx.Rollback() - if err != nil { - log.Println(err) - } - } - store.popCount++ - }() - - s := &Source{} - // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.Url, &s.Target, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime, &s.StreamerId) - if err != nil { - return nil, err - } - - s.Set("LastOperator", s.Operator) - _, err = tx.Exec("update "+store.table+" set operator = ? where uid = ?", OperatorWait, s.Uid) - return s, nil -} - -// StreamerTable 主播表名称 -const StreamerTable string = "streamer" - -// CollectLogTable 采集日志表 -const CollectLogTable string = "collect_log" - -// StreamerListTable 主播表名称 -const StreamerListTable string = "streamer_list" - -type StoreExtractor struct { - db *sql.DB - - popCount int - errorCount int - errorLimit int -} - -func (store *StoreExtractor) PopCount() int { - return store.popCount -} - -func (store *StoreExtractor) Close() error { - return store.db.Close() -} - -func (store *StoreExtractor) errorAlarm(err error) { - if err != nil { - log.Println("store error: ", err) - // 报警. 如果数据插入有问题 - store.errorCount++ - if store.errorCount >= store.errorLimit { - // 数据库频繁操作初问题 报警, 减少没意义的请求 - } - } else { - if store.errorCount > 0 { - store.errorCount-- - } - } -} - -// NewStoreExtractor 生成一个extractor库的相关链接 -func NewStoreExtractor() *StoreExtractor { - db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI) - if err != nil { - panic(err) - } - return &StoreExtractor{db: db} -} - -// PopNoWait 弹出一个不用按时间间隔更新的主播信息, 主要用来测试. -func (store *StoreExtractor) PopNoWait(platform Platform, condition string, operators ...int32) (*Streamer, error) { - tx, err := store.db.Begin() - if err != nil { - return nil, err - } - var args = []interface{}{string(platform)} - selectSQL := `select uid, update_time, user_id, tags, live_url, update_url, is_update_streamer, update_interval from ` + StreamerTable + ` where platform = ? and ` + condition - if len(operators) == 0 { - selectSQL += " and operator = ?" - args = append(args, 0) - } else { - for _, operator := range operators { - selectSQL += " and operator = ?" - args = append(args, operator) - } - } - - defer func() { - err := tx.Commit() - if err != nil { - log.Println(err) - err = tx.Rollback() - if err != nil { - log.Println(err) - } - } - store.popCount++ - }() - - // log.Println(selectSQL + ` limit 1 for update`) - row := tx.QueryRow(selectSQL+` limit 1 for update`, args...) - - s := &Streamer{} - // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.UpdateTime, &s.UserId, &s.Tags, &s.LiveUrl, &s.UpdateUrl, &s.IsUpdateStreamer, &s.UpdateInterval) - if err != nil { - return nil, err - } - s.Set("LastOperator", s.Operator) - _, err = tx.Exec("update "+StreamerTable+" set operator = ? where uid = ?", OperatorWait, s.Uid) - return s, nil -} - -// Pop 弹出一条未处理的数据 -func (store *StoreExtractor) Pop(platform Platform, operators ...int32) (*Streamer, error) { - - tx, err := store.db.Begin() - if err != nil { - return nil, err - } - var args = []interface{}{string(platform)} - selectSQL := `select uid, update_time, user_id, tags, live_url, update_url, is_update_streamer, update_interval from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval` - if len(operators) == 0 { - selectSQL += " and operator = ?" - args = append(args, 0) - } else { - for _, operator := range operators { - selectSQL += " and operator = ?" - args = append(args, operator) - } - } - - defer func() { - err := tx.Commit() - if err != nil { - log.Println(err) - err = tx.Rollback() - if err != nil { - log.Println(err) - } - } - store.popCount++ - }() - - // log.Println(selectSQL + ` limit 1 for update`) - row := tx.QueryRow(selectSQL+` limit 1 for update`, args...) - - s := &Streamer{} - // uid, url, target_type, source, ext, operator - err = row.Scan(&s.Uid, &s.UpdateTime, &s.UserId, &s.Tags, &s.LiveUrl, &s.UpdateUrl, &s.IsUpdateStreamer, &s.UpdateInterval) - if err != nil { - return nil, err - } - s.Set("LastOperator", s.Operator) - _, err = tx.Exec("update "+StreamerTable+" set operator = ? where uid = ?", OperatorWait, s.Uid) - return s, nil -} - -// UpdateStreamerList streamerlist表, 更新数据 -func (store *StoreExtractor) UpdateStreamerList(streamer IGet, fieldvalues ...interface{}) { - updateSQL := "UPDATE " + StreamerListTable + " SET " - var values []interface{} - for i := 0; i < len(fieldvalues); i += 2 { - field := fieldvalues[i] - values = append(values, fieldvalues[i+1]) - updateSQL += field.(string) + " = ? " - } - updateSQL += "WHERE urlhash = ?" - values = append(values, streamer.Get("UrlHash")) - _, err := store.db.Exec(updateSQL, values...) - if err != nil { - panic(err) - } -} - -// InsertStreamer streamerlist表, 插入数据 -func (store *StoreExtractor) InsertStreamerList(streamerlist IGet) (isExists bool) { - urlstr := streamerlist.Get("Url").(string) - - _, err := store.db.Exec("insert into streamer_list(urlhash, url, platform, label, serialize, update_interval, error_msg, operator) values(?,?,?,?,?,?,?,?)", - fmt.Sprintf("%x", md5.Sum([]byte(urlstr))), - urlstr, - streamerlist.Get("Platform"), - streamerlist.Get("Label"), - streamerlist.Get("Serialize"), - streamerlist.Get("UpdateInterval"), - streamerlist.Get("ErrorMsg"), - streamerlist.Get("Operator"), - ) - - if err != nil { - if !strings.HasPrefix(err.Error(), "Error 1062") { - log.Println(err) - } - return true - } - - return false -} - -// InsertStreamer Streamer表, 插入数据 -func (store *StoreExtractor) InsertStreamer(streamer *Streamer) (isExists bool) { - // select uid from table where platform = ? and user_id = ? - // selectSQL := "SELECT is_update_url, uid FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?" - tx, err := store.db.Begin() - if err != nil { - panic(err) - } - - defer func() { - err = tx.Commit() - if err != nil { - rerr := tx.Rollback() - if rerr != nil { - log.Println(rerr) - } - panic(err) - } - }() - - streamer.UpdateTime = &sql.NullTime{Time: time.Now().Add(-time.Hour * 100000), Valid: true} - _, err = tx.Exec("INSERT IGNORE INTO "+StreamerTable+"(platform, user_id, user_name, live_url, update_url, tags, update_time) VALUES(?,?,?,?,?,?,?);", - streamer.Platform, - streamer.UserId, - streamer.UserName, - streamer.LiveUrl, - streamer.UpdateUrl, - streamer.Tags, - streamer.UpdateTime, - ) - - if err != nil { - panic(err) - } - return false -} - -// UpdateError 更新错误数据 -func (store *StoreExtractor) UpdateError(isource IGetSet, err error) { - isource.Set("Operator", int32(OperatorError)+isource.Get("Operator").(int32)) - isource.Set("ErrorMsg", sql.NullString{String: err.Error(), Valid: true}) - _, dberr := store.db.Exec("update "+StreamerTable+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) - if dberr != nil { - // email tell owner to deal with - panic(err) - } -} - -// UpdateStreamerLog 只更新Streamer的关联日志和时间戳 -func (store *StoreExtractor) UpdateStreamerLog(latestUid int64, streamerUid int64) { - _, err := store.db.Exec("UPDATE "+StreamerTable+" SET latest_log_uid = ?, update_time = CURRENT_TIMESTAMP() WHERE uid = ?", latestUid, streamerUid) - if err != nil { - panic(err) - } -} - -// UpdateOperator Streamer表, 插入数据 -func (store *StoreExtractor) UpdateOperator(isource IGet) { - _, err := store.db.Exec("update "+StreamerTable+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) - if err != nil { - panic(err) - } -} - -// UpdateStreamer Streamer表, 插入数据 -func (store *StoreExtractor) UpdateStreamer(streamer IGet) { - // log.Printf("UPDATE "+StreamerTable+" SET user_name = %v, live_url = %v, channel = %v, latest_log_uid = %v, tags = %v, ext = %v, operator = %v, update_time = %v, update_interval = %v WHERE uid = %v", streamer.Get("UserName"), streamer.Get("LiveUrl"), streamer.Get("Channel"), streamer.Get("LatestLogUid"), streamer.Get("Tags"), streamer.Get("Ext"), streamer.Get("Operator"), streamer.Get("UpdateTime"), streamer.Get("UpdateInterval"), streamer.Get("Uid")) - _, err := store.db.Exec("UPDATE "+StreamerTable+" SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, tags = ?, ext = ?, operator = ?, update_time = ?, update_interval = ? WHERE uid = ?", - streamer.Get("UserName"), streamer.Get("LiveUrl"), streamer.Get("Channel"), streamer.Get("LatestLogUid"), streamer.Get("Tags"), streamer.Get("Ext"), streamer.Get("Operator"), streamer.Get("UpdateTime"), streamer.Get("UpdateInterval"), streamer.Get("Uid")) - if err != nil { - panic(err) - } -} - -// Update Streamer表, 更新指定的字段 -func (store *StoreExtractor) Update(streamer IGet, fieldvalues ...interface{}) { - updateSQL := "UPDATE " + StreamerTable + " SET " - var values []interface{} - for i := 0; i < len(fieldvalues); i += 2 { - field := fieldvalues[i] - values = append(values, fieldvalues[i+1]) - updateSQL += field.(string) + " = ?," - } - updateSQL = updateSQL[0 : len(updateSQL)-1] - updateSQL += "WHERE uid = ?" - values = append(values, streamer.Get("Uid")) - _, err := store.db.Exec(updateSQL, values...) - if err != nil { - log.Println(updateSQL) - panic(err) - } -} - -// InsertClog CollectLog表插入数据 -func (store *StoreExtractor) InsertClog(clog IGet) int64 { - tx, err := store.db.Begin() - - defer func() { - if err := recover(); err != nil { - tx.Rollback() - log.Panic(err) - } - }() - - if err != nil { - panic(err) - } - - result, err := tx.Exec("insert into "+CollectLogTable+"(streamer_uid, platform, user_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", - clog.Get("StreamerUid"), clog.Get("Platform"), clog.Get("UserId"), clog.Get("IsLiveStreaming"), clog.Get("IsError"), clog.Get("Followers"), clog.Get("Views"), clog.Get("Giver"), clog.Get("Gratuity"), clog.Get("LiveTitle"), clog.Get("LiveStartTime"), clog.Get("LiveEndTime"), clog.Get("UpdateTime"), clog.Get("Tags"), clog.Get("Ext"), clog.Get("ErrorMsg"), - ) - if err != nil { - panic(err) - } - - logUid, err := result.LastInsertId() - if err != nil { - panic(err) - } - - _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, clog.Get("StreamerUid")) - if err = tx.Commit(); err != nil { - panic(err) - } - return logUid -} diff --git a/tasks/twitcasting/twitcasting_task1/twitcasting.go b/tasks/twitcasting/twitcasting_task1/twitcasting.go index efb9ea1..9b220ef 100644 --- a/tasks/twitcasting/twitcasting_task1/twitcasting.go +++ b/tasks/twitcasting/twitcasting_task1/twitcasting.go @@ -14,12 +14,6 @@ import ( "github.com/474420502/requests" ) -// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitcasting)) - -// estore 解析存储连接实例 -var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() - type SearchProfile struct { UserName string `exp:".//span[@class='username']" method:"Text"` UserId string // `exp:".//span[@class='fullname']" method:"Text"`