package intimate import ( "database/sql" "errors" "log" "time" _ "github.com/go-sql-driver/mysql" ) // IGetSource 源接口结构 type IGetSource interface { GetUid() int64 // GetUrl() string // GetTargetType() string // GetSource() sql.NullString // GetExt() interface{} // GetUpdateTime() time.Time // GetOperator() int32 // GetErrorMsg() sql.NullString // } type IUpdateSource interface { IGetSource GetLastOperator() int32 SetExt(ext interface{}) // SetUpdateTime(ut time.Time) // SetOperator(operator int32) // SetErrorMsg(emsg sql.NullString) // } // OperatorFlag 标志 type OperatorFlag int32 const ( // OperatorOK 等待被处理 OperatorOK OperatorFlag = 100 // OperatorWait 等待被处理 OperatorWait OperatorFlag = 1000 // OperatorError 错误标志 OperatorError OperatorFlag = 10000 ) // SourceStore 储存 type SourceStore struct { table string db *sql.DB errorCount int errorLimit int } // NewSourceStore 创建一个存储实例 func NewSourceStore(table string) *SourceStore { db, err := sql.Open("mysql", InitConfig.Database.SourceURI) if err != nil { panic(err) } return &SourceStore{table: table, db: db} } func (store *SourceStore) errorAlarm(err error) { if err != nil { log.Println("store error: ", err) // 报警. 如果数据插入有问题 store.errorCount++ if store.errorCount >= store.errorLimit { // 数据库频繁操作初问题 报警, 减少没意义的请求 } } else { if store.errorCount > 0 { store.errorCount-- } } } // Insert 插入数据 func (store *SourceStore) Insert(isource IGetSource) { _, err := store.db.Exec("insert into `source_openrec`(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg()) store.errorAlarm(err) } // Update 更新数据 func (store *SourceStore) Update(isource IUpdateSource) { _, err := store.db.Exec("update "+store.table+" set ext = ?, operator = ?, error_msg = ? where uid = ?", isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg(), isource.GetUid()) store.errorAlarm(err) } // Restore 恢复Operator数据状态 func (store *SourceStore) Restore(isource IUpdateSource) { _, err := store.db.Exec("update "+store.table+" set operator = ? where uid = ?", isource.GetLastOperator(), isource.GetUid()) store.errorAlarm(err) } // Pop 弹出一条未处理的数据 func (store *SourceStore) Pop(targetType string, operators ...int32) (IUpdateSource, error) { tx, err := store.db.Begin() if err != nil { log.Println(err, targetType) return nil, err } var args = []interface{}{targetType} selectSQL := `select uid, url, target_type, source, ext, operator from ` + store.table + ` where target_type = ?` if len(operators) == 0 { selectSQL += " and operator = ?" args = append(args, 0) } else { for _, operator := range operators { selectSQL += " and operator = ?" args = append(args, operator) } } // log.Println(selectSQL + ` limit 1 for update`) row := tx.QueryRow(selectSQL+` limit 1 for update`, args...) defer func() { err := tx.Commit() if err != nil { log.Println(err) err = tx.Rollback() if err != nil { log.Println(err) } } }() if row != nil { s := &Source{} // uid, url, target_type, source, ext, operator err = row.Scan(&s.Uid, &s.Url, &s.TargetType, &s.Source, &s.Ext, &s.Operator) s.SetLastOperator(s.Operator) if err != nil { log.Println(err, targetType) _, err = tx.Exec("update "+store.table+" set error_msg = ?, operator = ? where uid = ?", OperatorError, s.Uid) if err != nil { log.Println(err) } return nil, err } _, err = tx.Exec("update "+store.table+" set operator = ? where uid = ?", OperatorWait, s.Uid) return s, nil } return nil, errors.New("TaskQueue is nil") } // NewExtractorStore 创建一个存储实例 func NewExtractorStore(table string) *SourceStore { db, err := sql.Open("mysql", InitConfig.Database.ExtractorURI) if err != nil { panic(err) } return &SourceStore{table: table, db: db} }