diff --git a/extractor/openrec/main.go b/extractor/openrec/main.go index 7905807..0639189 100644 --- a/extractor/openrec/main.go +++ b/extractor/openrec/main.go @@ -1,5 +1,15 @@ package main +/* + `uid` varchar(36) NOT NULL, + `platform` varchar(255) NOT NULL, + `anchor_id` varchar(255) NOT NULL, + `anchor_name` varchar(255) NOT NULL, + `live_url` text, + `channel` varchar(128) DEFAULT NULL, + `show_type` varchar(255) DEFAULT NULL, +*/ + func main() { } diff --git a/extractor_field.go b/extractor_field.go new file mode 100644 index 0000000..2d15f03 --- /dev/null +++ b/extractor_field.go @@ -0,0 +1,392 @@ +package intimate + +import ( + "database/sql" + "time" +) + +type ISetAnchorInfo interface { + SetUid(int64) // + SetPlatform(string) // + SetAnchorId(string) // + SetAnchorName(string) // + SetLiveUrl(sql.NullString) // + SetChannel(sql.NullString) // + SetShowType(sql.NullString) // + SetExt(interface{}) // + SetUpdateTime(time.Time) // +} + +type IGetAnchorInfo interface { + GetUid() int64 // + GetPlatform() string // + GetAnchorId() string // + GetAnchorName() string // + GetLiveUrl() sql.NullString // + GetChannel() sql.NullString // + GetShowType() sql.NullString // + GetExt() interface{} // + GetUpdateTime() time.Time // +} + +/* +CREATE TABLE `anchor_info` ( + `uid` bigint AUTO_INCREMENT, + `platform` varchar(255) NOT NULL, + `anchor_id` varchar(255) NOT NULL, + `anchor_name` varchar(255) NOT NULL, + `live_url` text, + `channel` varchar(128) DEFAULT NULL, + `show_type` varchar(255) DEFAULT NULL, + `ext` json DEFAULT NULL, + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`uid`), + UNIQUE KEY `platform_anchor_id_idx` (`platform`, `anchor_id`), + KEY `platform_idx` (`platform`), + KEY `anchor_id_idx` (`anchor_id`), + KEY `anchor_name_idx` (`anchor_name`), + KEY `channel_idx` (`channel`), + KEY `show_type_idx` (`show_type`), + KEY `update_time_idx` (`update_time`) +); +*/ + +type AnchorInfo struct { + Uid int64 // + Platform string // + AnchorId string // + AnchorName string // + LiveUrl sql.NullString // + Channel sql.NullString // + ShowType sql.NullString // + Ext interface{} // + UpdateTime time.Time // +} + +// GetUpdateTime Get return UpdateTime time.Time +func (ai *AnchorInfo) GetUpdateTime() time.Time { + return ai.UpdateTime +} + +// SetUpdateTime Set UpdateTime time.Time +func (ai *AnchorInfo) SetUpdateTime(UpdateTime time.Time) { + ai.UpdateTime = UpdateTime +} + +// GetExt Get return Ext interface{} +func (ai *AnchorInfo) GetExt() interface{} { + return ai.Ext +} + +// SetExt Set Ext interface{} +func (ai *AnchorInfo) SetExt(Ext interface{}) { + ai.Ext = Ext +} + +// GetShowType Get return ShowType sql.NullString +func (ai *AnchorInfo) GetShowType() sql.NullString { + return ai.ShowType +} + +// SetShowType Set ShowType sql.NullString +func (ai *AnchorInfo) SetShowType(ShowType sql.NullString) { + ai.ShowType = ShowType +} + +// GetChannel Get return Channel sql.NullString +func (ai *AnchorInfo) GetChannel() sql.NullString { + return ai.Channel +} + +// SetChannel Set Channel sql.NullString +func (ai *AnchorInfo) SetChannel(Channel sql.NullString) { + ai.Channel = Channel +} + +// GetLiveUrl Get return LiveUrl sql.NullString +func (ai *AnchorInfo) GetLiveUrl() sql.NullString { + return ai.LiveUrl +} + +// SetLiveUrl Set LiveUrl sql.NullString +func (ai *AnchorInfo) SetLiveUrl(LiveUrl sql.NullString) { + ai.LiveUrl = LiveUrl +} + +// GetAnchorName Get return AnchorName string +func (ai *AnchorInfo) GetAnchorName() string { + return ai.AnchorName +} + +// SetAnchorName Set AnchorName string +func (ai *AnchorInfo) SetAnchorName(AnchorName string) { + ai.AnchorName = AnchorName +} + +// GetAnchorId Get return AnchorId string +func (ai *AnchorInfo) GetAnchorId() string { + return ai.AnchorId +} + +// SetAnchorId Set AnchorId string +func (ai *AnchorInfo) SetAnchorId(AnchorId string) { + ai.AnchorId = AnchorId +} + +// GetPlatform Get return Platform string +func (ai *AnchorInfo) GetPlatform() string { + return ai.Platform +} + +// SetPlatform Set Platform string +func (ai *AnchorInfo) SetPlatform(Platform string) { + ai.Platform = Platform +} + +// GetUid Get return Uid int64 +func (ai *AnchorInfo) GetUid() int64 { + return ai.Uid +} + +// SetUid Set Uid int64 +func (ai *AnchorInfo) SetUid(Uid int64) { + ai.Uid = Uid +} + +type IGetCollectLog interface { + GetUid() int64 // + GetPlatform() string // + GetAnchorId() string // + GetIsShowing() int32 // + GetIsError() int32 // + GetFollowers() sql.NullInt32 // + GetViews() sql.NullInt32 // + GetGiver() interface{} // + GetGratuity() sql.NullInt32 // + GetShowTitle() sql.NullString // + GetShowStartTime() sql.NullTime // + GetShowEndTime() sql.NullTime // + GetUpdateTime() time.Time // + GetExt() interface{} // + GetError() sql.NullString // +} + +type ISetCollectLog interface { + SetUid(int64) // + SetPlatform(string) // + SetAnchorId(string) // + SetIsShowing(int32) // + SetIsError(int32) // + SetFollowers(sql.NullInt32) // + SetViews(sql.NullInt32) // + SetGiver(interface{}) // + SetGratuity(sql.NullInt32) // + SetShowTitle(sql.NullString) // + SetShowStartTime(sql.NullTime) // + SetShowEndTime(sql.NullTime) // + SetUpdateTime(time.Time) // + SetExt(interface{}) // + SetError(sql.NullString) // +} + +/* +CREATE TABLE `collect_log` ( + `uid` bigint, + `platform` varchar(255) NOT NULL, + `anchor_id` varchar(255) NOT NULL, + + `is_showing` tinyint(1) DEFAULT NULL, + `is_error` tinyint(1) DEFAULT NULL, + + `followers` int(11) DEFAULT NULL, + `views` int(11) DEFAULT NULL, + `giver` json DEFAULT NULL, + `gratuity` int(11) DEFAULT NULL, + + `show_title` text DEFAULT NULL, + `show_start_time` timestamp NULL DEFAULT NULL, + `show_end_time` timestamp NULL DEFAULT NULL, + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, --时间戳从源数据里获取 + `ext` json DEFAULT NULL, + + `error` text DEFAULT NULL, + + KEY `uid_idx` (`uid`), + KEY `platform_idx` (`platform`), + KEY `anchor_id_idx` (`anchor_id`), + KEY `is_showing_idx` (`is_showing`), + KEY `is_error_idx` (`is_error`), + KEY `followers_idx` (`followers`), + KEY `views_idx` (`views`), + KEY `gratuity_idx` (`gratuity`), + KEY `update_time_idx` (`update_time`) +) +*/ + +type CollectLog struct { + Uid int64 // + Platform string // + AnchorId string // + IsShowing int32 // + IsError int32 // + Followers sql.NullInt32 // + Views sql.NullInt32 // + Giver interface{} // + Gratuity sql.NullInt32 // + ShowTitle sql.NullString // + ShowStartTime sql.NullTime // + ShowEndTime sql.NullTime // + UpdateTime time.Time // + Ext interface{} // + Error sql.NullString // +} + +// GetError Get return Error sql.NullString +func (cl *CollectLog) GetError() sql.NullString { + return cl.Error +} + +// SetError Set Error sql.NullString +func (cl *CollectLog) SetError(Error sql.NullString) { + cl.Error = Error +} + +// GetExt Get return Ext interface{} +func (cl *CollectLog) GetExt() interface{} { + return cl.Ext +} + +// SetExt Set Ext interface{} +func (cl *CollectLog) SetExt(Ext interface{}) { + cl.Ext = Ext +} + +// GetUpdateTime Get return UpdateTime time.Time +func (cl *CollectLog) GetUpdateTime() time.Time { + return cl.UpdateTime +} + +// SetUpdateTime Set UpdateTime time.Time +func (cl *CollectLog) SetUpdateTime(UpdateTime time.Time) { + cl.UpdateTime = UpdateTime +} + +// GetShowEndTime Get return ShowEndTime sql.NullTime +func (cl *CollectLog) GetShowEndTime() sql.NullTime { + return cl.ShowEndTime +} + +// SetShowEndTime Set ShowEndTime sql.NullTime +func (cl *CollectLog) SetShowEndTime(ShowEndTime sql.NullTime) { + cl.ShowEndTime = ShowEndTime +} + +// GetShowStartTime Get return ShowStartTime sql.NullTime +func (cl *CollectLog) GetShowStartTime() sql.NullTime { + return cl.ShowStartTime +} + +// SetShowStartTime Set ShowStartTime sql.NullTime +func (cl *CollectLog) SetShowStartTime(ShowStartTime sql.NullTime) { + cl.ShowStartTime = ShowStartTime +} + +// GetShowTitle Get return ShowTitle sql.NullString +func (cl *CollectLog) GetShowTitle() sql.NullString { + return cl.ShowTitle +} + +// SetShowTitle Set ShowTitle sql.NullString +func (cl *CollectLog) SetShowTitle(ShowTitle sql.NullString) { + cl.ShowTitle = ShowTitle +} + +// GetGratuity Get return Gratuity sql.NullInt32 +func (cl *CollectLog) GetGratuity() sql.NullInt32 { + return cl.Gratuity +} + +// SetGratuity Set Gratuity sql.NullInt32 +func (cl *CollectLog) SetGratuity(Gratuity sql.NullInt32) { + cl.Gratuity = Gratuity +} + +// GetGiver Get return Giver interface{} +func (cl *CollectLog) GetGiver() interface{} { + return cl.Giver +} + +// SetGiver Set Giver interface{} +func (cl *CollectLog) SetGiver(Giver interface{}) { + cl.Giver = Giver +} + +// GetViews Get return Views sql.NullInt32 +func (cl *CollectLog) GetViews() sql.NullInt32 { + return cl.Views +} + +// SetViews Set Views sql.NullInt32 +func (cl *CollectLog) SetViews(Views sql.NullInt32) { + cl.Views = Views +} + +// GetFollowers Get return Followers sql.NullInt32 +func (cl *CollectLog) GetFollowers() sql.NullInt32 { + return cl.Followers +} + +// SetFollowers Set Followers sql.NullInt32 +func (cl *CollectLog) SetFollowers(Followers sql.NullInt32) { + cl.Followers = Followers +} + +// GetIsError Get return IsError int32 +func (cl *CollectLog) GetIsError() int32 { + return cl.IsError +} + +// SetIsError Set IsError int32 +func (cl *CollectLog) SetIsError(IsError int32) { + cl.IsError = IsError +} + +// GetIsShowing Get return IsShowing int32 +func (cl *CollectLog) GetIsShowing() int32 { + return cl.IsShowing +} + +// SetIsShowing Set IsShowing int32 +func (cl *CollectLog) SetIsShowing(IsShowing int32) { + cl.IsShowing = IsShowing +} + +// GetAnchorId Get return AnchorId string +func (cl *CollectLog) GetAnchorId() string { + return cl.AnchorId +} + +// SetAnchorId Set AnchorId string +func (cl *CollectLog) SetAnchorId(AnchorId string) { + cl.AnchorId = AnchorId +} + +// GetPlatform Get return Platform string +func (cl *CollectLog) GetPlatform() string { + return cl.Platform +} + +// SetPlatform Set Platform string +func (cl *CollectLog) SetPlatform(Platform string) { + cl.Platform = Platform +} + +// GetUid Get return Uid int64 +func (cl *CollectLog) GetUid() int64 { + return cl.Uid +} + +// SetUid Set Uid int64 +func (cl *CollectLog) SetUid(Uid int64) { + cl.Uid = Uid +} diff --git a/source.go b/source_field.go similarity index 80% rename from source.go rename to source_field.go index f4aaf1e..8e7c77d 100644 --- a/source.go +++ b/source_field.go @@ -5,6 +5,29 @@ import ( "time" ) +// IGetSource 源接口结构 +type IGetSource interface { + GetUid() int64 // + GetUrl() string // + GetTargetType() string // + GetSource() sql.NullString // + GetExt() interface{} // + GetUpdateTime() time.Time // + GetOperator() int32 // + GetErrorMsg() sql.NullString // +} + +type IUpdateSource interface { + IGetSource + + GetLastOperator() int32 + + SetExt(ext interface{}) // + SetUpdateTime(ut time.Time) // + SetOperator(operator int32) // + SetErrorMsg(emsg sql.NullString) // +} + // Source 的结构体 type Source struct { Uid int64 // diff --git a/sql/intimate_extractor.sql b/sql/intimate_extractor.sql index f13534c..5f5728a 100644 --- a/sql/intimate_extractor.sql +++ b/sql/intimate_extractor.sql @@ -2,15 +2,17 @@ create database if not exists `intimate_extractor`; use intimate_extractor; CREATE TABLE IF NOT EXISTS `anchor_info` ( - `uid` varchar(36) NOT NULL, + `uid` bigint AUTO_INCREMENT, `platform` varchar(255) NOT NULL, `anchor_id` varchar(255) NOT NULL, `anchor_name` varchar(255) NOT NULL, `live_url` text, `channel` varchar(128) DEFAULT NULL, `show_type` varchar(255) DEFAULT NULL, + `ext` json DEFAULT NULL, `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`uid`), + UNIQUE KEY `platform_anchor_id_idx` (`platform`, `anchor_id`), KEY `platform_idx` (`platform`), KEY `anchor_id_idx` (`anchor_id`), KEY `anchor_name_idx` (`anchor_name`), @@ -19,13 +21,13 @@ CREATE TABLE IF NOT EXISTS `anchor_info` ( KEY `update_time_idx` (`update_time`) ); -CREATE TABLE IF NOT EXISTS `show_log` ( - `uid` varchar(36) NOT NULL, +CREATE TABLE IF NOT EXISTS `collect_log` ( + `uid` bigint, `platform` varchar(255) NOT NULL, `anchor_id` varchar(255) NOT NULL, - `is_showing` tinyint(1) DEFAULT NULL, - `is_error` tinyint(1) DEFAULT NULL, + `is_showing` tinyint(1) DEFAULT 0, + `is_error` tinyint(1) DEFAULT 0, `followers` int(11) DEFAULT NULL, `views` int(11) DEFAULT NULL, diff --git a/store.go b/store.go index 268a43d..9d15378 100644 --- a/store.go +++ b/store.go @@ -4,34 +4,10 @@ import ( "database/sql" "errors" "log" - "time" _ "github.com/go-sql-driver/mysql" ) -// IGetSource 源接口结构 -type IGetSource interface { - GetUid() int64 // - GetUrl() string // - GetTargetType() string // - GetSource() sql.NullString // - GetExt() interface{} // - GetUpdateTime() time.Time // - GetOperator() int32 // - GetErrorMsg() sql.NullString // -} - -type IUpdateSource interface { - IGetSource - - GetLastOperator() int32 - - SetExt(ext interface{}) // - SetUpdateTime(ut time.Time) // - SetOperator(operator int32) // - SetErrorMsg(emsg sql.NullString) // -} - // OperatorFlag 标志 type OperatorFlag int32 @@ -78,7 +54,7 @@ func (store *SourceStore) errorAlarm(err error) { // Insert 插入数据 func (store *SourceStore) Insert(isource IGetSource) { - _, err := store.db.Exec("insert into `source_openrec`(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg()) + _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg()) store.errorAlarm(err) } @@ -150,11 +126,74 @@ func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSou return nil, errors.New("TaskQueue is nil") } -// NewExtractorStore 创建一个存储实例 -func NewExtractorStore(table string) *SourceStore { - db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI) - if err != nil { - panic(err) - } - return &SourceStore{table: table, db: db} +// AnchorTable 主播表名称 +const AnchorTable string = "anchor_info" + +// CollectLogTable 采集日志表 +const CollectLogTable string = "collect_log" + +type ExtractorStore struct { + db *sql.DB + + errorCount int + errorLimit int +} + +func (store *ExtractorStore) errorAlarm(err error) { + if err != nil { + log.Println("store error: ", err) + // 报警. 如果数据插入有问题 + store.errorCount++ + if store.errorCount >= store.errorLimit { + // 数据库频繁操作初问题 报警, 减少没意义的请求 + } + } else { + if store.errorCount > 0 { + store.errorCount-- + } + } +} + +/* + `uid` bigint, + `platform` varchar(255) NOT NULL, + `anchor_id` varchar(255) NOT NULL, + `anchor_name` varchar(255) NOT NULL, + `live_url` text, + `channel` varchar(128) DEFAULT NULL, + `show_type` varchar(255) DEFAULT NULL, +*/ + +// InsertAnchorInfo AnchorInfo表, 插入数据 +func (store *ExtractorStore) InsertAnchorInfo(isource IGetAnchorInfo) { + _, err := store.db.Exec("insert into "+AnchorTable+"(platform, anchor_id, anchor_name, live_url, channel, show_type, ext) values(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE", isource.GetPlatform(), isource.GetAnchorId(), isource.GetAnchorName(), isource.GetLiveUrl(), isource.GetChannel(), isource.GetShowType(), isource.GetExt()) + store.errorAlarm(err) +} + +/* + `uid` bigint, + `platform` varchar(255) NOT NULL, + `anchor_id` varchar(255) NOT NULL, + + `is_showing` tinyint(1) DEFAULT NULL, + `is_error` tinyint(1) DEFAULT NULL, + + `followers` int(11) DEFAULT NULL, + `views` int(11) DEFAULT NULL, + `giver` json DEFAULT NULL, + `gratuity` int(11) DEFAULT NULL, + + `show_title` text DEFAULT NULL, + `show_start_time` timestamp NULL DEFAULT NULL, + `show_end_time` timestamp NULL DEFAULT NULL, + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, --时间戳从源数据里获取 + `ext` json DEFAULT NULL, + + `error` text DEFAULT NULL, +*/ + +// InsertCollectLog CollectLog表插入数据 +func (store *ExtractorStore) InsertCollectLog(isource IGetAnchorInfo) { + _, err := store.db.Exec("insert into "+CollectLogTable+"(platform, anchor_id, anchor_name, live_url, channel, show_type, ext) values(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE", isource.GetPlatform(), isource.GetAnchorId(), isource.GetAnchorName(), isource.GetLiveUrl(), isource.GetChannel(), isource.GetShowType(), isource.GetExt()) + store.errorAlarm(err) }