2020-07-07 10:39:24 +00:00
package intimate
2020-07-06 08:33:35 +00:00
import (
2020-08-05 10:49:47 +00:00
"crypto/md5"
2020-07-06 08:33:35 +00:00
"database/sql"
2020-08-05 10:49:47 +00:00
"fmt"
2020-07-06 09:58:24 +00:00
"log"
2020-08-05 10:49:47 +00:00
"strings"
2020-07-20 10:13:54 +00:00
"time"
2020-07-06 08:33:35 +00:00
_ "github.com/go-sql-driver/mysql"
)
2020-07-08 07:02:55 +00:00
// OperatorFlag is the value kept in a row's operator column to record how far
// the row has been processed.
type OperatorFlag int32

// Known OperatorFlag values.
const (
	// OperatorOK — the original comment read "waiting to be processed", the
	// same text as OperatorWait. NOTE(review): the name suggests a success
	// state; confirm the intended meaning.
	OperatorOK OperatorFlag = 100

	// OperatorExtractorOK marks that data extraction has completed.
	OperatorExtractorOK OperatorFlag = 200

	// OperatorWait marks a row as waiting to be processed; the Pop methods
	// write it into the row they claim.
	OperatorWait OperatorFlag = 1000

	// OperatorError is the error flag. UpdateError adds it to the row's
	// current operator value rather than replacing that value.
	OperatorError OperatorFlag = 10000
)
2020-07-16 07:25:55 +00:00
// ISet is implemented by records whose fields can be assigned by name.
type ISet interface {
	Set(field string, value interface{})
}

// IGet is implemented by records whose fields can be read by name.
type IGet interface {
	Get(field string) interface{}
}

// IGetSet combines IGet and ISet.
type IGetSet interface {
	ISet
	IGet
}
2020-07-10 04:05:33 +00:00
// StoreSource is the storage handle for the source database: a *sql.DB plus
// the name of the table every statement targets.
type StoreSource struct {
	table string  // table name, concatenated into each SQL statement
	db    *sql.DB // opened by NewStoreSource

	popCount int // number of Pop calls that managed to begin a transaction

	// Running error score used by errorAlarm. NOTE(review): NewStoreSource
	// does not set errorLimit, so it stays 0 unless set elsewhere.
	errorCount int
	errorLimit int
}
2020-07-22 12:00:02 +00:00
// PopCount reports how many Pop calls have begun a transaction on this store.
func (store *StoreSource) PopCount() int { return store.popCount }

