package intimate import ( "database/sql" "log" "os" _ "github.com/go-sql-driver/mysql" ) // OperatorFlag 标志 type OperatorFlag int32 const ( // OperatorOK 等待被处理 OperatorOK OperatorFlag = 100 // OperatorExtractorOK 提取数据完成 OperatorExtractorOK OperatorFlag = 200 // OperatorWait 等待被处理 OperatorWait OperatorFlag = 1000 // OperatorError 错误标志 OperatorError OperatorFlag = 10000 ) type ISet interface { Set(string, interface{}) } type IGet interface { Get(string) interface{} } type IGetSet interface { ISet IGet } // SourceStore 储存 type SourceStore struct { table string db *sql.DB errorCount int errorLimit int } // NewSourceStore 创建一个存储实例 func NewSourceStore(table string) *SourceStore { db, err := sql.Open("mysql", InitConfig.Database.SourceURI) if err != nil { panic(err) } return &SourceStore{table: table, db: db} } func (store *SourceStore) errorAlarm(err error) { if err != nil { log.Println("store error: ", err) // 报警. 如果数据插入有问题 store.errorCount++ if store.errorCount >= store.errorLimit { // 数据库频繁操作初问题 报警, 减少没意义的请求 } } else { if store.errorCount > 0 { store.errorCount-- } } } // Insert 插入数据 func (store *SourceStore) Insert(isource IGet) { _, err := store.db.Exec("insert into "+store.table+"(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.Get("Url"), isource.Get("TargetType"), isource.Get("Source"), isource.Get("Ext"), isource.Get("Operator"), isource.Get("ErrorMsg")) if err != nil { log.Panic(err) } } // Update 更新数据 func (store *SourceStore) Update(isource IGet) { _, err := store.db.Exec("update "+store.table+" set ext = ?, pass_gob = ?, operator = ?, error_msg = ? where uid = ?", isource.Get("Ext"), isource.Get("PassGob"), isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) if err != nil { log.Panic(err) } } // UpdateOperator 更新数据操作标志位 func (store *SourceStore) UpdateOperator(isource IGet) { _, err := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) if err != nil { log.Panic(err) } } // UpdateError 更新错误数据 func (store *SourceStore) UpdateError(isource IGetSet, err error) { isource.Set("Operator", int32(OperatorError)) isource.Set("ErrorMsg", sql.NullString{String: err.Error(), Valid: true}) _, dberr := store.db.Exec("update "+store.table+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")) if dberr != nil { log.Panic(err) } } // Restore 恢复Operator数据状态 func (store *SourceStore) Restore(isource IGet) { _, err := store.db.Exec("update "+store.table+" set operator = ? where uid = ?", isource.Get("LastOperator"), isource.Get("Uid")) if err != nil { log.Panic(err) } } // Pop 弹出一条未处理的数据 func (store *SourceStore) Pop(targetType string, operators ...int32) (*Source, error) { tx, err := store.db.Begin() if err != nil { return nil, err } var args = []interface{}{targetType} selectSQL := `select uid, url, target_type, source, ext, operator, update_time from ` + store.table + ` where target_type = ?` if len(operators) == 0 { selectSQL += " and operator = ?" args = append(args, 0) } else { for _, operator := range operators { selectSQL += " and operator = ?" args = append(args, operator) } } // log.Println(selectSQL + ` limit 1 for update`) row := tx.QueryRow(selectSQL+` limit 1 for update`, args...) defer func() { err := tx.Commit() if err != nil { log.Println(err) err = tx.Rollback() if err != nil { log.Println(err) } } }() s := &Source{} // uid, url, target_type, source, ext, operator err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator, &s.UpdateTime) if err != nil { return nil, err } s.Set("LastOperator", s.Operator) _, err = tx.Exec("update "+store.table+" set operator = ? where uid = ?", OperatorWait, s.Uid) return s, nil } // StreamerTable 主播表名称 const StreamerTable string = "streamer" // CollectLogTable 采集日志表 const CollectLogTable string = "collect_log" type ExtractorStore struct { db *sql.DB errorCount int errorLimit int } func (store *ExtractorStore) errorAlarm(err error) { if err != nil { log.Panic("store error: ", err) // 报警. 如果数据插入有问题 store.errorCount++ if store.errorCount >= store.errorLimit { // 数据库频繁操作初问题 报警, 减少没意义的请求 } } else { if store.errorCount > 0 { store.errorCount-- } } } func NewExtractorStore() *ExtractorStore { db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI) if err != nil { log.Panic(err) } return &ExtractorStore{db: db} } /* `uid` bigint, `platform` varchar(255) NOT NULL, `anchor_id` varchar(255) NOT NULL, `anchor_name` varchar(255) NOT NULL, `live_url` text, `channel` varchar(128) DEFAULT NULL, `show_type` varchar(255) DEFAULT NULL, */ // UpdateStreamerLogUid Streamer表, 插入数据 func (store *ExtractorStore) UpdateStreamerLogUid(logUid, streamerUid int64) error { _, err := store.db.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, streamerUid) if err != nil { log.Panic(err) } return err } // InsertStreamer Streamer表, 插入数据 func (store *ExtractorStore) InsertStreamer(isource IGet) (Uid int64, err error) { // select uid from table where platform = ? and user_id = ? selectSQL := "select uid from " + StreamerTable + " where platform = ? and user_id = ?" tx, err := store.db.Begin() if err != nil { log.Println(err) return 0, err } defer func() { err = tx.Commit() if err != nil { log.Println(err) err = tx.Rollback() if err != nil { log.Println(err) } Uid = 0 } }() row := tx.QueryRow(selectSQL+` limit 1 for update`, isource.Get("Platform"), isource.Get("UserId")) var uid int64 if err = row.Scan(&uid); err == nil { return uid, nil } else { log.Println(err) } result, err := tx.Exec("insert into "+StreamerTable+"(platform, user_id, user_name, live_url, channel, latest_log_uid, ext) values(?,?,?,?,?,?,?);", isource.Get("Platform"), isource.Get("UserId"), isource.Get("UserName"), isource.Get("LiveUrl"), isource.Get("Channel"), isource.Get("LatestLogUid"), isource.Get("Ext")) if err != nil { log.Println(err) return 0, nil } return result.LastInsertId() } // InsertCollectLog CollectLog表插入数据 func (store *ExtractorStore) InsertCollectLog(isource IGet) (int64, error) { tx, err := store.db.Begin() defer func() { if err := recover(); err != nil { log.Println(err) err = tx.Rollback() if err != nil { log.Println(err) } os.Exit(0) } }() if err != nil { log.Panic(err) } result, err := tx.Exec("insert into "+CollectLogTable+"(streamer_uid, platform, user_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", isource.Get("StreamerUid"), isource.Get("Platform"), isource.Get("UserId"), isource.Get("IsLiveStreaming"), isource.Get("IsError"), isource.Get("Followers"), isource.Get("Views"), isource.Get("Giver"), isource.Get("Gratuity"), isource.Get("LiveTitle"), isource.Get("LiveStartTime"), isource.Get("LiveEndTime"), isource.Get("UpdateTime"), isource.Get("Tags"), isource.Get("Ext"), isource.Get("ErrorMsg"), ) if err != nil { log.Panic(err) } logUid, err := result.LastInsertId() if err != nil { log.Panic(err) } _, err = tx.Exec("update "+StreamerTable+" set latest_log_uid = ? where uid = ?", logUid, isource.Get("StreamerUid")) if err = tx.Commit(); err != nil { log.Panic(err) } return result.LastInsertId() }