完善自动切换mysql的模块

This commit is contained in:
huangsimin 2018-12-19 17:46:11 +08:00
parent c9d142f8cf
commit 8a3158a3d5

View File

@ -48,12 +48,20 @@ func New(filename string) *LogDB {
}
logdb.hostid = 0
logdb.Connect()
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[logdb.hostid], logdb.Port, logdb.DB, logdb.Charset))
if err != nil {
log.Println("connect", err)
} else {
logdb.driver = db
}
logdb.Ping()
return &logdb
}
// Ping 是否Ping通数据库
func (logdb *LogDB) Ping() (result bool) {
log.Println("Ping")
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
@ -63,20 +71,31 @@ func (logdb *LogDB) Ping() (result bool) {
log.Println(err, logdb.Hosts[logdb.hostid], " is unconnect ")
hostlen := len(logdb.Hosts)
errorhid := logdb.hostid
for i := 0; i < hostlen; i++ {
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[i], logdb.Port, logdb.DB, logdb.Charset))
curid := errorhid + 1 + i
if curid >= hostlen {
curid = curid - hostlen
}
curHost := logdb.Hosts[curid]
myurl := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=30s&charset=%s", logdb.User, logdb.Password, curHost, logdb.Port, logdb.DB, logdb.Charset)
db, err := sql.Open("mysql", myurl)
if err != nil {
log.Println(err, logdb.Hosts[i], " is connect fail")
log.Println(err, curHost, " is connect fail")
continue
}
if err := db.Ping(); err != nil {
log.Println(err, logdb.Hosts[i], " is connect fail")
log.Println(err, curHost, " is connect fail")
continue
}
logdb.driver = db
logdb.hostid = i
logdb.hostid = curid
result = true
}
}
@ -89,26 +108,6 @@ func (logdb *LogDB) Ping() (result bool) {
return true
}
// Connect 重连
func (logdb *LogDB) Connect() {
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
logdb.hostid++
if logdb.hostid >= len(logdb.Hosts) {
logdb.hostid = 0
}
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[logdb.hostid], logdb.Port, logdb.DB, logdb.Charset))
if err != nil {
panic(err)
}
if logdb.driver != nil {
logdb.driver.Close()
}
logdb.driver = db
log.Println("connect is", logdb.Ping(), logdb)
}
// ADInsert 插入数据
func (logdb *LogDB) ADInsert(uid, device, platform, area_cc, section_id, response string, spider_id, channel, media, catch_account_id, status, priority int, ts_crawl time.Time) {
logdb.mutex.Lock()
@ -131,7 +130,7 @@ func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse {
defer logdb.mutex.Unlock()
pid := logdb.pid + 2000
logdb.ADCheckRecover(spider_id)
logdb.adCheckRecover(spider_id)
_, err := logdb.driver.Exec("update log_spider set status = ? where spider_id = ? and status = 0 limit 100", pid, spider_id)
if err != nil {
@ -157,10 +156,7 @@ func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse {
return adresponse
}
func (logdb *LogDB) ADCheckRecover(spider_id int) {
logdb.mutex.Lock()
defer logdb.mutex.Unlock()
func (logdb *LogDB) adCheckRecover(spider_id int) {
now := time.Now()
if now.Unix() > logdb.nextCheck {
logdb.nextCheck = now.Unix() + logdb.checkLimit