添加parse_id的处理, 以绝对区分唯一处理过程.

This commit is contained in:
huangsimin 2018-12-24 14:16:16 +08:00
parent 2c996115f0
commit 06073e692c

View File

@ -10,6 +10,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/satori/go.uuid"
_ "github.com/go-sql-driver/mysql" // mysql驱动 _ "github.com/go-sql-driver/mysql" // mysql驱动
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
) )
@ -124,17 +126,22 @@ type ADResonse struct {
Response string Response string
} }
func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse { // ADParserSelect adpase 根据自己的spider_id, 选择selcount条数据进行处理. 10- 100条最佳
pid := logdb.pid + 2000 func (logdb *LogDB) ADParserSelect(spider_id int, selcount int) []ADResonse {
logdb.adCheckRecover(spider_id) puid, err := uuid.NewV4()
if err != nil {
panic(err)
}
_, err := logdb.driver.Exec("update log_spider set status = ? where spider_id = ? and status = 0 limit 100", pid, spider_id) logdb.adCheckRecover(spider_id, 5*time.Minute)
_, err = logdb.driver.Exec("update log_spider set status = 10000, parse_id = ? where spider_id = ? and status = 0 limit ?", puid.String(), spider_id, selcount)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return nil return nil
} }
rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and status = ?", spider_id, pid) rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and parse_id = ? and status = 10000", spider_id, puid.String())
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return nil return nil
@ -152,19 +159,23 @@ func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse {
return adresponse return adresponse
} }
func (logdb *LogDB) adCheckRecover(spider_id int) { // adCheckRecover 处理恢复错误, 或者没处理完的Select 出来的数据, 5分钟以上最佳. 例子: intervalTime := time.Minute * 5
// spider_id 对应 spider_id
// intervalTime 每隔多少时间去检查一次
func (logdb *LogDB) adCheckRecover(spider_id int, intervalTime time.Duration) {
now := time.Now() now := time.Now()
if now.Unix() > logdb.nextCheck { if now.Unix() > logdb.nextCheck {
logdb.nextCheck = now.Unix() + logdb.checkLimit logdb.nextCheck = now.Unix() + logdb.checkLimit
tsUpdate := now.Add(-time.Minute * 5) tsUpdate := now.Add(-intervalTime) // tsUpdate := now.Add(-time.Minute * 5)
_, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status > 2000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate) _, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status = 10000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
} }
} }
// ADParserSuccess 解析成功后处理该条数据
func (logdb *LogDB) ADParserSuccess(uid string, successData string) { func (logdb *LogDB) ADParserSuccess(uid string, successData string) {
logdb.mutex.Lock() logdb.mutex.Lock()
defer logdb.mutex.Unlock() defer logdb.mutex.Unlock()