From 8a3158a3d5e1be81977e70caa73aaca0731a2ab4 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Wed, 19 Dec 2018 17:46:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E8=87=AA=E5=8A=A8=E5=88=87?= =?UTF-8?q?=E6=8D=A2mysql=E7=9A=84=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- logdb.go | 56 ++++++++++++++++++++++++++------------------------------ 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/logdb.go b/logdb.go index 7d23420..9765964 100644 --- a/logdb.go +++ b/logdb.go @@ -48,12 +48,20 @@ func New(filename string) *LogDB { } logdb.hostid = 0 - logdb.Connect() + db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[logdb.hostid], logdb.Port, logdb.DB, logdb.Charset)) + if err != nil { + log.Println("connect", err) + } else { + logdb.driver = db + } + + logdb.Ping() return &logdb } // Ping 是否Ping通数据库 func (logdb *LogDB) Ping() (result bool) { + log.Println("Ping") logdb.mutex.Lock() defer logdb.mutex.Unlock() @@ -63,20 +71,31 @@ func (logdb *LogDB) Ping() (result bool) { log.Println(err, logdb.Hosts[logdb.hostid], " is unconnect ") hostlen := len(logdb.Hosts) + + errorhid := logdb.hostid + for i := 0; i < hostlen; i++ { - db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[i], logdb.Port, logdb.DB, logdb.Charset)) + + curid := errorhid + 1 + i + if curid >= hostlen { + curid = curid - hostlen + } + curHost := logdb.Hosts[curid] + + myurl := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=30s&charset=%s", logdb.User, logdb.Password, curHost, logdb.Port, logdb.DB, logdb.Charset) + db, err := sql.Open("mysql", myurl) if err != nil { - log.Println(err, logdb.Hosts[i], " is connect fail") + log.Println(err, curHost, " is connect fail") continue } if err := db.Ping(); err != nil { - log.Println(err, logdb.Hosts[i], " is connect fail") + log.Println(err, curHost, " is connect fail") continue } logdb.driver = db - logdb.hostid = i + logdb.hostid = curid result = true } } @@ -89,26 +108,6 @@ func (logdb *LogDB) Ping() (result bool) { return true } -// Connect 重连 -func (logdb *LogDB) Connect() { - logdb.mutex.Lock() - defer logdb.mutex.Unlock() - - logdb.hostid++ - if logdb.hostid >= len(logdb.Hosts) { - logdb.hostid = 0 - } - db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?timeout=15s&charset=%s", logdb.User, logdb.Password, logdb.Hosts[logdb.hostid], logdb.Port, logdb.DB, logdb.Charset)) - if err != nil { - panic(err) - } - if logdb.driver != nil { - logdb.driver.Close() - } - logdb.driver = db - log.Println("connect is", logdb.Ping(), logdb) -} - // ADInsert 插入数据 func (logdb *LogDB) ADInsert(uid, device, platform, area_cc, section_id, response string, spider_id, channel, media, catch_account_id, status, priority int, ts_crawl time.Time) { logdb.mutex.Lock() @@ -131,7 +130,7 @@ func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse { defer logdb.mutex.Unlock() pid := logdb.pid + 2000 - logdb.ADCheckRecover(spider_id) + logdb.adCheckRecover(spider_id) _, err := logdb.driver.Exec("update log_spider set status = ? where spider_id = ? and status = 0 limit 100", pid, spider_id) if err != nil { @@ -157,10 +156,7 @@ func (logdb *LogDB) ADParserSelect(spider_id int) []ADResonse { return adresponse } -func (logdb *LogDB) ADCheckRecover(spider_id int) { - logdb.mutex.Lock() - defer logdb.mutex.Unlock() - +func (logdb *LogDB) adCheckRecover(spider_id int) { now := time.Now() if now.Unix() > logdb.nextCheck { logdb.nextCheck = now.Unix() + logdb.checkLimit