From a027861f5a87e4c8e2f9d483ba07df3e31c9d8ae Mon Sep 17 00:00:00 2001 From: eson Date: Thu, 9 Jul 2020 11:38:51 +0800 Subject: [PATCH] =?UTF-8?q?add:=20=E6=9B=B4=E6=96=B0=E6=95=B0=E6=8D=AEext?= =?UTF-8?q?=E5=8F=82=E8=80=83=E4=BE=8B=E5=AD=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- source.go | 10 +-- store.go | 36 +++++++++-- tasks/openrec/openrec_task1/.gitignore | 2 +- tasks/openrec/openrec_task1/task_openrec.go | 4 +- tasks/openrec/openrec_task2/.gitignore | 2 +- tasks/openrec/openrec_task2/task_openrec.go | 70 +++++++++++++++------ 6 files changed, 90 insertions(+), 34 deletions(-) diff --git a/source.go b/source.go index 0452ffe..2939c22 100644 --- a/source.go +++ b/source.go @@ -7,7 +7,7 @@ import ( // Source 的结构体 type Source struct { - Uid sql.NullInt64 // + Uid int64 // Url string // TargetType string // Source sql.NullString // @@ -87,12 +87,12 @@ func (so *Source) SetUrl(Url string) { so.Url = Url } -// GetUid Get return Uid sql.NullInt64 -func (so *Source) GetUid() sql.NullInt64 { +// GetUid Get return Uid int64 +func (so *Source) GetUid() int64 { return so.Uid } -// SetUid Set Uid sql.NullInt64 -func (so *Source) SetUid(Uid sql.NullInt64) { +// SetUid Set Uid int64 +func (so *Source) SetUid(Uid int64) { so.Uid = Uid } diff --git a/store.go b/store.go index 505b1f5..cab54e6 100644 --- a/store.go +++ b/store.go @@ -11,6 +11,7 @@ import ( // IGetSource 源接口结构 type IGetSource interface { + GetUid() int64 // GetUrl() string // GetTargetType() string // GetSource() sql.NullString // @@ -33,6 +34,8 @@ type IUpdateSource interface { type OperatorFlag int32 const ( + // OperatorOK 等待被处理 + OperatorOK OperatorFlag = 100 // OperatorWait 等待被处理 OperatorWait OperatorFlag = 1000 // OperatorError 错误标志 @@ -41,8 +44,10 @@ const ( // Store 储存 type Store struct { - table string - db *sql.DB + table string + db *sql.DB + errorCount int + errorLimit int } // NewStore 创建一个存储实例 @@ -54,14 +59,33 @@ func NewStore(table string) *Store { return &Store{table: table, db: db} } -// Save 储存数据 -func (store *Store) Save(isource IGetSource) { - _, err := store.db.Exec("insert into `source_openrec`(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg()) +func (store *Store) errorAlarm(err error) { if err != nil { - log.Fatalln(err) + log.Println("store error: ", err) + // 报警. 如果数据插入有问题 + store.errorCount++ + if store.errorCount >= store.errorLimit { + // 数据库频繁操作初问题 报警, 减少没意义的请求 + } + } else { + if store.errorCount > 0 { + store.errorCount-- + } } } +// Insert 储存数据 +func (store *Store) Insert(isource IGetSource) { + _, err := store.db.Exec("insert into `source_openrec`(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg()) + store.errorAlarm(err) +} + +// Update 储存数据 +func (store *Store) Update(isource IUpdateSource) { + _, err := store.db.Exec("update "+store.table+" set ext = ?, operator = ?, error_msg = ? where uid = ?", isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg(), isource.GetUid()) + store.errorAlarm(err) +} + // Pop 储存数据 func (store *Store) Pop(targetType string, operators ...int32) (IUpdateSource, error) { diff --git a/tasks/openrec/openrec_task1/.gitignore b/tasks/openrec/openrec_task1/.gitignore index 8117159..42d0e6c 100644 --- a/tasks/openrec/openrec_task1/.gitignore +++ b/tasks/openrec/openrec_task1/.gitignore @@ -1 +1 @@ -task1 \ No newline at end of file +openrec_task1 \ No newline at end of file diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index 290268a..fa6ec20 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -72,7 +72,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { data.SetErrorMsg(errorMsg) data.SetOperator(10000) - store.Save(data) + store.Insert(data) return } @@ -81,7 +81,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) { wf.SetQuery(querys) data.SetErrorMsg(errorMsg) - store.Save(data) + store.Insert(data) time.Sleep(time.Second * 2) } diff --git a/tasks/openrec/openrec_task2/.gitignore b/tasks/openrec/openrec_task2/.gitignore index 7508886..5d0489a 100644 --- a/tasks/openrec/openrec_task2/.gitignore +++ b/tasks/openrec/openrec_task2/.gitignore @@ -1 +1 @@ -task2 \ No newline at end of file +openrec_task2 \ No newline at end of file diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go index 59c279f..a8a372c 100644 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ b/tasks/openrec/openrec_task2/task_openrec.go @@ -1,8 +1,11 @@ package main import ( + "database/sql" + "encoding/json" "intimate" "log" + "time" "github.com/474420502/hunter" "github.com/tidwall/gjson" @@ -27,35 +30,64 @@ type OpenrecExtratorRanking struct { // Execute 执行方法 func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) { - source, err := store.Pop(targetTypeRanking) - if err != nil { - log.Println(err) - return - } + for { + + source, err := store.Pop(targetTypeRanking) + if err != nil { + log.Println(err) + return + } + + if source == nil { + return + } - if source != nil { result := gjson.Parse(source.GetSource().String) if result.IsArray() { for _, User := range result.Array() { userid := User.Get("channel.id").String() + ext := make(map[string]interface{}) - openrecUser := &OpenrecUser{} - openrecUser.PreGetUrl = hunter.PreGetUrl("https://www.openrec.tv/user/" + userid + "/supporters") + wf := cxt.Session().Get("https://www.openrec.tv/user/" + userid) + resp, err := wf.Execute() + source.SetUpdateTime(time.Now()) - cxt.AddParentTask(openrecUser) + if err != nil { + log.Println(err) + + source.SetOperator(int32(intimate.OperatorError)) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + continue + } + + ext["user"] = string(resp.Content()) + + wf = cxt.Session().Get("https://www.openrec.tv/user/" + userid + "/supporters") + resp, err = wf.Execute() + if err != nil { + log.Println(err) + source.SetOperator(int32(intimate.OperatorError)) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + continue + } + ext["user_supporters"] = string(resp.Content()) + + extJsonBytes, err := json.Marshal(ext) + if err != nil { + log.Println(err) + source.SetOperator(int32(intimate.OperatorError)) + source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true}) + continue + } + + source.SetOperator(int32(intimate.OperatorOK)) + source.SetExt(string(extJsonBytes)) + + store.Update(source) } } else { log.Println("array error:", result.Str) } + } } - -// OpenrecUser 获取用户信息 -type OpenrecUser struct { - hunter.PreGetUrl -} - -// Execute 执行方法 -func (oer *OpenrecUser) Execute(cxt *hunter.TaskContext) { - -}