diff --git a/logdb.go b/logdb.go index 9bd5937..cfa4358 100644 --- a/logdb.go +++ b/logdb.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/satori/go.uuid" + _ "github.com/go-sql-driver/mysql" // mysql驱动 yaml "gopkg.in/yaml.v2" ) @@ -124,17 +126,22 @@ type ADResonse struct { Response string } -func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse { - pid := logdb.pid + 2000 - logdb.adCheckRecover(spider_id) +// ADParserSelect adpase 根据自己的spider_id, 选择selcount条数据进行处理. 10- 100条最佳 +func (logdb *LogDB) ADParserSelect(spider_id int, selcount int) []ADResonse { + puid, err := uuid.NewV4() + if err != nil { + panic(err) + } - _, err := logdb.driver.Exec("update log_spider set status = ? where spider_id = ? and status = 0 limit 100", pid, spider_id) + logdb.adCheckRecover(spider_id, 5*time.Minute) + + _, err = logdb.driver.Exec("update log_spider set status = 10000, parse_id = ? where spider_id = ? and status = 0 limit ?", puid.String(), spider_id, selcount) if err != nil { log.Println(err) return nil } - rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and status = ?", spider_id, pid) + rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and parse_id = ? and status = 10000", spider_id, puid.String()) if err != nil { log.Println(err) return nil @@ -152,19 +159,23 @@ func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse { return adresponse } -func (logdb *LogDB) adCheckRecover(spider_id int) { +// adCheckRecover 处理恢复错误, 或者没处理完的Select 出来的数据, 5分钟以上最佳. 例子: intervalTime := time.Minute * 5 +// spider_id 对应 spider_id +// intervalTime 每隔多少时间去检查一次 +func (logdb *LogDB) adCheckRecover(spider_id int, intervalTime time.Duration) { now := time.Now() if now.Unix() > logdb.nextCheck { logdb.nextCheck = now.Unix() + logdb.checkLimit - tsUpdate := now.Add(-time.Minute * 5) - _, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status > 2000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate) + tsUpdate := now.Add(-intervalTime) // tsUpdate := now.Add(-time.Minute * 5) + _, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status = 10000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate) if err != nil { log.Println(err) } } } +// ADParserSuccess 解析成功后处理该条数据 func (logdb *LogDB) ADParserSuccess(uid string, successData string) { logdb.mutex.Lock() defer logdb.mutex.Unlock()