// Close releases the underlying database handle.
func (store *StoreSource) Close() error { return store.db.Close() }
2020-07-10 04:05:33 +00:00
// NewSourceStore 创建一个存储实例
2020-07-20 10:13:54 +00:00
func NewStoreSource ( table string ) * StoreSource {
2020-07-10 04:05:33 +00:00
db , err := sql . Open ( "mysql" , InitConfig . Database . SourceURI )
2020-07-06 08:33:35 +00:00
if err != nil {
panic ( err )
}
2020-07-20 10:13:54 +00:00
return & StoreSource { table : table , db : db }
2020-07-06 08:33:35 +00:00
}
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) errorAlarm ( err error ) {
2020-07-06 09:58:24 +00:00
if err != nil {
2020-07-09 03:38:51 +00:00
log . Println ( "store error: " , err )
// 报警. 如果数据插入有问题
store . errorCount ++
if store . errorCount >= store . errorLimit {
// 数据库频繁操作初问题 报警, 减少没意义的请求
}
} else {
if store . errorCount > 0 {
store . errorCount --
}
2020-07-06 09:58:24 +00:00
}
2020-07-06 08:33:35 +00:00
}
2020-07-08 07:02:55 +00:00
2020-07-10 04:05:33 +00:00
// Insert 插入数据
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) Insert ( isource IGet ) {
2020-07-23 10:29:56 +00:00
_ , err := store . db . Exec ( "insert into " + store . table + "(url, target_type, source, ext, operator, error_msg, streamer_id) values(?,?,?,?,?,?,?)" , isource . Get ( "Url" ) , isource . Get ( "Target" ) , isource . Get ( "Source" ) , isource . Get ( "Ext" ) , isource . Get ( "Operator" ) , isource . Get ( "ErrorMsg" ) , isource . Get ( "StreamerId" ) )
2020-07-17 10:21:38 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-09 03:38:51 +00:00
}
2020-07-24 10:48:33 +00:00
// Deduplicate 去重
func ( store * StoreSource ) Deduplicate ( target Target , field string ) {
2020-07-26 16:35:41 +00:00
sql := ` DELETE FROM ` + store . table + ` WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, ` + field + ` FROM ` + store . table + ` force index(target_type_idx) WHERE target_type = " ` + string ( target ) + ` " ) s GROUP BY s. ` + string ( field ) + ` ) ; `
_ , err := store . db . Exec ( sql )
2020-07-24 10:48:33 +00:00
if err != nil {
panic ( err )
}
}
2020-07-10 04:05:33 +00:00
// Update 更新数据
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) Update ( isource IGet ) {
2020-07-17 10:21:38 +00:00
_ , err := store . db . Exec ( "update " + store . table + " set ext = ?, pass_gob = ?, operator = ?, error_msg = ? where uid = ?" , isource . Get ( "Ext" ) , isource . Get ( "PassGob" ) , isource . Get ( "Operator" ) , isource . Get ( "ErrorMsg" ) , isource . Get ( "Uid" ) )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-09 03:38:51 +00:00
}
2020-07-16 08:22:14 +00:00
// UpdateOperator 更新数据操作标志位
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) UpdateOperator ( isource IGet ) {
2020-07-17 10:21:38 +00:00
_ , err := store . db . Exec ( "update " + store . table + " set operator = ?, error_msg = ? where uid = ?" , isource . Get ( "Operator" ) , isource . Get ( "ErrorMsg" ) , isource . Get ( "Uid" ) )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-16 08:22:14 +00:00
}
2020-07-13 10:10:48 +00:00
// UpdateError 更新错误数据
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) UpdateError ( isource IGetSet , err error ) {
isource . Set ( "Operator" , int32 ( OperatorError ) + isource . Get ( "Operator" ) . ( int32 ) )
2020-07-17 10:21:38 +00:00
isource . Set ( "ErrorMsg" , sql . NullString { String : err . Error ( ) , Valid : true } )
_ , dberr := store . db . Exec ( "update " + store . table + " set operator = ?, error_msg = ? where uid = ?" , isource . Get ( "Operator" ) , isource . Get ( "ErrorMsg" ) , isource . Get ( "Uid" ) )
if dberr != nil {
2020-07-20 10:13:54 +00:00
// email tell owner to deal with
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-13 10:10:48 +00:00
}
2020-07-10 04:05:33 +00:00
// Restore 恢复Operator数据状态
2020-07-20 10:13:54 +00:00
func ( store * StoreSource ) Restore ( isource IGet ) {
_ , dberr := store . db . Exec ( "update " + store . table + " set operator = ? where uid = ?" , isource . Get ( "LastOperator" ) , isource . Get ( "Uid" ) )
if dberr != nil {
// email tell owner to deal with
panic ( dberr )
2020-07-17 10:21:38 +00:00
}
2020-07-10 04:05:33 +00:00
}
// Pop 弹出一条未处理的数据
2020-07-23 10:29:56 +00:00
func ( store * StoreSource ) Pop ( targetType Target , operators ... int32 ) ( * Source , error ) {
2020-07-08 07:02:55 +00:00
tx , err := store . db . Begin ( )
if err != nil {
return nil , err
}
2020-07-23 10:29:56 +00:00
var args = [ ] interface { } { string ( targetType ) }
2020-07-21 07:05:56 +00:00
selectSQL := ` select uid, url, target_type, source, ext, operator, update_time, streamer_id from ` + store . table + ` where target_type = ? `
2020-07-08 07:02:55 +00:00
if len ( operators ) == 0 {
selectSQL += " and operator = ?"
args = append ( args , 0 )
} else {
for _ , operator := range operators {
selectSQL += " and operator = ?"
args = append ( args , operator )
}
}
2020-07-08 10:57:57 +00:00
// log.Println(selectSQL + ` limit 1 for update`)
row := tx . QueryRow ( selectSQL + ` limit 1 for update ` , args ... )
2020-07-08 07:02:55 +00:00
defer func ( ) {
err := tx . Commit ( )
if err != nil {
log . Println ( err )
err = tx . Rollback ( )
if err != nil {
log . Println ( err )
}
}
2020-07-22 12:00:02 +00:00
store . popCount ++
2020-07-08 07:02:55 +00:00
} ( )
2020-07-16 03:02:30 +00:00
s := & Source { }
// uid, url, target_type, source, ext, operator
2020-07-23 10:29:56 +00:00
err = row . Scan ( & s . Uid , & s . Url , & s . Target , & s . Source , & s . Ext , & s . Operator , & s . UpdateTime , & s . StreamerId )
2020-07-16 03:02:30 +00:00
if err != nil {
return nil , err
2020-07-08 07:02:55 +00:00
}
2020-07-22 12:00:02 +00:00
2020-07-17 10:21:38 +00:00
s . Set ( "LastOperator" , s . Operator )
2020-07-16 03:02:30 +00:00
_ , err = tx . Exec ( "update " + store . table + " set operator = ? where uid = ?" , OperatorWait , s . Uid )
return s , nil
2020-07-08 07:02:55 +00:00
}
2020-07-10 04:05:33 +00:00
2020-07-17 10:21:38 +00:00
// Table names used by StoreExtractor.
const (
	// StreamerTable is the streamer table.
	StreamerTable string = "streamer"
	// CollectLogTable is the collect-log table.
	CollectLogTable string = "collect_log"
	// StreamerListTable is the streamer-list table.
	StreamerListTable string = "streamer_list"
)
2020-07-20 10:13:54 +00:00
// StoreExtractor is the storage handle for the extractor database.
type StoreExtractor struct {
	db *sql.DB // opened by NewStoreExtractor

	popCount int // number of Pop calls that managed to begin a transaction

	// Running error score used by errorAlarm. NOTE(review): NewStoreExtractor
	// does not set errorLimit, so it stays 0 unless set elsewhere.
	errorCount int
	errorLimit int
}
2020-07-22 12:00:02 +00:00
// PopCount reports how many Pop calls have begun a transaction on this store.
func (store *StoreExtractor) PopCount() int { return store.popCount }

// Close releases the underlying database handle.
func (store *StoreExtractor) Close() error { return store.db.Close() }
2020-07-20 10:13:54 +00:00
func ( store * StoreExtractor ) errorAlarm ( err error ) {
2020-07-10 04:05:33 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
log . Println ( "store error: " , err )
2020-07-10 08:13:08 +00:00
// 报警. 如果数据插入有问题
store . errorCount ++
if store . errorCount >= store . errorLimit {
// 数据库频繁操作初问题 报警, 减少没意义的请求
}
} else {
if store . errorCount > 0 {
store . errorCount --
}
2020-07-10 04:05:33 +00:00
}
2020-07-10 08:13:08 +00:00
}
2020-07-20 10:13:54 +00:00
// NewStoreExtractor 生成一个extractor库的相关链接
func NewStoreExtractor ( ) * StoreExtractor {
2020-07-10 10:31:17 +00:00
db , err := sql . Open ( "mysql" , InitConfig . Database . ExtractorURI )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-10 10:31:17 +00:00
}
2020-07-20 10:13:54 +00:00
return & StoreExtractor { db : db }
2020-07-10 10:31:17 +00:00
}
2020-07-20 10:13:54 +00:00
// Pop 弹出一条未处理的数据
2020-07-23 10:29:56 +00:00
func ( store * StoreExtractor ) Pop ( platform Platform , operators ... int32 ) ( * Streamer , error ) {
2020-07-20 10:13:54 +00:00
tx , err := store . db . Begin ( )
2020-07-17 10:21:38 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
return nil , err
2020-07-17 10:21:38 +00:00
}
2020-07-23 10:29:56 +00:00
var args = [ ] interface { } { string ( platform ) }
2020-07-31 10:04:10 +00:00
selectSQL := ` select uid, update_time, user_id, update_url, is_update_streamer, update_interval from ` + StreamerTable + ` where platform = ? and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval `
2020-07-20 10:13:54 +00:00
if len ( operators ) == 0 {
selectSQL += " and operator = ?"
args = append ( args , 0 )
} else {
for _ , operator := range operators {
selectSQL += " and operator = ?"
args = append ( args , operator )
}
}
defer func ( ) {
err := tx . Commit ( )
if err != nil {
log . Println ( err )
err = tx . Rollback ( )
if err != nil {
log . Println ( err )
}
}
2020-07-22 12:00:02 +00:00
store . popCount ++
2020-07-20 10:13:54 +00:00
} ( )
// log.Println(selectSQL + ` limit 1 for update`)
row := tx . QueryRow ( selectSQL + ` limit 1 for update ` , args ... )
s := & Streamer { }
// uid, url, target_type, source, ext, operator
2020-07-31 10:04:10 +00:00
err = row . Scan ( & s . Uid , & s . UpdateTime , & s . UserId , & s . UpdateUrl , & s . IsUpdateStreamer , & s . UpdateInterval )
2020-07-20 10:13:54 +00:00
if err != nil {
return nil , err
}
s . Set ( "LastOperator" , s . Operator )
_ , err = tx . Exec ( "update " + StreamerTable + " set operator = ? where uid = ?" , OperatorWait , s . Uid )
return s , nil
2020-07-17 10:21:38 +00:00
}
2020-08-05 10:49:47 +00:00
// UpdateStreamerList sets the given columns on the streamer_list row whose
// urlhash is streamer.Get("UrlHash").
//
// fieldvalues is a flat list of column/value pairs: "col1", v1, "col2", v2, ...
// Column names must be strings and are concatenated into the SQL text, so
// they must be trusted identifiers (never user input). Values are bound as
// parameters. An empty fieldvalues is a no-op. It panics on odd-length input
// or on a database error.
func (store *StoreExtractor) UpdateStreamerList(streamer IGet, fieldvalues ...interface{}) {
	if len(fieldvalues)%2 != 0 {
		panic(fmt.Sprintf("UpdateStreamerList: fieldvalues must be column/value pairs, got %d items", len(fieldvalues)))
	}
	if len(fieldvalues) == 0 {
		// "SET WHERE ..." would be invalid SQL; there is nothing to update.
		return
	}

	assignments := make([]string, 0, len(fieldvalues)/2)
	values := make([]interface{}, 0, len(fieldvalues)/2+1)
	for i := 0; i < len(fieldvalues); i += 2 {
		assignments = append(assignments, fieldvalues[i].(string)+" = ?")
		values = append(values, fieldvalues[i+1])
	}
	values = append(values, streamer.Get("UrlHash"))

	// Assignments must be comma-separated; a bare space between them is a
	// syntax error as soon as more than one column is updated.
	updateSQL := "UPDATE " + StreamerListTable + " SET " + strings.Join(assignments, ", ") + " WHERE urlhash = ?"
	if _, err := store.db.Exec(updateSQL, values...); err != nil {
		panic(err)
	}
}
// InsertStreamerList inserts one row into the streamer_list table. The row's
// urlhash is the hex MD5 of streamerlist.Get("Url"), which must be a string
// (the type assertion panics otherwise).
//
// It returns isExists == false when the row was inserted. On any insert error
// it returns true. A MySQL duplicate-key error (message prefix "Error 1062")
// is expected and not logged; any other error is logged. NOTE(review): callers
// therefore cannot tell "already present" from a real failure.
func (store *StoreExtractor) InsertStreamerList(streamerlist IGet) (isExists bool) {
	urlstr := streamerlist.Get("Url").(string)
	urlHash := fmt.Sprintf("%x", md5.Sum([]byte(urlstr)))

	_, err := store.db.Exec(
		"insert into "+StreamerListTable+"(urlhash, url, platform, label, serialize, update_interval, error_msg, operator) values(?,?,?,?,?,?,?,?)",
		urlHash,
		urlstr,
		streamerlist.Get("Platform"),
		streamerlist.Get("Label"),
		streamerlist.Get("Serialize"),
		streamerlist.Get("UpdateInterval"),
		streamerlist.Get("ErrorMsg"),
		streamerlist.Get("Operator"),
	)
	if err == nil {
		return false
	}
	if !strings.HasPrefix(err.Error(), "Error 1062") {
		log.Println(err)
	}
	return true
}
2020-07-17 10:21:38 +00:00
// InsertStreamer Streamer表, 插入数据
2020-07-20 10:13:54 +00:00
func ( store * StoreExtractor ) InsertStreamer ( streamer IGet ) ( isExists bool ) {
2020-07-17 11:20:08 +00:00
// select uid from table where platform = ? and user_id = ?
2020-07-31 10:04:10 +00:00
selectSQL := "SELECT is_update_url, uid FROM " + StreamerTable + " WHERE platform = ? AND user_id = ?"
2020-07-15 10:22:40 +00:00
tx , err := store . db . Begin ( )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-15 10:22:40 +00:00
}
2020-07-16 10:31:13 +00:00
2020-07-17 11:20:08 +00:00
defer func ( ) {
err = tx . Commit ( )
if err != nil {
2020-07-20 10:13:54 +00:00
rerr := tx . Rollback ( )
if rerr != nil {
log . Println ( rerr )
2020-07-17 11:20:08 +00:00
}
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 11:20:08 +00:00
}
} ( )
2020-07-20 10:13:54 +00:00
row := tx . QueryRow ( selectSQL + ` LIMIT 1 FOR UPDATE ` , streamer . Get ( "Platform" ) , streamer . Get ( "UserId" ) )
2020-07-21 07:05:56 +00:00
var isUpdateUrl bool
2020-07-31 10:04:10 +00:00
var Uid int64
if err = row . Scan ( & isUpdateUrl , & Uid ) ; err == nil {
2020-07-21 07:05:56 +00:00
if isUpdateUrl {
2020-07-20 10:13:54 +00:00
tx . Exec ( "UPDATE " + StreamerTable + " SET update_url = ?" , streamer . Get ( "UpdateUrl" ) )
}
2020-07-31 10:04:10 +00:00
streamer . ( ISet ) . Set ( "Uid" , Uid )
2020-07-20 10:13:54 +00:00
return true
2020-07-15 10:22:40 +00:00
}
2020-07-31 10:04:10 +00:00
_ , err = tx . Exec ( "INSERT INTO " + StreamerTable + "(platform, user_id, update_url, update_time) VALUES(?,?,?,?);" , streamer . Get ( "Platform" ) , streamer . Get ( "UserId" ) , streamer . Get ( "UpdateUrl" ) , time . Now ( ) . Add ( - time . Minute * 60 ) )
2020-07-20 10:13:54 +00:00
if err != nil {
panic ( err )
}
return false
}
2020-07-15 10:22:40 +00:00
2020-07-20 10:13:54 +00:00
// UpdateError records a processing failure on a Streamer row. It adds
// OperatorError to isource's current "Operator" value (which must be an
// int32), stores err.Error() as "ErrorMsg", and writes both columns for the
// row whose uid is isource.Get("Uid"). isource is updated in memory before
// the write.
//
// If the database write fails it panics with the database error.
func (store *StoreExtractor) UpdateError(isource IGetSet, err error) {
	isource.Set("Operator", int32(OperatorError)+isource.Get("Operator").(int32))
	isource.Set("ErrorMsg", sql.NullString{String: err.Error(), Valid: true})
	_, dberr := store.db.Exec("update "+StreamerTable+" set operator = ?, error_msg = ? where uid = ?", isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid"))
	if dberr != nil {
		// TODO: email the owner to deal with it.
		// Panic with the database failure, not the caller's already-recorded err.
		panic(dberr)
	}
}
// UpdateStreamerLog 只更新Streamer的关联日志和时间戳
func ( store * StoreExtractor ) UpdateStreamerLog ( latestUid int64 , streamerUid int64 ) {
_ , err := store . db . Exec ( "UPDATE " + StreamerTable + " SET latest_log_uid = ?, update_time = CURRENT_TIMESTAMP() WHERE uid = ?" , latestUid , streamerUid )
2020-07-15 10:22:40 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-15 10:22:40 +00:00
}
2020-07-20 10:13:54 +00:00
}
2020-07-15 10:22:40 +00:00
2020-07-20 10:13:54 +00:00
// UpdateOperator updates (it does not insert) the operator flag and error_msg
// of the Streamer row whose uid is isource.Get("Uid"). It panics on a
// database error.
func (store *StoreExtractor) UpdateOperator(isource IGet) {
	operator, errMsg, uid := isource.Get("Operator"), isource.Get("ErrorMsg"), isource.Get("Uid")
	query := "update " + StreamerTable + " set operator = ?, error_msg = ? where uid = ?"
	if _, err := store.db.Exec(query, operator, errMsg, uid); err != nil {
		panic(err)
	}
}
// UpdateStreamer Streamer表, 插入数据
2020-07-31 10:04:10 +00:00
func ( store * StoreExtractor ) UpdateStreamer ( streamer IGet ) {
_ , err := store . db . Exec ( "UPDATE " + StreamerTable + " SET user_name = ?, live_url = ?, channel = ?, latest_log_uid = ?, tags = ?, ext = ?, operator = ?, update_time = ?, update_interval = ? WHERE uid = ?;" ,
streamer . Get ( "UserName" ) , streamer . Get ( "LiveUrl" ) , streamer . Get ( "Channel" ) , streamer . Get ( "LatestLogUid" ) , streamer . Get ( "Tags" ) , streamer . Get ( "Ext" ) , streamer . Get ( "Operator" ) , streamer . Get ( "UpdateTime" ) , streamer . Get ( "UpdateInterval" ) , streamer . Get ( "Uid" ) )
2020-07-20 10:13:54 +00:00
if err != nil {
panic ( err )
}
2020-07-10 08:13:08 +00:00
}
2020-07-31 10:04:10 +00:00
// Update sets the given columns on the Streamer row whose uid is
// streamer.Get("Uid").
//
// fieldvalues is a flat list of column/value pairs: "col1", v1, "col2", v2, ...
// Column names must be strings and are concatenated into the SQL text, so
// they must be trusted identifiers (never user input). Values are bound as
// parameters. An empty fieldvalues is a no-op. It panics on odd-length input
// or on a database error.
func (store *StoreExtractor) Update(streamer IGet, fieldvalues ...interface{}) {
	if len(fieldvalues)%2 != 0 {
		panic(fmt.Sprintf("Update: fieldvalues must be column/value pairs, got %d items", len(fieldvalues)))
	}
	if len(fieldvalues) == 0 {
		// "SET WHERE ..." would be invalid SQL; there is nothing to update.
		return
	}

	assignments := make([]string, 0, len(fieldvalues)/2)
	values := make([]interface{}, 0, len(fieldvalues)/2+1)
	for i := 0; i < len(fieldvalues); i += 2 {
		assignments = append(assignments, fieldvalues[i].(string)+" = ?")
		values = append(values, fieldvalues[i+1])
	}
	values = append(values, streamer.Get("Uid"))

	// Assignments must be comma-separated; a bare space between them is a
	// syntax error as soon as more than one column is updated.
	updateSQL := "UPDATE " + StreamerTable + " SET " + strings.Join(assignments, ", ") + " WHERE uid = ?"
	if _, err := store.db.Exec(updateSQL, values...); err != nil {
		panic(err)
	}
}
// InsertClog CollectLog表插入数据
func ( store * StoreExtractor ) InsertClog ( clog IGet ) int64 {
2020-07-17 10:21:38 +00:00
tx , err := store . db . Begin ( )
defer func ( ) {
if err := recover ( ) ; err != nil {
2020-07-21 07:05:56 +00:00
tx . Rollback ( )
log . Panic ( err )
2020-07-17 10:21:38 +00:00
}
} ( )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
result , err := tx . Exec ( "insert into " + CollectLogTable + "(streamer_uid, platform, user_id, is_live_streaming, is_error, followers, views, giver, gratuity, live_title, live_start_time, live_end_time, update_time, tags, ext, error_msg) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" ,
2020-07-31 10:04:10 +00:00
clog . Get ( "StreamerUid" ) , clog . Get ( "Platform" ) , clog . Get ( "UserId" ) , clog . Get ( "IsLiveStreaming" ) , clog . Get ( "IsError" ) , clog . Get ( "Followers" ) , clog . Get ( "Views" ) , clog . Get ( "Giver" ) , clog . Get ( "Gratuity" ) , clog . Get ( "LiveTitle" ) , clog . Get ( "LiveStartTime" ) , clog . Get ( "LiveEndTime" ) , clog . Get ( "UpdateTime" ) , clog . Get ( "Tags" ) , clog . Get ( "Ext" ) , clog . Get ( "ErrorMsg" ) ,
2020-07-10 10:31:17 +00:00
)
2020-07-17 10:21:38 +00:00
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
logUid , err := result . LastInsertId ( )
if err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-31 10:04:10 +00:00
_ , err = tx . Exec ( "update " + StreamerTable + " set latest_log_uid = ? where uid = ?" , logUid , clog . Get ( "StreamerUid" ) )
2020-07-17 10:21:38 +00:00
if err = tx . Commit ( ) ; err != nil {
2020-07-20 10:13:54 +00:00
panic ( err )
2020-07-17 10:21:38 +00:00
}
2020-07-21 07:05:56 +00:00
return logUid
2020-07-10 04:05:33 +00:00
}