package intimate

import (
	"database/sql"
	"errors"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

// OperatorFlag is the processing-state flag stored in a row's operator column.
type OperatorFlag int32

const (
	// OperatorOK is presumably the "processed successfully" state.
	// NOTE(review): the original comment duplicated OperatorWait's text; confirm the intended meaning.
	OperatorOK OperatorFlag = 100
	// OperatorWait marks a row that Pop has claimed and that is waiting to be processed.
	OperatorWait OperatorFlag = 1000
	// OperatorError marks a row whose processing failed.
	OperatorError OperatorFlag = 10000
)

// SourceStore reads and writes source rows in a single MySQL table.
type SourceStore struct {
	table      string
	db         *sql.DB
	errorCount int
	errorLimit int
}

// NewSourceStore creates a store bound to table, using the source database
// URI from InitConfig. It panics if the database handle cannot be created.
func NewSourceStore(table string) *SourceStore {
	db, err := sql.Open("mysql", InitConfig.Database.SourceURI)
	if err != nil {
		panic(err)
	}
	return &SourceStore{table: table, db: db}
}

// errorAlarm records the outcome of a database operation: a non-nil err is
// logged and increments the error counter, a nil err decrements it (never
// below zero).
func (store *SourceStore) errorAlarm(err error) {
	if err == nil {
		if store.errorCount > 0 {
			store.errorCount--
		}
		return
	}

	log.Println("store error: ", err)
	store.errorCount++
	// TODO: once errorCount reaches errorLimit the database is failing
	// repeatedly; raise an alarm and cut down on pointless requests.
}

// Insert adds a new source row built from isource. Failures are reported
// through errorAlarm.
func (store *SourceStore) Insert(isource IGetSource) {
	_, err := store.db.Exec(
		"insert into "+store.table+"(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)",
		isource.GetUrl(),
		isource.GetTargetType(),
		isource.GetSource(),
		isource.GetExt(),
		isource.GetOperator(),
		isource.GetErrorMsg(),
	)
	store.errorAlarm(err)
}

// Update writes ext, pass_gob, operator and error_msg back to the row
// identified by isource's uid. Failures are reported through errorAlarm.
func (store *SourceStore) Update(isource IUpdateSource) {
	_, err := store.db.Exec(
		"update "+store.table+" set ext = ?, pass_gob = ?, operator = ?, error_msg = ? where uid = ?",
		isource.GetExt(),
		isource.GetPassGob(),
		isource.GetOperator(),
		isource.GetErrorMsg(),
		isource.GetUid(),
	)
	store.errorAlarm(err)
}

// UpdateError flags isource as failed (OperatorError), stores err's message
// on it, and persists both fields to the row identified by its uid. A nil err
// stores a NULL error message instead of panicking.
func (store *SourceStore) UpdateError(isource IUpdateSource, err error) {
	var msg sql.NullString
	if err != nil {
		msg = sql.NullString{String: err.Error(), Valid: true}
	}
	isource.SetOperator(int32(OperatorError))
	isource.SetErrorMsg(msg)

	_, dberr := store.db.Exec(
		"update "+store.table+" set operator = ?, error_msg = ? where uid = ?",
		isource.GetOperator(),
		isource.GetErrorMsg(),
		isource.GetUid(),
	)
	store.errorAlarm(dberr)
}

// Restore resets the row's operator column to the value isource had when it
// was popped (GetLastOperator). Failures are reported through errorAlarm.
func (store *SourceStore) Restore(isource IUpdateSource) {
	_, err := store.db.Exec(
		"update "+store.table+" set operator = ? where uid = ?",
		isource.GetLastOperator(),
		isource.GetUid(),
	)
	store.errorAlarm(err)
}

// Pop claims one unprocessed row of the given target type inside a
// transaction and returns it.
//
// A row matches when its operator column equals any of operators; with no
// operators given, rows whose operator is 0 are selected. The claimed row's
// operator is set to OperatorWait before the transaction commits.
//
// When no row matches, Pop returns sql.ErrNoRows. When the row cannot be
// decoded, Pop tries to flag it with OperatorError and returns the decoding
// error. The returned source is nil whenever the error is non-nil.
func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSource, error) {
	tx, err := store.db.Begin()
	if err != nil {
		log.Println(err, targetType)
		return nil, err
	}

	// Build "operator in (?, ?, ...)": ANDing several equality tests on the
	// same column can never match more than one distinct value.
	args := []interface{}{targetType}
	placeholders := "?"
	if len(operators) == 0 {
		args = append(args, 0)
	} else {
		for i, operator := range operators {
			if i > 0 {
				placeholders += ",?"
			}
			args = append(args, operator)
		}
	}
	selectSQL := `select uid, url, target_type, source, ext, operator from ` + store.table +
		` where target_type = ? and operator in (` + placeholders + `) limit 1 for update`

	s := &Source{}
	// uid, url, target_type, source, ext, operator
	scanErr := tx.QueryRow(selectSQL, args...).Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator)
	if scanErr != nil {
		if errors.Is(scanErr, sql.ErrNoRows) {
			// Nothing to claim; just release the transaction.
			if rbErr := tx.Rollback(); rbErr != nil {
				log.Println(rbErr)
			}
			return nil, scanErr
		}

		log.Println(scanErr, targetType)
		// Best effort: flag the offending row so it is not picked up again.
		// s.Uid is only meaningful if Scan got far enough to fill it in.
		if _, execErr := tx.Exec(
			"update "+store.table+" set error_msg = ?, operator = ? where uid = ?",
			scanErr.Error(), OperatorError, s.Uid,
		); execErr != nil {
			log.Println(execErr)
		}
		if commitErr := tx.Commit(); commitErr != nil {
			log.Println(commitErr)
		}
		return nil, scanErr
	}

	// Remember the state the row was in so Restore can put it back.
	s.SetLastOperator(s.Operator)

	if _, err = tx.Exec("update "+store.table+" set operator = ? where uid = ?", OperatorWait, s.Uid); err != nil {
		log.Println(err)
		if rbErr := tx.Rollback(); rbErr != nil {
			log.Println(rbErr)
		}
		return nil, err
	}
	if err = tx.Commit(); err != nil {
		log.Println(err)
		return nil, err
	}
	return s, nil
}

