add: 更新数据ext参考例子

This commit is contained in:
eson 2020-07-09 11:38:51 +08:00
parent 0a2a134511
commit a027861f5a
6 changed files with 90 additions and 34 deletions

View File

@ -7,7 +7,7 @@ import (
// Source 的结构体
type Source struct {
Uid sql.NullInt64 //
Uid int64 //
Url string //
TargetType string //
Source sql.NullString //
@ -87,12 +87,12 @@ func (so *Source) SetUrl(Url string) {
so.Url = Url
}
// GetUid Get return Uid sql.NullInt64
func (so *Source) GetUid() sql.NullInt64 {
// GetUid Get return Uid int64
func (so *Source) GetUid() int64 {
return so.Uid
}
// SetUid Set Uid sql.NullInt64
func (so *Source) SetUid(Uid sql.NullInt64) {
// SetUid Set Uid int64
func (so *Source) SetUid(Uid int64) {
so.Uid = Uid
}

View File

@ -11,6 +11,7 @@ import (
// IGetSource 源接口结构
type IGetSource interface {
GetUid() int64 //
GetUrl() string //
GetTargetType() string //
GetSource() sql.NullString //
@ -33,6 +34,8 @@ type IUpdateSource interface {
type OperatorFlag int32
const (
// OperatorOK 等待被处理
OperatorOK OperatorFlag = 100
// OperatorWait 等待被处理
OperatorWait OperatorFlag = 1000
// OperatorError 错误标志
@ -41,8 +44,10 @@ const (
// Store 储存
type Store struct {
table string
db *sql.DB
table string
db *sql.DB
errorCount int
errorLimit int
}
// NewStore 创建一个存储实例
@ -54,14 +59,33 @@ func NewStore(table string) *Store {
return &Store{table: table, db: db}
}
// Save 储存数据
func (store *Store) Save(isource IGetSource) {
_, err := store.db.Exec("insert into `source_openrec`(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg())
func (store *Store) errorAlarm(err error) {
if err != nil {
log.Fatalln(err)
log.Println("store error: ", err)
// 报警. 如果数据插入有问题
store.errorCount++
if store.errorCount >= store.errorLimit {
// 数据库频繁操作初问题 报警, 减少没意义的请求
}
} else {
if store.errorCount > 0 {
store.errorCount--
}
}
}
// Insert 储存数据
func (store *Store) Insert(isource IGetSource) {
_, err := store.db.Exec("insert into `source_openrec`(url, target_type, source, ext, operator, error_msg) values(?,?,?,?,?,?)", isource.GetUrl(), isource.GetTargetType(), isource.GetSource(), isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg())
store.errorAlarm(err)
}
// Update 储存数据
func (store *Store) Update(isource IUpdateSource) {
_, err := store.db.Exec("update "+store.table+" set ext = ?, operator = ?, error_msg = ? where uid = ?", isource.GetExt(), isource.GetOperator(), isource.GetErrorMsg(), isource.GetUid())
store.errorAlarm(err)
}
// Pop 储存数据
func (store *Store) Pop(targetType string, operators ...int32) (IUpdateSource, error) {

View File

@ -1 +1 @@
task1
openrec_task1

View File

@ -72,7 +72,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) {
data.SetErrorMsg(errorMsg)
data.SetOperator(10000)
store.Save(data)
store.Insert(data)
return
}
@ -81,7 +81,7 @@ func (or *OpenrecRanking) Execute(cxt *hunter.TaskContext) {
wf.SetQuery(querys)
data.SetErrorMsg(errorMsg)
store.Save(data)
store.Insert(data)
time.Sleep(time.Second * 2)
}

View File

@ -1 +1 @@
task2
openrec_task2

View File

@ -1,8 +1,11 @@
package main
import (
"database/sql"
"encoding/json"
"intimate"
"log"
"time"
"github.com/474420502/hunter"
"github.com/tidwall/gjson"
@ -27,35 +30,64 @@ type OpenrecExtratorRanking struct {
// Execute 执行方法
func (oer *OpenrecExtratorRanking) Execute(cxt *hunter.TaskContext) {
source, err := store.Pop(targetTypeRanking)
if err != nil {
log.Println(err)
return
}
for {
source, err := store.Pop(targetTypeRanking)
if err != nil {
log.Println(err)
return
}
if source == nil {
return
}
if source != nil {
result := gjson.Parse(source.GetSource().String)
if result.IsArray() {
for _, User := range result.Array() {
userid := User.Get("channel.id").String()
ext := make(map[string]interface{})
openrecUser := &OpenrecUser{}
openrecUser.PreGetUrl = hunter.PreGetUrl("https://www.openrec.tv/user/" + userid + "/supporters")
wf := cxt.Session().Get("https://www.openrec.tv/user/" + userid)
resp, err := wf.Execute()
source.SetUpdateTime(time.Now())
cxt.AddParentTask(openrecUser)
if err != nil {
log.Println(err)
source.SetOperator(int32(intimate.OperatorError))
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
continue
}
ext["user"] = string(resp.Content())
wf = cxt.Session().Get("https://www.openrec.tv/user/" + userid + "/supporters")
resp, err = wf.Execute()
if err != nil {
log.Println(err)
source.SetOperator(int32(intimate.OperatorError))
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
continue
}
ext["user_supporters"] = string(resp.Content())
extJsonBytes, err := json.Marshal(ext)
if err != nil {
log.Println(err)
source.SetOperator(int32(intimate.OperatorError))
source.SetErrorMsg(sql.NullString{String: err.Error(), Valid: true})
continue
}
source.SetOperator(int32(intimate.OperatorOK))
source.SetExt(string(extJsonBytes))
store.Update(source)
}
} else {
log.Println("array error:", result.Str)
}
}
}
// OpenrecUser 获取用户信息
type OpenrecUser struct {
hunter.PreGetUrl
}
// Execute 执行方法
func (oer *OpenrecUser) Execute(cxt *hunter.TaskContext) {
}