224 lines
6.0 KiB
Go
224 lines
6.0 KiB
Go
package logdb
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/satori/go.uuid"
|
|
|
|
_ "github.com/go-sql-driver/mysql" // mysql驱动
|
|
yaml "gopkg.in/yaml.v2"
|
|
)
|
|
|
|
// LogDB bundles the connection settings read from the YAML configuration
// with the runtime state of the log database client.
type LogDB struct {
	// Connection settings, filled in from the YAML file.
	Charset  string   `yaml:"charset"`
	DB       string   `yaml:"db"`
	Hosts    []string `yaml:"hosts"`
	Password string   `yaml:"password"`
	Port     string   `yaml:"port"`
	User     string   `yaml:"user"`

	// pid is the id of the current process; hostid is the index into Hosts
	// of the host the driver currently points at.
	pid, hostid int
	// nextCheck is the Unix time (in seconds) after which adCheckRecover
	// runs again; checkLimit is the number of seconds between two runs.
	nextCheck, checkLimit int64

	// driver is the database handle used by every query method.
	driver *sql.DB
	// mutex serialises access to driver.
	mutex sync.Mutex
}
|
|
|
|
// New 创建一个logdb的配置
|
|
func New(filename string) *LogDB {
|
|
logdb := LogDB{}
|
|
logdb.checkLimit = 300
|
|
logdb.pid = os.Getpid()
|
|
|
|
data, err := ioutil.ReadFile(filename)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
err = yaml.Unmarshal(data, &logdb)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
logdb.hostid = 0
|
|
|
|
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[logdb.hostid], logdb.Port, logdb.DB, logdb.Charset))
|
|
if err != nil {
|
|
log.Println("connect", err)
|
|
} else {
|
|
logdb.driver = db
|
|
}
|
|
|
|
return &logdb
|
|
}
|
|
|
|
// Ping 是否Ping通数据库
|
|
func (logdb *LogDB) Ping() (result bool) {
|
|
log.Println("Ping")
|
|
logdb.mutex.Lock()
|
|
defer logdb.mutex.Unlock()
|
|
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
result = false
|
|
|
|
log.Println(err, logdb.Hosts[logdb.hostid], " is unconnect ")
|
|
hostlen := len(logdb.Hosts)
|
|
|
|
errorhid := logdb.hostid
|
|
|
|
for i := 0; i < hostlen; i++ {
|
|
|
|
curid := errorhid + 1 + i
|
|
if curid >= hostlen {
|
|
curid = curid - hostlen
|
|
}
|
|
curHost := logdb.Hosts[curid]
|
|
|
|
myurl := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=30s&charset=%s", logdb.User, logdb.Password, curHost, logdb.Port, logdb.DB, logdb.Charset)
|
|
db, err := sql.Open("mysql", myurl)
|
|
if err != nil {
|
|
log.Println(err, curHost, " is connect fail")
|
|
continue
|
|
}
|
|
|
|
if err := db.Ping(); err != nil {
|
|
log.Println(err, curHost, " is connect fail")
|
|
continue
|
|
}
|
|
|
|
logdb.driver = db
|
|
logdb.hostid = curid
|
|
result = true
|
|
}
|
|
}
|
|
}()
|
|
|
|
if err := logdb.driver.Ping(); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// ADInsert 插入数据
|
|
func (logdb *LogDB) ADInsert(uid, device, platform, area_cc, section_id, response string, spider_id, channel, media, catch_account_id, status, priority int, ts_crawl time.Time) {
|
|
logdb.mutex.Lock()
|
|
defer logdb.mutex.Unlock()
|
|
|
|
_, err := logdb.driver.Exec("insert into log_spider (uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, error_msg, status, priority, ts_crawl) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ? , ? ,?, ?, ?)", uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, "", status, priority, ts_crawl)
|
|
if err != nil {
|
|
log.Println(err)
|
|
log.Printf("for save ad sql: insert into log_spider (uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, error_msg, status, priority, ts_crawl) values(%s, %s, %s, %s, %d, %d, %s, %d, %s, %s, %s, %d, %d, %s)\n", uid, spider_id, device, platform, channel, media, area_cc, catch_account_id, section_id, response, "", status, priority, ts_crawl.Format("2006-01-02 15:04:05"))
|
|
}
|
|
}
|
|
|
|
// ADResonse is one row handed out by ADParserSelect: the uid of a log_spider
// record together with the content of its response column.
type ADResonse struct {
	UID, Response string
}
|
|
|
|
// ADParserSelect adpase 根据自己的spider_id, 选择selcount条数据进行处理. 10- 100条最佳
|
|
func (logdb *LogDB) ADParserSelect(spider_id int, selcount int) []ADResonse {
|
|
puid, err := uuid.NewV4()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
logdb.adCheckRecover(spider_id, 5*time.Minute)
|
|
|
|
_, err = logdb.driver.Exec("update log_spider set status = 10000, parse_id = ? where spider_id = ? and status = 0 limit ?", puid.String(), spider_id, selcount)
|
|
if err != nil {
|
|
log.Println(err)
|
|
return nil
|
|
}
|
|
|
|
rows, err := logdb.driver.Query("select uid, response from log_spider where spider_id = ? and parse_id = ? and status = 10000", spider_id, puid.String())
|
|
if err != nil {
|
|
log.Println(err)
|
|
return nil
|
|
}
|
|
|
|
var adresponse []ADResonse
|
|
|
|
var uid, response string
|
|
for rows.Next() {
|
|
rows.Scan(&uid, &response)
|
|
adresponse = append(adresponse, ADResonse{UID: uid, Response: response})
|
|
// log.Println(uid, response)
|
|
}
|
|
|
|
return adresponse
|
|
}
|
|
|
|
// adCheckRecover 处理恢复错误, 或者没处理完的Select 出来的数据, 5分钟以上最佳. 例子: intervalTime := time.Minute * 5
|
|
// spider_id 对应 spider_id
|
|
// intervalTime 每隔多少时间去检查一次
|
|
func (logdb *LogDB) adCheckRecover(spider_id int, intervalTime time.Duration) {
|
|
now := time.Now()
|
|
if now.Unix() > logdb.nextCheck {
|
|
logdb.nextCheck = now.Unix() + logdb.checkLimit
|
|
|
|
tsUpdate := now.Add(-intervalTime) // tsUpdate := now.Add(-time.Minute * 5)
|
|
_, err := logdb.driver.Exec("update log_spider set status = 0, error_msg = CONCAT(error_msg, 'Parser Timeout ') where status = 10000 and spider_id = ? and ts_update <= ?", spider_id, tsUpdate)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ADParserSuccess 解析成功后处理该条数据
|
|
func (logdb *LogDB) ADParserSuccess(uid string, successData string) {
|
|
logdb.mutex.Lock()
|
|
defer logdb.mutex.Unlock()
|
|
|
|
ext := make(map[string]string)
|
|
ext["success_data"] = successData
|
|
|
|
data, err := json.Marshal(&ext)
|
|
|
|
if err != nil || successData == "" {
|
|
_, err := logdb.driver.Exec("update log_spider set status = 200 where uid = ?", uid)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
} else {
|
|
_, err := logdb.driver.Exec("update log_spider set status = 200, ext = ? where uid = ?", string(data), uid)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Select 插入数据
|
|
func (logdb *LogDB) Select(query string, args ...interface{}) *sql.Rows {
|
|
logdb.mutex.Lock()
|
|
defer logdb.mutex.Unlock()
|
|
|
|
Rows, err := logdb.driver.Query(query, args...)
|
|
if err != nil {
|
|
log.Println(err)
|
|
return nil
|
|
}
|
|
return Rows
|
|
}
|
|
|
|
// ADError 广告错误后更新
|
|
func (logdb *LogDB) ADError(uid, error_msg string) {
|
|
logdb.mutex.Lock()
|
|
defer logdb.mutex.Unlock()
|
|
|
|
_, err := logdb.driver.Exec("update log_spider set status = 1000, error_msg=? where uid =?;", error_msg, uid)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
}
|