// AnchorTable is the name of the anchor (streamer) table.
const AnchorTable string = "anchor_info"

// CollectLogTable is the name of the collection log table.
const CollectLogTable string = "collect_log"

// ExtractorStore writes extracted data to the anchor and collect-log tables.
type ExtractorStore struct {
	db *sql.DB

	errorCount int
	errorLimit int
}

// errorAlarm records the outcome of a database operation. A nil err
// decrements the error counter (never below zero); a non-nil err increments
// it and then panics via log.Panic.
func (store *ExtractorStore) errorAlarm(err error) {
	if err == nil {
		if store.errorCount > 0 {
			store.errorCount--
		}
		return
	}

	// Count the failure before panicking so the counter is accurate if a
	// caller recovers.
	store.errorCount++
	// TODO: once errorCount reaches errorLimit the database is failing
	// repeatedly; raise an alarm and cut down on pointless requests.
	log.Panic("store error: ", err)
}

// NewExtractorStore creates a store using the extractor database URI from
// InitConfig. It panics if the database handle cannot be created.
func NewExtractorStore() *ExtractorStore {
	db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI)
	if err != nil {
		panic(err)
	}
	return &ExtractorStore{db: db}
}

/*
	`uid` bigint,
	`platform` varchar(255) NOT NULL,
	`anchor_id` varchar(255) NOT NULL,
	`anchor_name` varchar(255) NOT NULL,
	`live_url` text,
	`channel` varchar(128) DEFAULT NULL,
	`show_type` varchar(255) DEFAULT NULL,
*/

// InsertAnchorInfo inserts a row into the anchor table, or refreshes the
// descriptive columns of the existing row when a unique key collides.
// Failures are reported through errorAlarm, which panics.
//
// NOTE(review): assumes the unique key covers (platform, anchor_id), so only
// the remaining columns are updated on a duplicate; confirm against the schema.
func (store *ExtractorStore) InsertAnchorInfo(isource IGetAnchorInfo) {
	_, err := store.db.Exec(
		"insert into "+AnchorTable+"(platform, anchor_id, anchor_name, live_url, channel, show_type, ext) values(?,?,?,?,?,?,?)"+
			" ON DUPLICATE KEY UPDATE anchor_name = VALUES(anchor_name), live_url = VALUES(live_url),"+
			" channel = VALUES(channel), show_type = VALUES(show_type), ext = VALUES(ext)",
		isource.GetPlatform(),
		isource.GetAnchorId(),
		isource.GetAnchorName(),
		isource.GetLiveUrl(),
		isource.GetChannel(),
		isource.GetShowType(),
		isource.GetExt(),
	)
	store.errorAlarm(err)
}

/*
	`uid` bigint,
	`platform` varchar(255) NOT NULL,
	`anchor_id` varchar(255) NOT NULL,
	`is_showing` tinyint(1) DEFAULT NULL,
	`is_error` tinyint(1) DEFAULT NULL,
	`followers` int(11) DEFAULT NULL,
	`views` int(11) DEFAULT NULL,
	`giver` json DEFAULT NULL,
	`gratuity` int(11) DEFAULT NULL,
	`show_title` text DEFAULT NULL,
	`show_start_time` timestamp NULL DEFAULT NULL,
	`show_end_time` timestamp NULL DEFAULT NULL,
	`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, -- timestamp taken from the source data
	`ext` json DEFAULT NULL,
	`error_msg` text DEFAULT NULL,
*/

// InsertCollectLog inserts a row into the collect-log table. Failures are
// reported through errorAlarm, which panics.
func (store *ExtractorStore) InsertCollectLog(isource IGetCollectLog) {
	_, err := store.db.Exec(
		"insert into "+CollectLogTable+"(uid, platform, anchor_id, is_showing, is_error, followers, views, giver, gratuity, show_title, show_start_time, show_end_time, update_time, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
		isource.GetUid(),
		isource.GetPlatform(),
		isource.GetAnchorId(),
		isource.GetIsShowing(),
		isource.GetIsError(),
		isource.GetFollowers(),
		isource.GetViews(),
		isource.GetGiver(),
		isource.GetGratuity(),
		isource.GetShowTitle(),
		isource.GetShowStartTime(),
		isource.GetShowEndTime(),
		isource.GetUpdateTime(),
		isource.GetExt(),
		isource.GetErrorMsg(),
	)
	store.errorAlarm(err)
}