diff --git a/autostore.go b/autostore.go index 4f061be..e3c6976 100644 --- a/autostore.go +++ b/autostore.go @@ -28,6 +28,7 @@ type Store struct { db *sql.DB } +// Table 表 type Table struct { store *Store name string @@ -38,8 +39,6 @@ type Table struct { insertsql string } -// const updatesql = "UPDATE %s SET %s WHERE %s = ?" - func NewStore(uri string) *Store { db, err := sql.Open("mysql", uri) if err != nil { @@ -56,19 +55,26 @@ func (store *Store) Table(name string) *Table { table.insertsql = `INSERT INTO ` + table.name + `(%s) values(%s)` table.updatesql = `UPDATE ` + table.name + ` SET %s WHERE %s = ?` - // table.selectsql = `FROM ` + table.name + `WHERE operator` table.selectsql = `SELECT %s FROM ` + table.name + ` WHERE %s ` return table } // Queue mysql 队列结构 type Queue struct { - table *Table - obj reflect.Type - selected string - condition string - uidname string - uididx int + table *Table + obj reflect.Type + fieldIndex []int + selected string + + cond CondWhere + + uidname string + uididx int +} + +type CondWhere struct { + Condition string + CondArgs []interface{} } // OperatorType 字典Operator 标志位的类型 @@ -83,12 +89,21 @@ const ( OpERROR OperatorType = "10000" ) +// ConditionDefault 默认的条件 +func ConditionDefault(platform Platform) CondWhere { + return CondWhere{ + Condition: "platform = ? and operator = 0 and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval", + CondArgs: []interface{}{string(platform)}, + } +} + // Queue 根据Table生成一个队列. 处理结构. 
每次弹出一个 obj 是要处理的结构体 自定义的whereCondition条件 -func (t *Table) Queue(obj interface{}, whereCondition string) *Queue { +func (t *Table) Queue(obj interface{}, whereCondition CondWhere) *Queue { q := &Queue{} - q.condition = whereCondition + q.cond = whereCondition q.obj = reflect.TypeOf(obj) q.table = t + q.fieldIndex = []int{} // select 需要配对字段变量的对应index位置 for i := 0; i < q.obj.NumField(); i++ { field := q.obj.Field(i) @@ -98,6 +113,7 @@ func (t *Table) Queue(obj interface{}, whereCondition string) *Queue { q.uididx = i q.uidname = fname } + q.fieldIndex = append(q.fieldIndex, i) } } @@ -123,14 +139,14 @@ func (queue *Queue) Pop() (result interface{}, err error) { } }() - selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.condition + " limit 1 for update" - rows, err := tx.Query(selectsql) + selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.cond.Condition + " limit 1 for update" + rows, err := tx.Query(selectsql, queue.cond.CondArgs...) if err != nil { return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) } - var fields = make([]interface{}, queue.obj.NumField()) + var fields = make([]interface{}, len(queue.fieldIndex)) for i := range fields { var iv interface{} fields[i] = &iv @@ -159,15 +175,15 @@ func (queue *Queue) Pop() (result interface{}, err error) { } obj := reflect.New(queue.obj).Elem() - for i := 0; i < obj.NumField(); i++ { - field := obj.Field(i) + for i, idx := range queue.fieldIndex { + field := obj.Field(idx) convert(*fields[i].(*interface{}), field, columntypes[i]) } return obj.Addr().Interface(), err } -// Insert nil 不插入. 不支持嵌套. +// Insert nil 不插入. 不支持嵌套. 必须是Ptr类型 func (t *Table) Insert(obj interface{}) error { ov := reflect.ValueOf(obj).Elem() ot := reflect.TypeOf(obj) @@ -210,6 +226,52 @@ func (t *Table) Insert(obj interface{}) error { return err } +// InsertRetAutoID nil 不插入. 不支持嵌套. 
并返回auto uid +func (t *Table) InsertRetAutoID(obj interface{}) (int64, error) { + ov := reflect.ValueOf(obj).Elem() + ot := reflect.TypeOf(obj) + + fieldsql := "" + argssql := "" + + var args []interface{} + for i := 0; i < ov.NumField(); i++ { + field := ov.Field(i) + ftype := ot.Elem().Field(i) + + if fname, ok := ftype.Tag.Lookup("field"); ok { + if flag, ok := ftype.Tag.Lookup("uid"); ok { + if flag == "auto" { + continue + } + } + + k := ftype.Type.Kind() + if k == reflect.Ptr || k == reflect.Interface { + if !field.IsNil() { + felem := field.Elem() + args = append(args, felem.Interface()) + fieldsql += fname + "," + argssql += "?," + } + } else { + args = append(args, field.Interface()) + fieldsql += fname + "," + argssql += "?," + } + + } + + } + + ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1]) + result, err := t.store.db.Exec(ssql, args...) + if err != nil { + return 0, err + } + return result.LastInsertId() +} + // Update 结构体更新 func (t *Table) Update(obj interface{}) error { @@ -261,7 +323,40 @@ func (t *Table) Update(obj interface{}) error { return err } +// UpdateError 更新错误数据 +func (t *Table) UpdateError(obj interface{}, err error) { + + ov := reflect.ValueOf(obj).Elem() + ot := reflect.TypeOf(obj) + + var uidname string + var uidvalue interface{} + + for i := 0; i < ov.NumField(); i++ { + field := ov.Field(i) + ftype := ot.Elem().Field(i) + + if fname, ok := ftype.Tag.Lookup("field"); ok { + if _, ok := ftype.Tag.Lookup("uid"); ok { + if uidvalue != nil { + panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname)) + } + uidname = fname + uidvalue = field.Interface() + break + } + } + } + + _, dberr := t.store.db.Exec("update "+t.name+" set operator = ?, error_msg = ? where "+uidname+" 
= ?", 10000, sql.NullString{String: err.Error(), Valid: true}, uidvalue) + if dberr != nil { + // email tell owner to deal with + panic(dberr) + } +} + func assign(field reflect.Value, src interface{}) (bool, error) { + switch field.Kind() { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: s := asString(src) diff --git a/autostore_test.go b/autostore_test.go index d0aaae5..e5a452b 100644 --- a/autostore_test.go +++ b/autostore_test.go @@ -13,7 +13,7 @@ func estAutoStore(t *testing.T) { uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci" store := NewStore(uri) - queue := store.Table("streamer").Queue(TSreamer{}, "operator = 0") + queue := store.Table("streamer").Queue(TSreamer{}, CondWhere{Condition: "operator = 0"}) re, _ := queue.Pop() pstreamer := re.(*TSreamer) diff --git a/extractor/openrec_extractor/openrec_extractor.go b/extractor/openrec_extractor/openrec_extractor.go index b0c37d9..245c961 100644 --- a/extractor/openrec_extractor/openrec_extractor.go +++ b/extractor/openrec_extractor/openrec_extractor.go @@ -9,12 +9,11 @@ import ( "time" "github.com/474420502/extractor" + "github.com/474420502/gcurl" + "github.com/474420502/requests" "github.com/tidwall/gjson" ) -var estore = intimate.NewStoreExtractor() -var sstore = intimate.NewStoreSource(string(intimate.STOpenrec)) - //UserInfo 提取信息的结构体 type UserInfo struct { UserName string `exp:"//p[ contains(@class, 'c-global__user__profile__list__name__text')]"` @@ -34,135 +33,230 @@ type UserLive struct { func Execute() { ps := intimate.NewPerfectShutdown() + ses := requests.NewSession() + squeue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Popenrec)) var lasterr error = nil for !ps.IsClose() { - var err error + istreamer, err := squeue.Pop() - source, err := sstore.Pop(intimate.TOpenrecUser, 0) - if err != nil { + // streamer, err := estore.Pop(intimate.Popenrec) //队列里弹出一个streamer行. 
进行解析 + + if istreamer == nil || err != nil { if err != lasterr { log.Println(err, lasterr) lasterr = err } - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 2) continue } - lasterr = nil + streamer := istreamer.(*intimate.Streamer) - sdata := source.Ext.([]byte) - datamap := gjson.ParseBytes(sdata).Map() + userId := *streamer.UserId - source.Operator = int32(intimate.OperatorError) - userId := datamap["var_user_id"].String() + var updateUrl map[string]string + err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // 反序列化update_url, 里面存了需要采集的url + if err != nil { + log.Println(err) + continue + } + // Check Userid - streamer := &intimate.Streamer{} - streamer.UserId = userId - // streamer.Platform = intimate.Popenrec 不需要更新字段 + userUrl := updateUrl["user"] + log.Println(userUrl) + tp := ses.Get(userUrl) // 获取user url页面数据 + resp, err := tp.Execute() + streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} - htmlUser := datamap["html_user"] - - userEtor := extractor.ExtractHtmlString(htmlUser.String()) - ui, ok1 := userEtor.GetObjectByTag(UserInfo{}).(*UserInfo) - - htmlLive := datamap["html_live"] - - liveEtor := extractor.ExtractHtmlString(htmlLive.String()) - ul, ok2 := liveEtor.GetObjectByTag(UserLive{}).(*UserLive) - - jsonSupporters := datamap["json_supporters"] - clog := &intimate.CollectLog{} - - if ok1 { - clog.Followers = sql.NullInt64{Int64: ui.Followers, Valid: true} - clog.Views = sql.NullInt64{Int64: ui.Views, Valid: true} - if ui.Views != 0 { - clog.IsLiveStreaming = true - } - streamer.UserName = sql.NullString{String: ui.UserName, Valid: true} - - giverjson := jsonSupporters - var givers []interface{} - var gratuity int64 = 0 - - for _, v := range giverjson.Array() { - giverSource := gjson.Parse(v.String()) - for _, item := range giverSource.Get("data.items").Array() { - givers = append(givers, item.Map()) - gratuity += item.Get("total_yells").Int() - } - } - - giversbytes, err := json.Marshal(givers) - if err != nil { - 
log.Println(err) - clog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true} - } else { - clog.Giver = giversbytes - } - - clog.Gratuity = sql.NullInt64{Int64: gratuity, Valid: true} - } else { - log.Println("UserInfo may be not exists") - estore.UpdateError(streamer, errors.New("UserInfo may be not exists")) + if err != nil { + log.Println(err) + intimate.TStreamer.UpdateError(streamer, err) continue } - //log.Println(ul) - if ok2 { - clog.LiveTitle = sql.NullString{String: ul.Title, Valid: true} + cookies := ses.GetCookies(tp.GetParsedURL()) - startTime, err := time.ParseInLocation("2006-01-02T15:04:05Z07:00", ul.LiveStartTime, time.Local) - if err != nil { - log.Println(err) - } else { - clog.LiveStartTime = sql.NullTime{Time: startTime.Local(), Valid: true} - duration, err := intimate.ParseDuration(ul.LiveEndTime) - if err != nil { - log.Println(err) - } else { - endTime := startTime.Add(duration) - clog.LiveStartTime = sql.NullTime{Time: endTime.Local(), Valid: true} + scurl := updateUrl["supporters"] //获取打赏者的数据 + curl := gcurl.Parse(scurl) + supportersSession := curl.CreateSession() + + temporary := curl.CreateTemporary(supportersSession) + supportersSession.SetCookies(temporary.GetParsedURL(), cookies) + var supporters []string + for { // supporters 数据需要登录信息. 
下面为赋值 supporters链接获取的uid token random码 + + supportersQuery := temporary.GetQuery() + + for _, cookie := range cookies { + if cookie.Name == "uuid" { + supportersQuery.Set("Uuid", cookie.Value) + continue + } + + if cookie.Name == "token" { + supportersQuery.Set("Token", cookie.Value) + continue + } + + if cookie.Name == "random" { + supportersQuery.Set("Random", cookie.Value) + continue } } - if tags, err := json.Marshal(ul.Tags); err == nil { - clog.Tags = tags - } else { - log.Println("json error", ul.Tags, clog.Tags) + supportersQuery.Set("identify_id", userId) + temporary.SetQuery(supportersQuery) + + resp, err := temporary.Execute() + if err != nil { + log.Println(err) } + supporterjson := gjson.ParseBytes(resp.Content()) + supporterdata := supporterjson.Get("data") //解析supporters获取的json数据 + if supporterdata.Type == gjson.Null { + break + } + supporters = append(supporters, string(resp.Content())) + + temporary.QueryParam("page_number").IntAdd(1) } - streamer.Uid = source.StreamerId.Int64 - streamer.UpdateTime = source.UpdateTime - if clog.Tags != nil { - streamer.Tags = clog.Tags + // cookies := cxt.Session().GetCookies(wf.GetParsedURL()) + // ext := make(map[string]interface{}) + + jsonSupporters := supporters + htmlUser := string(resp.Content()) + + liveUrl := updateUrl["live"] + tp = ses.Get(liveUrl) + resp, err = tp.Execute() + if err != nil { + log.Println(err) + intimate.TStreamer.UpdateError(streamer, err) + continue } - clog.Platform = intimate.Popenrec - clog.UserId = userId - clog.UpdateTime = source.UpdateTime - clog.StreamerUid = streamer.Uid + htmlLive := string(resp.Content()) + // ext["var_user_id"] = userId - logUid := estore.InsertClog(clog) - - LiveUrl := "https://www.openrec.tv/live/" + userId - - streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true} - streamer.LatestLogUid = logUid - // streamer.Operator = 0 - - log.Println(streamer.UserId) - estore.Update(streamer, - "user_name", streamer.UserName, - "user_id", streamer.UserId, 
- "live_url", streamer.LiveUrl, - "latest_log_uid", streamer.LatestLogUid, - "update_time", streamer.UpdateTime, - "tags", streamer.Tags, - ) - - source.Operator = int32(intimate.OperatorExtractorOK) - sstore.UpdateOperator(source) + // streamer.Platform = intimate.Popenrec + streamer.UpdateInterval = 120 + streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} + streamer.Operator = 0 + Extractor(streamer, userId, htmlUser, htmlLive, jsonSupporters) } } + +func Extractor(streamer *intimate.Streamer, userId string, htmlUser, htmlLive string, jsonSupporters []string) { + + // sdata := source.Ext.([]byte) + // datamap := gjson.ParseBytes(sdata).Map() + + // userId := datamap["var_user_id"].String() + + // streamer := &intimate.Streamer{} + // streamer.UserId = &userId + // streamer.Platform = intimate.Popenrec 不需要更新字段 + + // htmlUser := datamap["html_user"] + + userEtor := extractor.ExtractHtmlString(htmlUser) + ui, ok1 := userEtor.GetObjectByTag(UserInfo{}).(*UserInfo) + + // htmlLive := datamap["html_live"] + + liveEtor := extractor.ExtractHtmlString(htmlLive) + ul, ok2 := liveEtor.GetObjectByTag(UserLive{}).(*UserLive) + + // jsonSupporters := datamap["json_supporters"] + clog := &intimate.CollectLog{} + + if ok1 { + clog.Followers = &sql.NullInt64{Int64: ui.Followers, Valid: true} + clog.Views = &sql.NullInt64{Int64: ui.Views, Valid: true} + if ui.Views != 0 { + clog.IsLiveStreaming = true + } + streamer.UserName = &sql.NullString{String: ui.UserName, Valid: true} + + // giverjson := jsonSupporters + var givers []interface{} + var gratuity int64 = 0 + + for _, v := range jsonSupporters { + giverSource := gjson.Parse(v) + for _, item := range giverSource.Get("data.items").Array() { + givers = append(givers, item.Map()) + gratuity += item.Get("total_yells").Int() + } + } + + giversbytes, err := json.Marshal(givers) + if err != nil { + log.Println(err) + clog.ErrorMsg = &sql.NullString{String: err.Error(), Valid: true} + } else { + clog.Giver = 
giversbytes + } + + clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true} + } else { + log.Println("UserInfo may be not exists") + intimate.TStreamer.UpdateError(streamer, errors.New("UserInfo may be not exists")) + return + } + + //log.Println(ul) + if ok2 { + clog.LiveTitle = &sql.NullString{String: ul.Title, Valid: true} + + startTime, err := time.ParseInLocation("2006-01-02T15:04:05Z07:00", ul.LiveStartTime, time.Local) + if err != nil { + log.Println(err) + } else { + clog.LiveStartTime = &sql.NullTime{Time: startTime.Local(), Valid: true} + duration, err := intimate.ParseDuration(ul.LiveEndTime) + if err != nil { + log.Println(err) + } else { + endTime := startTime.Add(duration) + clog.LiveEndTime = &sql.NullTime{Time: endTime.Local(), Valid: true} + } + } + + if tags, err := json.Marshal(ul.Tags); err == nil { + clog.Tags = tags + } else { + log.Println("json error", ul.Tags, clog.Tags) + } + } + + // streamer.Uid = source.StreamerId.Int64 + // streamer.UpdateTime = &source.UpdateTime + if clog.Tags != nil { + streamer.Tags = clog.Tags + } + clog.Platform = intimate.Popenrec + clog.UserId = userId + clog.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} + clog.StreamerUid = streamer.Uid + + logUid, err := intimate.TClog.InsertRetAutoID(clog) + if err != nil { + log.Println(err) + return + } + + LiveUrl := "https://www.openrec.tv/live/" + userId + + streamer.LiveUrl = &sql.NullString{String: LiveUrl, Valid: true} + streamer.LatestLogUid = logUid + // streamer.Operator = 0 + + // log.Println(*streamer.UserId) + intimate.TStreamer.Update(streamer) + + // source.Operator = int32(intimate.OperatorExtractorOK) + // sstore.UpdateOperator(source) + +} diff --git a/extractor/twitcasting_extractor/twitcasting_extractor.go b/extractor/twitcasting_extractor/twitcasting_extractor.go index c0b6a8e..0fe20a6 100644 --- a/extractor/twitcasting_extractor/twitcasting_extractor.go +++ b/extractor/twitcasting_extractor/twitcasting_extractor.go @@ -5,7 +5,6 @@ 
import ( "encoding/json" "intimate" "log" - "os" "strconv" "strings" "time" @@ -34,26 +33,35 @@ func main() { ps := intimate.NewPerfectShutdown() ses := requests.NewSession() + streamerQueue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Ptwitcasting)) for !ps.IsClose() { - streamer, err := estore.Pop(intimate.Ptwitcasting) + // streamer, err := estore.Pop(intimate.Ptwitcasting) + isteamer, err := streamerQueue.Pop() if err != nil { - log.Println(err, streamer) + log.Println(err, isteamer) + continue } - streamer.LiveUrl = sql.NullString{String: "https://twitcasting.tv/" + streamer.UserId, Valid: true} + streamer := isteamer.(*intimate.Streamer) + streamer.LiveUrl = &sql.NullString{String: "https://twitcasting.tv/" + *streamer.UserId, Valid: true} resp, err := ses.Get(streamer.LiveUrl.String).Execute() if err != nil { - estore.UpdateError(streamer, err) - log.Println(err, streamer.UserId) + intimate.TStreamer.UpdateError(streamer, err) + log.Println(err, *streamer.UserId) continue } var ldata *LiveData - f, _ := os.OpenFile("./twistcasting.html", os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm) - f.Write(resp.Content()) + // f, _ := os.OpenFile("./twistcasting.html", os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm) + // f.Write(resp.Content()) etor := extractor.ExtractHtml(resp.Content()) - ldata = etor.GetObjectByTag(LiveData{}).(*LiveData) + ildata := etor.GetObjectByTag(LiveData{}) + if ildata == nil { + log.Println(streamer.LiveUrl.String) + continue + } + ldata = ildata.(*LiveData) // ldata.MaxViews = regexp.MustCompile("\\d+").FindString(ldata.MaxViews) coincount := 0 @@ -62,14 +70,14 @@ func main() { giverurl := streamer.LiveUrl.String + "/backers/" + strconv.Itoa(i) resp, err = ses.Get(giverurl).Execute() if err != nil { - estore.UpdateError(streamer, err) + intimate.TStreamer.UpdateError(streamer, err) log.Panic(err) } etor := extractor.ExtractHtml(resp.Content()) xp, err := 
etor.XPaths("//td[@class='tw-memorial-table-recent-point']") if err != nil { - estore.UpdateError(streamer, err) + intimate.TStreamer.UpdateError(streamer, err) log.Panic(err) } @@ -100,20 +108,20 @@ func main() { } streamer.Platform = intimate.Ptwitcasting - streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} - streamer.UserName = sql.NullString{String: ldata.UserName, Valid: true} + streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true} + streamer.UserName = &sql.NullString{String: ldata.UserName, Valid: true} streamer.Operator = 0 streamer.Tags = tags // streamer.UpdateInterval = 60 clog := &intimate.CollectLog{} - clog.UserId = streamer.UserId - clog.Gratuity = sql.NullInt64{Int64: int64(coincount), Valid: true} + clog.UserId = *streamer.UserId + clog.Gratuity = &sql.NullInt64{Int64: int64(coincount), Valid: true} clog.Platform = streamer.Platform clog.UpdateTime = streamer.UpdateTime - clog.LiveTitle = sql.NullString{String: ldata.LiveTitle, Valid: true} + clog.LiveTitle = &sql.NullString{String: ldata.LiveTitle, Valid: true} clog.Tags = tags - clog.Followers = sql.NullInt64{Int64: int64(ldata.Follower), Valid: true} + clog.Followers = &sql.NullInt64{Int64: int64(ldata.Follower), Valid: true} switch { case ldata.Follower <= 100: streamer.UpdateInterval = 720 @@ -125,12 +133,12 @@ func main() { streamer.UpdateInterval = 120 } - clog.Views = sql.NullInt64{Int64: ldata.MaxViews, Valid: true} + clog.Views = &sql.NullInt64{Int64: ldata.MaxViews, Valid: true} if ldata.LiveStart != "" { st, err := time.Parse("Mon, 02 Jan 2006 15:04:05 -0700", ldata.LiveStart) if err == nil { startTime := st - clog.LiveStartTime = sql.NullTime{Time: startTime, Valid: true} + clog.LiveStartTime = &sql.NullTime{Time: startTime, Valid: true} dt, err := strconv.Atoi(ldata.LiveDuration) liveduration := time.Now().Sub(startTime) @@ -149,7 +157,7 @@ func main() { if err == nil { endTime := startTime.Add((time.Duration)(dt) * time.Millisecond) - clog.LiveEndTime = 
sql.NullTime{Time: endTime, Valid: true} + clog.LiveEndTime = &sql.NullTime{Time: endTime, Valid: true} } else { log.Println(err, streamer.UserId) } @@ -158,8 +166,16 @@ func main() { } } - streamer.LatestLogUid = estore.InsertClog(clog) - estore.UpdateStreamer(streamer) - log.Println(streamer.UserId) + clog.StreamerUid = streamer.Uid + uid, err := intimate.TClog.InsertRetAutoID(clog) + if err != nil { + log.Println(err) + continue + } + + streamer.LatestLogUid = uid + intimate.TStreamer.Update(streamer) + // estore.UpdateStreamer(streamer) + log.Println(*streamer.UserId) } } diff --git a/extractor_field.go b/extractor_field.go index d4266da..3107771 100644 --- a/extractor_field.go +++ b/extractor_field.go @@ -3,26 +3,25 @@ package intimate import ( "database/sql" "reflect" - "time" ) type GetSet struct { } type StreamerList struct { - UrlHash []byte // - Platform Platform // - Url string // + UrlHash string `field:"urlhash" ` // + Platform string `field:"platform" ` // + Url string `field:"url" ` // - Label sql.NullString // + Label *sql.NullString `field:"label" ` // - Serialize interface{} + Serialize interface{} `field:"serialize" ` - UpdateInterval int32 - UpdateTime time.Time // + UpdateInterval int32 `field:"update_interval" ` + UpdateTime *sql.NullTime `field:"update_time" ` // - ErrorMsg sql.NullString - Operator int32 + ErrorMsg *sql.NullString `field:"error_msg" ` // + Operator int32 `field:"operator" ` LastOperator int32 } @@ -51,7 +50,7 @@ type Streamer struct { IsUpdateStreamer bool // 更新上面的内容 IsUpdateUrl bool UpdateInterval int32 `field:"update_interval"` - UpdateUrl interface{} `field:"update_url"` + UpdateUrl interface{} `field:"update_url"` // TODO: nil LatestLogUid int64 `field:"latest_log_uid"` UpdateTime *sql.NullTime `field:"update_time"` // @@ -72,24 +71,24 @@ func (ai *Streamer) Set(field string, value interface{}) { } type CollectLog struct { - LogUid int64 // 日志id - StreamerUid int64 // StreamerId 表id与 + LogUid int64 `field:"log_uid"` // 
日志id + StreamerUid int64 `field:"streamer_uid"` // StreamerId 表id与 - Platform Platform // - UserId string // 平台的UserId - IsLiveStreaming bool // - IsError bool // - Followers sql.NullInt64 // - Views sql.NullInt64 // - Giver interface{} // - Gratuity sql.NullInt64 // - LiveTitle sql.NullString // - LiveStartTime sql.NullTime // - LiveEndTime sql.NullTime // - UpdateTime sql.NullTime // - Tags interface{} - Ext interface{} // - ErrorMsg sql.NullString // + Platform Platform `field:"platform"` // + UserId string `field:"user_id"` // 平台的UserId + IsLiveStreaming bool `field:"is_live_streaming"` // + IsError bool `field:"is_error"` // + Followers *sql.NullInt64 `field:"followers"` // + Views *sql.NullInt64 `field:"views"` // + Giver interface{} `field:"giver"` // + Gratuity *sql.NullInt64 `field:"gratuity"` // + LiveTitle *sql.NullString `field:"live_title"` // + LiveStartTime *sql.NullTime `field:"live_start_time"` // + LiveEndTime *sql.NullTime `field:"live_end_time"` // + UpdateTime *sql.NullTime `field:"update_time"` // + Tags interface{} `field:"tags"` + Ext interface{} `field:"ext"` // + ErrorMsg *sql.NullString `field:"error_msg"` // } // Get Simple Value diff --git a/tasks/nimo/nimo_task1/nimo_task1.go b/tasks/nimo/nimo_task1/nimo_task1.go index 96281a3..a5def5c 100644 --- a/tasks/nimo/nimo_task1/nimo_task1.go +++ b/tasks/nimo/nimo_task1/nimo_task1.go @@ -70,13 +70,13 @@ func Execute() { if userid := room.Get("id").String(); userid != "" { - streamer.UserId = userid - streamer.LiveUrl = sql.NullString{String: "https://www.nimo.tv/live/" + userid, Valid: true} + streamer.UserId = &userid + streamer.LiveUrl = &sql.NullString{String: "https://www.nimo.tv/live/" + userid, Valid: true} channel := room.Get("roomTypeName").String() - streamer.Channel = sql.NullString{String: channel, Valid: channel != ""} + streamer.Channel = &sql.NullString{String: channel, Valid: channel != ""} username := room.Get("anchorName").String() - streamer.UserName = sql.NullString{String: 
username, Valid: username != ""} + streamer.UserName = &sql.NullString{String: username, Valid: username != ""} if rtags := room.Get("anchorLabels"); rtags.IsArray() { diff --git a/tasks/openrec/openrec_task1/task_openrec.go b/tasks/openrec/openrec_task1/task_openrec.go index 40ee747..c46800d 100644 --- a/tasks/openrec/openrec_task1/task_openrec.go +++ b/tasks/openrec/openrec_task1/task_openrec.go @@ -11,11 +11,11 @@ import ( "github.com/tidwall/gjson" ) -// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) +// // sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql +// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) -// estore 解析存储连接实例 -var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() +// // estore 解析存储连接实例 +// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() // Execute 执行方法 func Execute() { @@ -71,7 +71,7 @@ func Execute() { userid := User.Get("channel.id").String() streamer := &intimate.Streamer{} - streamer.UserId = userid + streamer.UserId = &userid streamer.Platform = intimate.Popenrec updateUrl := make(map[string]interface{}) @@ -83,15 +83,16 @@ func Execute() { updateUrlBytes, err := json.Marshal(updateUrl) if err != nil { - estore.UpdateError(streamer, err) + intimate.TStreamer.UpdateError(streamer, err) continue } streamer.UpdateUrl = updateUrlBytes - estore.InsertStreamer(streamer) + intimate.TStreamer.Insert(streamer) } } + log.Println("streamer count:", len(result.Array()), tp.ParsedURL.String()) // 修改url query 参数的page递增. 
遍历所有页面 tp.QueryParam("page").IntAdd(1) time.Sleep(time.Second * 1) diff --git a/tasks/openrec/openrec_task2/.gitignore b/tasks/openrec/openrec_task2/.gitignore deleted file mode 100644 index 374c872..0000000 --- a/tasks/openrec/openrec_task2/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -openrec_task2 -log diff --git a/tasks/openrec/openrec_task2/main.go b/tasks/openrec/openrec_task2/main.go deleted file mode 100644 index 736ef31..0000000 --- a/tasks/openrec/openrec_task2/main.go +++ /dev/null @@ -1,5 +0,0 @@ -package main - -func main() { - Execute() -} diff --git a/tasks/openrec/openrec_task2/task_openrec.go b/tasks/openrec/openrec_task2/task_openrec.go deleted file mode 100644 index abfad26..0000000 --- a/tasks/openrec/openrec_task2/task_openrec.go +++ /dev/null @@ -1,154 +0,0 @@ -package main - -import ( - "database/sql" - "encoding/json" - "intimate" - "log" - "time" - - "github.com/474420502/gcurl" - "github.com/474420502/requests" - "github.com/tidwall/gjson" -) - -// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec)) - -// estore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_extractor.sql -var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() - -func init() { - -} - -// Execute 执行方法 -func Execute() { - - ps := intimate.NewPerfectShutdown() - ses := requests.NewSession() - - var lasterr error = nil - - for !ps.IsClose() { - - streamer, err := estore.Pop(intimate.Popenrec) //队列里弹出一个streamer行. 
进行解析 - - if streamer == nil || err != nil { - if err != lasterr { - log.Println(err, lasterr) - lasterr = err - } - time.Sleep(time.Second * 2) - continue - } - - userId := streamer.UserId - - var updateUrl map[string]string - - err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // 反序列化update_url, 里面存了需要采集的url - if err != nil { - log.Println(err) - continue - } - // Check Userid - - userUrl := updateUrl["user"] - log.Println(userUrl) - tp := ses.Get(userUrl) // 获取user url页面数据 - resp, err := tp.Execute() - streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} - - if err != nil { - log.Println(err) - estore.UpdateError(streamer, err) - continue - } - - cookies := ses.GetCookies(tp.GetParsedURL()) - - scurl := updateUrl["supporters"] //获取打赏者的数据 - curl := gcurl.Parse(scurl) - supportersSession := curl.CreateSession() - - temporary := curl.CreateTemporary(supportersSession) - supportersSession.SetCookies(temporary.GetParsedURL(), cookies) - var supporters []string - for { // supporters 数据需要登录信息. 
下面为赋值 supporters链接获取的uid token random码 - - supportersQuery := temporary.GetQuery() - - for _, cookie := range cookies { - if cookie.Name == "uuid" { - supportersQuery.Set("Uuid", cookie.Value) - continue - } - - if cookie.Name == "token" { - supportersQuery.Set("Token", cookie.Value) - continue - } - - if cookie.Name == "random" { - supportersQuery.Set("Random", cookie.Value) - continue - } - } - - supportersQuery.Set("identify_id", userId) - temporary.SetQuery(supportersQuery) - - resp, err := temporary.Execute() - if err != nil { - log.Println(err) - } - supporterjson := gjson.ParseBytes(resp.Content()) - supporterdata := supporterjson.Get("data") //解析supporters获取的json数据 - if supporterdata.Type == gjson.Null { - break - } - supporters = append(supporters, string(resp.Content())) - - temporary.QueryParam("page_number").IntAdd(1) - } - - // cookies := cxt.Session().GetCookies(wf.GetParsedURL()) - ext := make(map[string]interface{}) - - ext["json_supporters"] = supporters - ext["html_user"] = string(resp.Content()) - - liveUrl := updateUrl["live"] - tp = ses.Get(liveUrl) - resp, err = tp.Execute() - if err != nil { - log.Println(err) - estore.UpdateError(streamer, err) - continue - } - ext["html_live"] = string(resp.Content()) - ext["var_user_id"] = userId - - extJsonBytes, err := json.Marshal(ext) - if err != nil { - log.Println(err) - estore.UpdateError(streamer, err) - continue - } - - // streamer.Platform = intimate.Popenrec - streamer.UpdateInterval = 120 - streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} - streamer.Operator = 0 - - source := &intimate.Source{} - source.Target = intimate.TOpenrecUser - source.Ext = string(extJsonBytes) - source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true} - sstore.Insert(source) - - estore.UpdateStreamer(streamer) - } - -} diff --git a/tasks/openrec/openrec_task2/task_openrec_test.go b/tasks/openrec/openrec_task2/task_openrec_test.go deleted file mode 100644 index 80820ef..0000000 --- 
a/tasks/openrec/openrec_task2/task_openrec_test.go +++ /dev/null @@ -1,9 +0,0 @@ -package main - -import ( - "testing" -) - -func TestMain(t *testing.T) { - main() -} diff --git a/tasks/twitcasting/twitcasting_task1/twitcasting.go b/tasks/twitcasting/twitcasting_task1/twitcasting.go index dbd5bdf..44a733f 100644 --- a/tasks/twitcasting/twitcasting_task1/twitcasting.go +++ b/tasks/twitcasting/twitcasting_task1/twitcasting.go @@ -75,8 +75,10 @@ func Execute() { sl.Operator = 0 sl.UpdateInterval = 120 sl.UpdateTime = time.Now() + sl.UrlHash = intimate.GetUrlHash(sl.Url) - estore.InsertStreamerList(sl) + intimate.TStreamerList.Insert(sl) + // estore.InsertStreamerList(sl) queue.Put(wurl) queuedict[wurl] = true @@ -107,7 +109,8 @@ func Execute() { sl.Operator = 0 sl.UpdateInterval = 120 sl.UpdateTime = time.Now() - estore.InsertStreamerList(sl) + sl.UrlHash = intimate.GetUrlHash(sl.Url) + intimate.TStreamerList.Insert(sl) queue.Put(wurl) queuedict[wurl] = true diff --git a/tasks/twitch/twitch_task1/task_twitch.go b/tasks/twitch/twitch_task1/task_twitch.go index 03fcaf2..8a6fc0d 100644 --- a/tasks/twitch/twitch_task1/task_twitch.go +++ b/tasks/twitch/twitch_task1/task_twitch.go @@ -1,7 +1,6 @@ package main import ( - "database/sql" "intimate" "log" "time" @@ -9,100 +8,120 @@ import ( "github.com/tebeka/selenium" ) -// sstore 源存储实例, 为存储源数据的实现. 表格具体参考sql/intimate_source.sql -var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) +// // sstore 源存储实例, 为存储源数据的实现. 
表格具体参考sql/intimate_source.sql +// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch)) -// estore 解析存储连接实例 -var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() +// // estore 解析存储连接实例 +// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor() // 获取类型的所有频道链接 // Execute 执行任务 func Execute() { - var err error - wd := intimate.GetChromeDriver(3030) + ps := intimate.NewPerfectShutdown() - weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT" - err = wd.Get(weburl) - if err != nil { - panic(err) - } + for !ps.IsClose() { + var err error + wd := intimate.GetChromeDriver(3030) - cardCondition := func(wd selenium.WebDriver) (bool, error) { - elements, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") + weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT" + err = wd.Get(weburl) if err != nil { - return false, err - } - return len(elements) > 0, nil - } - wd.WaitWithTimeout(cardCondition, time.Second*15) - time.Sleep(time.Second) - - e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") - if err != nil { - panic(err) - } - e.Click() - - var hrefs map[string]bool = make(map[string]bool) - var delayerror = 5 - for i := 0; i <= 200; i++ { - cards, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") - if err != nil { - log.Println(err) - break + panic(err) } - if len(hrefs) == 0 { - delayerror-- - if delayerror <= 0 { + cardCondition := func(wd selenium.WebDriver) (bool, error) { + elements, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") + if err != nil { + return false, err + } + return len(elements) > 0, nil + } + wd.WaitWithTimeout(cardCondition, time.Second*15) + time.Sleep(time.Second) + + e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']") + if err != nil { + panic(err) + } + e.Click() + 
+ var lasthreflen = 0 + var hrefs map[string]bool = make(map[string]bool) + var delayerror = 5 + for i := 0; i <= 200; i++ { + cards, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]") + if err != nil { + log.Println(err) break } - } else { - delayerror = 5 - } - for ii := 0; ii < 10; ii++ { - for _, card := range cards { - href, err := card.GetAttribute("href") - if err != nil { - log.Println(href, err) - continue - } else { - hrefs[href] = true + if len(hrefs) == lasthreflen { + delayerror-- + if delayerror <= 0 { + break } + } else { + delayerror = 7 } - break - } + lasthreflen = len(hrefs) - if ps.IsClose() { - break - } + for ii := 0; ii < 10; ii++ { + for _, card := range cards { + href, err := card.GetAttribute("href") + if err != nil { + log.Println(href, err) + continue + } else { + hrefs[href] = true + } + } + break + } - if len(cards) > 10 { - log.Println(len(cards)) - wd.ExecuteScript(`items = document.evaluate("//div[@data-target='directory-page__card-container']/../self::div[@data-target and @style]", document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null); + if ps.IsClose() { + break + } + + if len(cards) > 10 { + log.Println(len(cards)) + wd.ExecuteScript(`items = document.evaluate("//div[@data-target='directory-page__card-container']/../self::div[@data-target and @style]", document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null); for (var i = 0; i < items.snapshotLength - 10; i++) { item = items.snapshotItem(i); item.remove() ;};`, nil) + } + time.Sleep(time.Millisecond * 200) + wd.KeyDown(selenium.EndKey) + time.Sleep(time.Millisecond * 200) + wd.KeyUp(selenium.EndKey) + time.Sleep(time.Millisecond * 2500) } - time.Sleep(time.Millisecond * 200) - wd.KeyDown(selenium.EndKey) - time.Sleep(time.Millisecond * 200) - wd.KeyUp(selenium.EndKey) - time.Sleep(time.Millisecond * 2500) + + for href := range hrefs { + + sl := &intimate.StreamerList{} + sl.Url = href + sl.UrlHash = 
intimate.GetUrlHash(sl.Url) + sl.Platform = string(intimate.Ptwitch) + sl.UpdateTime = intimate.GetUpdateTimeNow() + err := intimate.TStreamerList.Insert(sl) + if err != nil { + log.Println(err) + } + + // TODO: Save href + // source := &intimate.Source{} + // source.Source = sql.NullString{String: href, Valid: true} + // source.Operator = 0 + // source.Target = intimate.TTwitchChannel + // source.Url = weburl + // sstore.Insert(source) + } + + log.Println("hrefs len:", len(hrefs)) + // sstore.Deduplicate(intimate.TTwitchChannel, "source") + + wd.Close() + wd.Quit() + time.Sleep(time.Minute * 30) } - - for href := range hrefs { - - // TODO: Save href - source := &intimate.Source{} - source.Source = sql.NullString{String: href, Valid: true} - source.Operator = 0 - source.Target = intimate.TTwitchChannel - source.Url = weburl - sstore.Insert(source) - } - - log.Println("hrefs len:", len(hrefs)) - sstore.Deduplicate(intimate.TTwitchChannel, "source") } diff --git a/utils.go b/utils.go index 6bf7713..1a699b7 100644 --- a/utils.go +++ b/utils.go @@ -1,6 +1,7 @@ package intimate import ( + "crypto/md5" "database/sql" "fmt" "log" @@ -34,6 +35,10 @@ func GetUpdateTimeNow() *sql.NullTime { return &sql.NullTime{Time: time.Now().Add(-time.Hour * 100000), Valid: true} } +func GetUrlHash(urlstr string) string { + return fmt.Sprintf("%x", md5.Sum([]byte(urlstr))) +} + // ParseNumber 去逗号解析数字 func ParseNumber(num string) (int64, error) { num = strings.Trim(num, " ")